nu_protocol/pipeline/
pipeline_data.rs

1use crate::{
2    ast::{Call, PathMember},
3    engine::{EngineState, Stack},
4    location,
5    shell_error::{io::IoError, location::Location},
6    ByteStream, ByteStreamType, Config, ListStream, OutDest, PipelineMetadata, Range, ShellError,
7    Signals, Span, Type, Value,
8};
9use std::io::Write;
10
11const LINE_ENDING_PATTERN: &[char] = &['\r', '\n'];
12
13/// The foundational abstraction for input and output to commands
14///
15/// This represents either a single Value or a stream of values coming into the command or leaving a command.
16///
17/// A note on implementation:
18///
19/// We've tried a few variations of this structure. Listing these below so we have a record.
20///
21/// * We tried always assuming a stream in Nushell. This was a great 80% solution, but it had some rough edges.
22///   Namely, how do you know the difference between a single string and a list of one string. How do you know
23///   when to flatten the data given to you from a data source into the stream or to keep it as an unflattened
24///   list?
25///
26/// * We tried putting the stream into Value. This had some interesting properties as now commands "just worked
27///   on values", but lead to a few unfortunate issues.
28///
29/// The first is that you can't easily clone Values in a way that felt largely immutable. For example, if
30/// you cloned a Value which contained a stream, and in one variable drained some part of it, then the second
31/// variable would see different values based on what you did to the first.
32///
33/// To make this kind of mutation thread-safe, we would have had to produce a lock for the stream, which in
34/// practice would have meant always locking the stream before reading from it. But more fundamentally, it
35/// felt wrong in practice that observation of a value at runtime could affect other values which happen to
36/// alias the same stream. By separating these, we don't have this effect. Instead, variables could get
37/// concrete list values rather than streams, and be able to view them without non-local effects.
38///
39/// * A balance of the two approaches is what we've landed on: Values are thread-safe to pass, and we can stream
40///   them into any sources. Streams are still available to model the infinite streams approach of original
41///   Nushell.
42#[derive(Debug)]
43pub enum PipelineData {
44    Empty,
45    Value(Value, Option<PipelineMetadata>),
46    ListStream(ListStream, Option<PipelineMetadata>),
47    ByteStream(ByteStream, Option<PipelineMetadata>),
48}
49
50impl PipelineData {
51    pub fn empty() -> PipelineData {
52        PipelineData::Empty
53    }
54
55    pub fn metadata(&self) -> Option<PipelineMetadata> {
56        match self {
57            PipelineData::Empty => None,
58            PipelineData::Value(_, meta)
59            | PipelineData::ListStream(_, meta)
60            | PipelineData::ByteStream(_, meta) => meta.clone(),
61        }
62    }
63
64    pub fn set_metadata(mut self, metadata: Option<PipelineMetadata>) -> Self {
65        match &mut self {
66            PipelineData::Empty => {}
67            PipelineData::Value(_, meta)
68            | PipelineData::ListStream(_, meta)
69            | PipelineData::ByteStream(_, meta) => *meta = metadata,
70        }
71        self
72    }
73
74    pub fn is_nothing(&self) -> bool {
75        matches!(self, PipelineData::Value(Value::Nothing { .. }, ..))
76            || matches!(self, PipelineData::Empty)
77    }
78
79    /// PipelineData doesn't always have a Span, but we can try!
80    pub fn span(&self) -> Option<Span> {
81        match self {
82            PipelineData::Empty => None,
83            PipelineData::Value(value, ..) => Some(value.span()),
84            PipelineData::ListStream(stream, ..) => Some(stream.span()),
85            PipelineData::ByteStream(stream, ..) => Some(stream.span()),
86        }
87    }
88
89    /// Change the span of the [`PipelineData`].
90    ///
91    /// Returns `Value(Nothing)` with the given span if it was [`PipelineData::Empty`].
92    pub fn with_span(self, span: Span) -> Self {
93        match self {
94            PipelineData::Empty => PipelineData::Value(Value::nothing(span), None),
95            PipelineData::Value(value, metadata) => {
96                PipelineData::Value(value.with_span(span), metadata)
97            }
98            PipelineData::ListStream(stream, metadata) => {
99                PipelineData::ListStream(stream.with_span(span), metadata)
100            }
101            PipelineData::ByteStream(stream, metadata) => {
102                PipelineData::ByteStream(stream.with_span(span), metadata)
103            }
104        }
105    }
106
107    /// Get a type that is representative of the `PipelineData`.
108    ///
109    /// The type returned here makes no effort to collect a stream, so it may be a different type
110    /// than would be returned by [`Value::get_type()`] on the result of
111    /// [`.into_value()`](Self::into_value).
112    ///
113    /// Specifically, a `ListStream` results in `list<any>` rather than
114    /// the fully complete [`list`](Type::List) type (which would require knowing the contents),
115    /// and a `ByteStream` with [unknown](crate::ByteStreamType::Unknown) type results in
116    /// [`any`](Type::Any) rather than [`string`](Type::String) or [`binary`](Type::Binary).
117    pub fn get_type(&self) -> Type {
118        match self {
119            PipelineData::Empty => Type::Nothing,
120            PipelineData::Value(value, _) => value.get_type(),
121            PipelineData::ListStream(_, _) => Type::list(Type::Any),
122            PipelineData::ByteStream(stream, _) => stream.type_().into(),
123        }
124    }
125
126    /// Determine if the `PipelineData` is a [subtype](https://en.wikipedia.org/wiki/Subtyping) of `other`.
127    ///
128    /// This check makes no effort to collect a stream, so it may be a different result
129    /// than would be returned by calling [`Value::is_subtype_of()`] on the result of
130    /// [`.into_value()`](Self::into_value).
131    ///
132    /// A `ListStream` acts the same as an empty list type: it is a subtype of any [`list`](Type::List)
133    /// or [`table`](Type::Table) type. After converting to a value, it may become a more specific type.
134    /// For example, a `ListStream` is a subtype of `list<int>` and `list<string>`.
135    /// If calling [`.into_value()`](Self::into_value) results in a `list<int>`,
136    /// then the value would not be a subtype of `list<string>`, in contrast to the original `ListStream`.
137    ///
138    /// A `ByteStream` is a subtype of [`string`](Type::String) if it is coercible into a string.
139    /// Likewise, a `ByteStream` is a subtype of [`binary`](Type::Binary) if it is coercible into a binary value.
140    pub fn is_subtype_of(&self, other: &Type) -> bool {
141        match (self, other) {
142            (_, Type::Any) => true,
143            (PipelineData::Empty, Type::Nothing) => true,
144            (PipelineData::Value(val, ..), ty) => val.is_subtype_of(ty),
145
146            // a list stream could be a list with any type, including a table
147            (PipelineData::ListStream(..), Type::List(..) | Type::Table(..)) => true,
148
149            (PipelineData::ByteStream(stream, ..), Type::String)
150                if stream.type_().is_string_coercible() =>
151            {
152                true
153            }
154            (PipelineData::ByteStream(stream, ..), Type::Binary)
155                if stream.type_().is_binary_coercible() =>
156            {
157                true
158            }
159
160            (PipelineData::Empty, _) => false,
161            (PipelineData::ListStream(..), _) => false,
162            (PipelineData::ByteStream(..), _) => false,
163        }
164    }
165
166    pub fn into_value(self, span: Span) -> Result<Value, ShellError> {
167        match self {
168            PipelineData::Empty => Ok(Value::nothing(span)),
169            PipelineData::Value(value, ..) => {
170                if value.span() == Span::unknown() {
171                    Ok(value.with_span(span))
172                } else {
173                    Ok(value)
174                }
175            }
176            PipelineData::ListStream(stream, ..) => Ok(stream.into_value()),
177            PipelineData::ByteStream(stream, ..) => stream.into_value(),
178        }
179    }
180
181    /// Converts any `Value` variant that can be represented as a stream into its stream variant.
182    ///
183    /// This means that lists and ranges are converted into list streams, and strings and binary are
184    /// converted into byte streams.
185    ///
186    /// Returns an `Err` with the original stream if the variant couldn't be converted to a stream
187    /// variant. If the variant is already a stream variant, it is returned as-is.
188    pub fn try_into_stream(self, engine_state: &EngineState) -> Result<PipelineData, PipelineData> {
189        let span = self.span().unwrap_or(Span::unknown());
190        match self {
191            PipelineData::ListStream(..) | PipelineData::ByteStream(..) => Ok(self),
192            PipelineData::Value(Value::List { .. } | Value::Range { .. }, ref metadata) => {
193                let metadata = metadata.clone();
194                Ok(PipelineData::ListStream(
195                    ListStream::new(self.into_iter(), span, engine_state.signals().clone()),
196                    metadata,
197                ))
198            }
199            PipelineData::Value(Value::String { val, .. }, metadata) => {
200                Ok(PipelineData::ByteStream(
201                    ByteStream::read_string(val, span, engine_state.signals().clone()),
202                    metadata,
203                ))
204            }
205            PipelineData::Value(Value::Binary { val, .. }, metadata) => {
206                Ok(PipelineData::ByteStream(
207                    ByteStream::read_binary(val, span, engine_state.signals().clone()),
208                    metadata,
209                ))
210            }
211            _ => Err(self),
212        }
213    }
214
215    /// Drain and write this [`PipelineData`] to `dest`.
216    ///
217    /// Values are converted to bytes and separated by newlines if this is a `ListStream`.
218    pub fn write_to(self, mut dest: impl Write) -> Result<(), ShellError> {
219        match self {
220            PipelineData::Empty => Ok(()),
221            PipelineData::Value(value, ..) => {
222                let bytes = value_to_bytes(value)?;
223                dest.write_all(&bytes).map_err(|err| {
224                    IoError::new_internal(
225                        err.kind(),
226                        "Could not write PipelineData to dest",
227                        crate::location!(),
228                    )
229                })?;
230                dest.flush().map_err(|err| {
231                    IoError::new_internal(
232                        err.kind(),
233                        "Could not flush PipelineData to dest",
234                        crate::location!(),
235                    )
236                })?;
237                Ok(())
238            }
239            PipelineData::ListStream(stream, ..) => {
240                for value in stream {
241                    let bytes = value_to_bytes(value)?;
242                    dest.write_all(&bytes).map_err(|err| {
243                        IoError::new_internal(
244                            err.kind(),
245                            "Could not write PipelineData to dest",
246                            crate::location!(),
247                        )
248                    })?;
249                    dest.write_all(b"\n").map_err(|err| {
250                        IoError::new_internal(
251                            err.kind(),
252                            "Could not write linebreak after PipelineData to dest",
253                            crate::location!(),
254                        )
255                    })?;
256                }
257                dest.flush().map_err(|err| {
258                    IoError::new_internal(
259                        err.kind(),
260                        "Could not flush PipelineData to dest",
261                        crate::location!(),
262                    )
263                })?;
264                Ok(())
265            }
266            PipelineData::ByteStream(stream, ..) => stream.write_to(dest),
267        }
268    }
269
270    /// Drain this [`PipelineData`] according to the current stdout [`OutDest`]s in `stack`.
271    ///
272    /// For [`OutDest::Pipe`] and [`OutDest::PipeSeparate`], this will return the [`PipelineData`]
273    /// as is. For [`OutDest::Value`], this will collect into a value and return it. For
274    /// [`OutDest::Print`], the [`PipelineData`] is drained and printed. Otherwise, the
275    /// [`PipelineData`] is drained, but only printed if it is the output of an external command.
276    pub fn drain_to_out_dests(
277        self,
278        engine_state: &EngineState,
279        stack: &mut Stack,
280    ) -> Result<Self, ShellError> {
281        match stack.pipe_stdout().unwrap_or(&OutDest::Inherit) {
282            OutDest::Print => {
283                self.print_table(engine_state, stack, false, false)?;
284                Ok(Self::Empty)
285            }
286            OutDest::Pipe | OutDest::PipeSeparate => Ok(self),
287            OutDest::Value => {
288                let metadata = self.metadata();
289                let span = self.span().unwrap_or(Span::unknown());
290                self.into_value(span).map(|val| Self::Value(val, metadata))
291            }
292            OutDest::File(file) => {
293                self.write_to(file.as_ref())?;
294                Ok(Self::Empty)
295            }
296            OutDest::Null | OutDest::Inherit => {
297                self.drain()?;
298                Ok(Self::Empty)
299            }
300        }
301    }
302
303    pub fn drain(self) -> Result<(), ShellError> {
304        match self {
305            Self::Empty => Ok(()),
306            Self::Value(Value::Error { error, .. }, ..) => Err(*error),
307            Self::Value(..) => Ok(()),
308            Self::ListStream(stream, ..) => stream.drain(),
309            Self::ByteStream(stream, ..) => stream.drain(),
310        }
311    }
312
313    /// Try convert from self into iterator
314    ///
315    /// It returns Err if the `self` cannot be converted to an iterator.
316    ///
317    /// The `span` should be the span of the command or operation that would raise an error.
318    pub fn into_iter_strict(self, span: Span) -> Result<PipelineIterator, ShellError> {
319        Ok(PipelineIterator(match self {
320            PipelineData::Value(value, ..) => {
321                let val_span = value.span();
322                match value {
323                    Value::List { vals, .. } => PipelineIteratorInner::ListStream(
324                        ListStream::new(vals.into_iter(), val_span, Signals::empty()).into_iter(),
325                    ),
326                    Value::Binary { val, .. } => PipelineIteratorInner::ListStream(
327                        ListStream::new(
328                            val.into_iter().map(move |x| Value::int(x as i64, val_span)),
329                            val_span,
330                            Signals::empty(),
331                        )
332                        .into_iter(),
333                    ),
334                    Value::Range { val, .. } => PipelineIteratorInner::ListStream(
335                        ListStream::new(
336                            val.into_range_iter(val_span, Signals::empty()),
337                            val_span,
338                            Signals::empty(),
339                        )
340                        .into_iter(),
341                    ),
342                    // Propagate errors by explicitly matching them before the final case.
343                    Value::Error { error, .. } => return Err(*error),
344                    other => {
345                        return Err(ShellError::OnlySupportsThisInputType {
346                            exp_input_type: "list, binary, range, or byte stream".into(),
347                            wrong_type: other.get_type().to_string(),
348                            dst_span: span,
349                            src_span: val_span,
350                        })
351                    }
352                }
353            }
354            PipelineData::ListStream(stream, ..) => {
355                PipelineIteratorInner::ListStream(stream.into_iter())
356            }
357            PipelineData::Empty => {
358                return Err(ShellError::OnlySupportsThisInputType {
359                    exp_input_type: "list, binary, range, or byte stream".into(),
360                    wrong_type: "null".into(),
361                    dst_span: span,
362                    src_span: span,
363                })
364            }
365            PipelineData::ByteStream(stream, ..) => {
366                if let Some(chunks) = stream.chunks() {
367                    PipelineIteratorInner::ByteStream(chunks)
368                } else {
369                    PipelineIteratorInner::Empty
370                }
371            }
372        }))
373    }
374
375    pub fn collect_string(self, separator: &str, config: &Config) -> Result<String, ShellError> {
376        match self {
377            PipelineData::Empty => Ok(String::new()),
378            PipelineData::Value(value, ..) => Ok(value.to_expanded_string(separator, config)),
379            PipelineData::ListStream(stream, ..) => Ok(stream.into_string(separator, config)),
380            PipelineData::ByteStream(stream, ..) => stream.into_string(),
381        }
382    }
383
384    /// Retrieves string from pipeline data.
385    ///
386    /// As opposed to `collect_string` this raises error rather than converting non-string values.
387    /// The `span` will be used if `ListStream` is encountered since it doesn't carry a span.
388    pub fn collect_string_strict(
389        self,
390        span: Span,
391    ) -> Result<(String, Span, Option<PipelineMetadata>), ShellError> {
392        match self {
393            PipelineData::Empty => Ok((String::new(), span, None)),
394            PipelineData::Value(Value::String { val, .. }, metadata) => Ok((val, span, metadata)),
395            PipelineData::Value(val, ..) => Err(ShellError::TypeMismatch {
396                err_message: "string".into(),
397                span: val.span(),
398            }),
399            PipelineData::ListStream(..) => Err(ShellError::TypeMismatch {
400                err_message: "string".into(),
401                span,
402            }),
403            PipelineData::ByteStream(stream, metadata) => {
404                let span = stream.span();
405                Ok((stream.into_string()?, span, metadata))
406            }
407        }
408    }
409
410    pub fn follow_cell_path(
411        self,
412        cell_path: &[PathMember],
413        head: Span,
414        insensitive: bool,
415    ) -> Result<Value, ShellError> {
416        match self {
417            // FIXME: there are probably better ways of doing this
418            PipelineData::ListStream(stream, ..) => Value::list(stream.into_iter().collect(), head)
419                .follow_cell_path(cell_path, insensitive),
420            PipelineData::Value(v, ..) => v.follow_cell_path(cell_path, insensitive),
421            PipelineData::Empty => Err(ShellError::IncompatiblePathAccess {
422                type_name: "empty pipeline".to_string(),
423                span: head,
424            }),
425            PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess {
426                type_name: stream.type_().describe().to_owned(),
427                span: stream.span(),
428            }),
429        }
430    }
431
432    /// Simplified mapper to help with simple values also. For full iterator support use `.into_iter()` instead
433    pub fn map<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
434    where
435        Self: Sized,
436        F: FnMut(Value) -> Value + 'static + Send,
437    {
438        match self {
439            PipelineData::Value(value, metadata) => {
440                let span = value.span();
441                let pipeline = match value {
442                    Value::List { vals, .. } => vals
443                        .into_iter()
444                        .map(f)
445                        .into_pipeline_data(span, signals.clone()),
446                    Value::Range { val, .. } => val
447                        .into_range_iter(span, Signals::empty())
448                        .map(f)
449                        .into_pipeline_data(span, signals.clone()),
450                    value => match f(value) {
451                        Value::Error { error, .. } => return Err(*error),
452                        v => v.into_pipeline_data(),
453                    },
454                };
455                Ok(pipeline.set_metadata(metadata))
456            }
457            PipelineData::Empty => Ok(PipelineData::Empty),
458            PipelineData::ListStream(stream, metadata) => {
459                Ok(PipelineData::ListStream(stream.map(f), metadata))
460            }
461            PipelineData::ByteStream(stream, metadata) => {
462                Ok(f(stream.into_value()?).into_pipeline_data_with_metadata(metadata))
463            }
464        }
465    }
466
467    /// Simplified flatmapper. For full iterator support use `.into_iter()` instead
468    pub fn flat_map<U, F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
469    where
470        Self: Sized,
471        U: IntoIterator<Item = Value> + 'static,
472        <U as IntoIterator>::IntoIter: 'static + Send,
473        F: FnMut(Value) -> U + 'static + Send,
474    {
475        match self {
476            PipelineData::Empty => Ok(PipelineData::Empty),
477            PipelineData::Value(value, metadata) => {
478                let span = value.span();
479                let pipeline = match value {
480                    Value::List { vals, .. } => vals
481                        .into_iter()
482                        .flat_map(f)
483                        .into_pipeline_data(span, signals.clone()),
484                    Value::Range { val, .. } => val
485                        .into_range_iter(span, Signals::empty())
486                        .flat_map(f)
487                        .into_pipeline_data(span, signals.clone()),
488                    value => f(value)
489                        .into_iter()
490                        .into_pipeline_data(span, signals.clone()),
491                };
492                Ok(pipeline.set_metadata(metadata))
493            }
494            PipelineData::ListStream(stream, metadata) => Ok(PipelineData::ListStream(
495                stream.modify(|iter| iter.flat_map(f)),
496                metadata,
497            )),
498            PipelineData::ByteStream(stream, metadata) => {
499                // TODO: is this behavior desired / correct ?
500                let span = stream.span();
501                let iter = match String::from_utf8(stream.into_bytes()?) {
502                    Ok(mut str) => {
503                        str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
504                        f(Value::string(str, span))
505                    }
506                    Err(err) => f(Value::binary(err.into_bytes(), span)),
507                };
508                Ok(iter.into_iter().into_pipeline_data_with_metadata(
509                    span,
510                    signals.clone(),
511                    metadata,
512                ))
513            }
514        }
515    }
516
517    pub fn filter<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
518    where
519        Self: Sized,
520        F: FnMut(&Value) -> bool + 'static + Send,
521    {
522        match self {
523            PipelineData::Empty => Ok(PipelineData::Empty),
524            PipelineData::Value(value, metadata) => {
525                let span = value.span();
526                let pipeline = match value {
527                    Value::List { vals, .. } => vals
528                        .into_iter()
529                        .filter(f)
530                        .into_pipeline_data(span, signals.clone()),
531                    Value::Range { val, .. } => val
532                        .into_range_iter(span, Signals::empty())
533                        .filter(f)
534                        .into_pipeline_data(span, signals.clone()),
535                    value => {
536                        if f(&value) {
537                            value.into_pipeline_data()
538                        } else {
539                            Value::nothing(span).into_pipeline_data()
540                        }
541                    }
542                };
543                Ok(pipeline.set_metadata(metadata))
544            }
545            PipelineData::ListStream(stream, metadata) => Ok(PipelineData::ListStream(
546                stream.modify(|iter| iter.filter(f)),
547                metadata,
548            )),
549            PipelineData::ByteStream(stream, metadata) => {
550                // TODO: is this behavior desired / correct ?
551                let span = stream.span();
552                let value = match String::from_utf8(stream.into_bytes()?) {
553                    Ok(mut str) => {
554                        str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
555                        Value::string(str, span)
556                    }
557                    Err(err) => Value::binary(err.into_bytes(), span),
558                };
559                let value = if f(&value) {
560                    value
561                } else {
562                    Value::nothing(span)
563                };
564                Ok(value.into_pipeline_data_with_metadata(metadata))
565            }
566        }
567    }
568
569    /// Try to convert Value from Value::Range to Value::List.
570    /// This is useful to expand Value::Range into array notation, specifically when
571    /// converting `to json` or `to nuon`.
572    /// `1..3 | to XX -> [1,2,3]`
573    pub fn try_expand_range(self) -> Result<PipelineData, ShellError> {
574        match self {
575            PipelineData::Value(v, metadata) => {
576                let span = v.span();
577                match v {
578                    Value::Range { val, .. } => {
579                        match *val {
580                            Range::IntRange(range) => {
581                                if range.is_unbounded() {
582                                    return Err(ShellError::GenericError {
583                                        error: "Cannot create range".into(),
584                                        msg: "Unbounded ranges are not allowed when converting to this format".into(),
585                                        span: Some(span),
586                                        help: Some("Consider using ranges with valid start and end point.".into()),
587                                        inner: vec![],
588                                    });
589                                }
590                            }
591                            Range::FloatRange(range) => {
592                                if range.is_unbounded() {
593                                    return Err(ShellError::GenericError {
594                                        error: "Cannot create range".into(),
595                                        msg: "Unbounded ranges are not allowed when converting to this format".into(),
596                                        span: Some(span),
597                                        help: Some("Consider using ranges with valid start and end point.".into()),
598                                        inner: vec![],
599                                    });
600                                }
601                            }
602                        }
603                        let range_values: Vec<Value> =
604                            val.into_range_iter(span, Signals::empty()).collect();
605                        Ok(PipelineData::Value(Value::list(range_values, span), None))
606                    }
607                    x => Ok(PipelineData::Value(x, metadata)),
608                }
609            }
610            _ => Ok(self),
611        }
612    }
613
614    /// Consume and print self data immediately, formatted using table command.
615    ///
616    /// This does not respect the display_output hook. If a value is being printed out by a command,
617    /// this function should be used. Otherwise, `nu_cli::util::print_pipeline` should be preferred.
618    ///
619    /// `no_newline` controls if we need to attach newline character to output.
620    /// `to_stderr` controls if data is output to stderr, when the value is false, the data is output to stdout.
621    pub fn print_table(
622        self,
623        engine_state: &EngineState,
624        stack: &mut Stack,
625        no_newline: bool,
626        to_stderr: bool,
627    ) -> Result<(), ShellError> {
628        match self {
629            // Print byte streams directly as long as they aren't binary.
630            PipelineData::ByteStream(stream, ..) if stream.type_() != ByteStreamType::Binary => {
631                stream.print(to_stderr)
632            }
633            _ => {
634                // If the table function is in the declarations, then we can use it
635                // to create the table value that will be printed in the terminal
636                if let Some(decl_id) = engine_state.table_decl_id {
637                    let command = engine_state.get_decl(decl_id);
638                    if command.block_id().is_some() {
639                        self.write_all_and_flush(engine_state, no_newline, to_stderr)
640                    } else {
641                        let call = Call::new(Span::new(0, 0));
642                        let table = command.run(engine_state, stack, &(&call).into(), self)?;
643                        table.write_all_and_flush(engine_state, no_newline, to_stderr)
644                    }
645                } else {
646                    self.write_all_and_flush(engine_state, no_newline, to_stderr)
647                }
648            }
649        }
650    }
651
652    /// Consume and print self data without any extra formatting.
653    ///
654    /// This does not use the `table` command to format data, and also prints binary values and
655    /// streams in their raw format without generating a hexdump first.
656    ///
657    /// `no_newline` controls if we need to attach newline character to output.
658    /// `to_stderr` controls if data is output to stderr, when the value is false, the data is output to stdout.
659    pub fn print_raw(
660        self,
661        engine_state: &EngineState,
662        no_newline: bool,
663        to_stderr: bool,
664    ) -> Result<(), ShellError> {
665        let span = self.span();
666        if let PipelineData::Value(Value::Binary { val: bytes, .. }, _) = self {
667            if to_stderr {
668                write_all_and_flush(
669                    bytes,
670                    &mut std::io::stderr().lock(),
671                    "stderr",
672                    span,
673                    engine_state.signals(),
674                )?;
675            } else {
676                write_all_and_flush(
677                    bytes,
678                    &mut std::io::stdout().lock(),
679                    "stdout",
680                    span,
681                    engine_state.signals(),
682                )?;
683            }
684            Ok(())
685        } else {
686            self.write_all_and_flush(engine_state, no_newline, to_stderr)
687        }
688    }
689
690    fn write_all_and_flush(
691        self,
692        engine_state: &EngineState,
693        no_newline: bool,
694        to_stderr: bool,
695    ) -> Result<(), ShellError> {
696        let span = self.span();
697        if let PipelineData::ByteStream(stream, ..) = self {
698            // Copy ByteStreams directly
699            stream.print(to_stderr)
700        } else {
701            let config = engine_state.get_config();
702            for item in self {
703                let mut out = if let Value::Error { error, .. } = item {
704                    return Err(*error);
705                } else {
706                    item.to_expanded_string("\n", config)
707                };
708
709                if !no_newline {
710                    out.push('\n');
711                }
712
713                if to_stderr {
714                    write_all_and_flush(
715                        out,
716                        &mut std::io::stderr().lock(),
717                        "stderr",
718                        span,
719                        engine_state.signals(),
720                    )?;
721                } else {
722                    write_all_and_flush(
723                        out,
724                        &mut std::io::stdout().lock(),
725                        "stdout",
726                        span,
727                        engine_state.signals(),
728                    )?;
729                }
730            }
731
732            Ok(())
733        }
734    }
735
736    pub fn unsupported_input_error(
737        self,
738        expected_type: impl Into<String>,
739        span: Span,
740    ) -> ShellError {
741        match self {
742            PipelineData::Empty => ShellError::PipelineEmpty { dst_span: span },
743            PipelineData::Value(value, ..) => ShellError::OnlySupportsThisInputType {
744                exp_input_type: expected_type.into(),
745                wrong_type: value.get_type().get_non_specified_string(),
746                dst_span: span,
747                src_span: value.span(),
748            },
749            PipelineData::ListStream(stream, ..) => ShellError::OnlySupportsThisInputType {
750                exp_input_type: expected_type.into(),
751                wrong_type: "list (stream)".into(),
752                dst_span: span,
753                src_span: stream.span(),
754            },
755            PipelineData::ByteStream(stream, ..) => ShellError::OnlySupportsThisInputType {
756                exp_input_type: expected_type.into(),
757                wrong_type: stream.type_().describe().into(),
758                dst_span: span,
759                src_span: stream.span(),
760            },
761        }
762    }
763}
764
765pub fn write_all_and_flush<T>(
766    data: T,
767    destination: &mut impl Write,
768    destination_name: &str,
769    span: Option<Span>,
770    signals: &Signals,
771) -> Result<(), ShellError>
772where
773    T: AsRef<[u8]>,
774{
775    let io_error_map = |err: std::io::Error, location: Location| {
776        let context = format!("Writing to {} failed", destination_name);
777        match span {
778            None => IoError::new_internal(err.kind(), context, location),
779            Some(span) if span == Span::unknown() => {
780                IoError::new_internal(err.kind(), context, location)
781            }
782            Some(span) => IoError::new_with_additional_context(err.kind(), span, None, context),
783        }
784    };
785
786    let span = span.unwrap_or(Span::unknown());
787    const OUTPUT_CHUNK_SIZE: usize = 8192;
788    for chunk in data.as_ref().chunks(OUTPUT_CHUNK_SIZE) {
789        signals.check(span)?;
790        destination
791            .write_all(chunk)
792            .map_err(|err| io_error_map(err, location!()))?;
793    }
794    destination
795        .flush()
796        .map_err(|err| io_error_map(err, location!()))?;
797    Ok(())
798}
799
800enum PipelineIteratorInner {
801    Empty,
802    Value(Value),
803    ListStream(crate::list_stream::IntoIter),
804    ByteStream(crate::byte_stream::Chunks),
805}
806
807pub struct PipelineIterator(PipelineIteratorInner);
808
809impl IntoIterator for PipelineData {
810    type Item = Value;
811
812    type IntoIter = PipelineIterator;
813
814    fn into_iter(self) -> Self::IntoIter {
815        PipelineIterator(match self {
816            PipelineData::Empty => PipelineIteratorInner::Empty,
817            PipelineData::Value(value, ..) => {
818                let span = value.span();
819                match value {
820                    Value::List { vals, .. } => PipelineIteratorInner::ListStream(
821                        ListStream::new(vals.into_iter(), span, Signals::empty()).into_iter(),
822                    ),
823                    Value::Range { val, .. } => PipelineIteratorInner::ListStream(
824                        ListStream::new(
825                            val.into_range_iter(span, Signals::empty()),
826                            span,
827                            Signals::empty(),
828                        )
829                        .into_iter(),
830                    ),
831                    x => PipelineIteratorInner::Value(x),
832                }
833            }
834            PipelineData::ListStream(stream, ..) => {
835                PipelineIteratorInner::ListStream(stream.into_iter())
836            }
837            PipelineData::ByteStream(stream, ..) => stream.chunks().map_or(
838                PipelineIteratorInner::Empty,
839                PipelineIteratorInner::ByteStream,
840            ),
841        })
842    }
843}
844
845impl Iterator for PipelineIterator {
846    type Item = Value;
847
848    fn next(&mut self) -> Option<Self::Item> {
849        match &mut self.0 {
850            PipelineIteratorInner::Empty => None,
851            PipelineIteratorInner::Value(Value::Nothing { .. }, ..) => None,
852            PipelineIteratorInner::Value(v, ..) => Some(std::mem::take(v)),
853            PipelineIteratorInner::ListStream(stream, ..) => stream.next(),
854            PipelineIteratorInner::ByteStream(stream) => stream.next().map(|x| match x {
855                Ok(x) => x,
856                Err(err) => Value::error(
857                    err,
858                    Span::unknown(), //FIXME: unclear where this span should come from
859                ),
860            }),
861        }
862    }
863}
864
865pub trait IntoPipelineData {
866    fn into_pipeline_data(self) -> PipelineData;
867
868    fn into_pipeline_data_with_metadata(
869        self,
870        metadata: impl Into<Option<PipelineMetadata>>,
871    ) -> PipelineData;
872}
873
874impl<V> IntoPipelineData for V
875where
876    V: Into<Value>,
877{
878    fn into_pipeline_data(self) -> PipelineData {
879        PipelineData::Value(self.into(), None)
880    }
881
882    fn into_pipeline_data_with_metadata(
883        self,
884        metadata: impl Into<Option<PipelineMetadata>>,
885    ) -> PipelineData {
886        PipelineData::Value(self.into(), metadata.into())
887    }
888}
889
890pub trait IntoInterruptiblePipelineData {
891    fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData;
892    fn into_pipeline_data_with_metadata(
893        self,
894        span: Span,
895        signals: Signals,
896        metadata: impl Into<Option<PipelineMetadata>>,
897    ) -> PipelineData;
898}
899
900impl<I> IntoInterruptiblePipelineData for I
901where
902    I: IntoIterator + Send + 'static,
903    I::IntoIter: Send + 'static,
904    <I::IntoIter as Iterator>::Item: Into<Value>,
905{
906    fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData {
907        ListStream::new(self.into_iter().map(Into::into), span, signals).into()
908    }
909
910    fn into_pipeline_data_with_metadata(
911        self,
912        span: Span,
913        signals: Signals,
914        metadata: impl Into<Option<PipelineMetadata>>,
915    ) -> PipelineData {
916        PipelineData::ListStream(
917            ListStream::new(self.into_iter().map(Into::into), span, signals),
918            metadata.into(),
919        )
920    }
921}
922
923fn value_to_bytes(value: Value) -> Result<Vec<u8>, ShellError> {
924    let bytes = match value {
925        Value::String { val, .. } => val.into_bytes(),
926        Value::Binary { val, .. } => val,
927        Value::List { vals, .. } => {
928            let val = vals
929                .into_iter()
930                .map(Value::coerce_into_string)
931                .collect::<Result<Vec<String>, ShellError>>()?
932                .join("\n")
933                + "\n";
934
935            val.into_bytes()
936        }
937        // Propagate errors by explicitly matching them before the final case.
938        Value::Error { error, .. } => return Err(*error),
939        value => value.coerce_into_string()?.into_bytes(),
940    };
941    Ok(bytes)
942}