nu_protocol/pipeline/
pipeline_data.rs

1#[cfg(feature = "os")]
2use crate::process::ExitStatusFuture;
3use crate::{
4    ByteStream, ByteStreamSource, ByteStreamType, Config, ListStream, OutDest, PipelineMetadata,
5    Range, ShellError, Signals, Span, Type, Value,
6    ast::{Call, PathMember},
7    engine::{EngineState, Stack},
8    location,
9    shell_error::{io::IoError, location::Location},
10};
11use std::{
12    borrow::Cow,
13    io::Write,
14    ops::Deref,
15    sync::{Arc, Mutex},
16};
17
/// Line-terminator characters stripped from the end of text decoded from a byte stream
/// (in `flat_map` and `filter`), so a trailing `\n` or `\r\n` is not handed to closures.
const LINE_ENDING_PATTERN: &[char] = &[
    '\r', // carriage return
    '\n', // line feed
];
19
20/// The foundational abstraction for input and output to commands
21///
22/// This represents either a single Value or a stream of values coming into the command or leaving a command.
23///
24/// A note on implementation:
25///
26/// We've tried a few variations of this structure. Listing these below so we have a record.
27///
28/// * We tried always assuming a stream in Nushell. This was a great 80% solution, but it had some rough edges.
29///   Namely, how do you know the difference between a single string and a list of one string. How do you know
30///   when to flatten the data given to you from a data source into the stream or to keep it as an unflattened
31///   list?
32///
33/// * We tried putting the stream into Value. This had some interesting properties as now commands "just worked
34///   on values", but lead to a few unfortunate issues.
35///
36/// The first is that you can't easily clone Values in a way that felt largely immutable. For example, if
37/// you cloned a Value which contained a stream, and in one variable drained some part of it, then the second
38/// variable would see different values based on what you did to the first.
39///
40/// To make this kind of mutation thread-safe, we would have had to produce a lock for the stream, which in
41/// practice would have meant always locking the stream before reading from it. But more fundamentally, it
42/// felt wrong in practice that observation of a value at runtime could affect other values which happen to
43/// alias the same stream. By separating these, we don't have this effect. Instead, variables could get
44/// concrete list values rather than streams, and be able to view them without non-local effects.
45///
46/// * A balance of the two approaches is what we've landed on: Values are thread-safe to pass, and we can stream
47///   them into any sources. Streams are still available to model the infinite streams approach of original
48///   Nushell.
49#[derive(Debug)]
50pub enum PipelineData {
51    Empty,
52    Value(Value, Option<PipelineMetadata>),
53    ListStream(ListStream, Option<PipelineMetadata>),
54    ByteStream(ByteStream, Option<PipelineMetadata>),
55}
56
57impl PipelineData {
58    pub const fn empty() -> PipelineData {
59        PipelineData::Empty
60    }
61
62    pub fn value(val: Value, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
63        PipelineData::Value(val, metadata.into())
64    }
65
66    pub fn list_stream(stream: ListStream, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
67        PipelineData::ListStream(stream, metadata.into())
68    }
69
70    pub fn byte_stream(stream: ByteStream, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
71        PipelineData::ByteStream(stream, metadata.into())
72    }
73
74    pub fn metadata(&self) -> Option<PipelineMetadata> {
75        match self {
76            PipelineData::Empty => None,
77            PipelineData::Value(_, meta)
78            | PipelineData::ListStream(_, meta)
79            | PipelineData::ByteStream(_, meta) => meta.clone(),
80        }
81    }
82
83    pub fn set_metadata(mut self, metadata: Option<PipelineMetadata>) -> Self {
84        match &mut self {
85            PipelineData::Empty => {}
86            PipelineData::Value(_, meta)
87            | PipelineData::ListStream(_, meta)
88            | PipelineData::ByteStream(_, meta) => *meta = metadata,
89        }
90        self
91    }
92
93    pub fn is_nothing(&self) -> bool {
94        matches!(self, PipelineData::Value(Value::Nothing { .. }, ..))
95            || matches!(self, PipelineData::Empty)
96    }
97
98    /// PipelineData doesn't always have a Span, but we can try!
99    pub fn span(&self) -> Option<Span> {
100        match self {
101            PipelineData::Empty => None,
102            PipelineData::Value(value, ..) => Some(value.span()),
103            PipelineData::ListStream(stream, ..) => Some(stream.span()),
104            PipelineData::ByteStream(stream, ..) => Some(stream.span()),
105        }
106    }
107
108    /// Change the span of the [`PipelineData`].
109    ///
110    /// Returns `Value(Nothing)` with the given span if it was [`PipelineData::empty()`].
111    pub fn with_span(self, span: Span) -> Self {
112        match self {
113            PipelineData::Empty => PipelineData::value(Value::nothing(span), None),
114            PipelineData::Value(value, metadata) => {
115                PipelineData::value(value.with_span(span), metadata)
116            }
117            PipelineData::ListStream(stream, metadata) => {
118                PipelineData::list_stream(stream.with_span(span), metadata)
119            }
120            PipelineData::ByteStream(stream, metadata) => {
121                PipelineData::byte_stream(stream.with_span(span), metadata)
122            }
123        }
124    }
125
126    /// Get a type that is representative of the `PipelineData`.
127    ///
128    /// The type returned here makes no effort to collect a stream, so it may be a different type
129    /// than would be returned by [`Value::get_type()`] on the result of
130    /// [`.into_value()`](Self::into_value).
131    ///
132    /// Specifically, a `ListStream` results in `list<any>` rather than
133    /// the fully complete [`list`](Type::List) type (which would require knowing the contents),
134    /// and a `ByteStream` with [unknown](crate::ByteStreamType::Unknown) type results in
135    /// [`any`](Type::Any) rather than [`string`](Type::String) or [`binary`](Type::Binary).
136    pub fn get_type(&self) -> Type {
137        match self {
138            PipelineData::Empty => Type::Nothing,
139            PipelineData::Value(value, _) => value.get_type(),
140            PipelineData::ListStream(_, _) => Type::list(Type::Any),
141            PipelineData::ByteStream(stream, _) => stream.type_().into(),
142        }
143    }
144
145    /// Determine if the `PipelineData` is a [subtype](https://en.wikipedia.org/wiki/Subtyping) of `other`.
146    ///
147    /// This check makes no effort to collect a stream, so it may be a different result
148    /// than would be returned by calling [`Value::is_subtype_of()`] on the result of
149    /// [`.into_value()`](Self::into_value).
150    ///
151    /// A `ListStream` acts the same as an empty list type: it is a subtype of any [`list`](Type::List)
152    /// or [`table`](Type::Table) type. After converting to a value, it may become a more specific type.
153    /// For example, a `ListStream` is a subtype of `list<int>` and `list<string>`.
154    /// If calling [`.into_value()`](Self::into_value) results in a `list<int>`,
155    /// then the value would not be a subtype of `list<string>`, in contrast to the original `ListStream`.
156    ///
157    /// A `ByteStream` is a subtype of [`string`](Type::String) if it is coercible into a string.
158    /// Likewise, a `ByteStream` is a subtype of [`binary`](Type::Binary) if it is coercible into a binary value.
159    pub fn is_subtype_of(&self, other: &Type) -> bool {
160        match (self, other) {
161            (_, Type::Any) => true,
162            (data, Type::OneOf(oneof)) => oneof.iter().any(|t| data.is_subtype_of(t)),
163            (PipelineData::Empty, Type::Nothing) => true,
164            (PipelineData::Value(val, ..), ty) => val.is_subtype_of(ty),
165
166            // a list stream could be a list with any type, including a table
167            (PipelineData::ListStream(..), Type::List(..) | Type::Table(..)) => true,
168
169            (PipelineData::ByteStream(stream, ..), Type::String)
170                if stream.type_().is_string_coercible() =>
171            {
172                true
173            }
174            (PipelineData::ByteStream(stream, ..), Type::Binary)
175                if stream.type_().is_binary_coercible() =>
176            {
177                true
178            }
179
180            (PipelineData::Empty, _) => false,
181            (PipelineData::ListStream(..), _) => false,
182            (PipelineData::ByteStream(..), _) => false,
183        }
184    }
185
186    pub fn into_value(self, span: Span) -> Result<Value, ShellError> {
187        match self {
188            PipelineData::Empty => Ok(Value::nothing(span)),
189            PipelineData::Value(value, ..) => {
190                if value.span() == Span::unknown() {
191                    Ok(value.with_span(span))
192                } else {
193                    Ok(value)
194                }
195            }
196            PipelineData::ListStream(stream, ..) => stream.into_value(),
197            PipelineData::ByteStream(stream, ..) => stream.into_value(),
198        }
199    }
200
201    /// Converts any `Value` variant that can be represented as a stream into its stream variant.
202    ///
203    /// This means that lists and ranges are converted into list streams, and strings and binary are
204    /// converted into byte streams.
205    ///
206    /// Returns an `Err` with the original stream if the variant couldn't be converted to a stream
207    /// variant. If the variant is already a stream variant, it is returned as-is.
208    pub fn try_into_stream(self, engine_state: &EngineState) -> Result<PipelineData, PipelineData> {
209        let span = self.span().unwrap_or(Span::unknown());
210        match self {
211            PipelineData::ListStream(..) | PipelineData::ByteStream(..) => Ok(self),
212            PipelineData::Value(Value::List { .. } | Value::Range { .. }, ref metadata) => {
213                let metadata = metadata.clone();
214                Ok(PipelineData::list_stream(
215                    ListStream::new(self.into_iter(), span, engine_state.signals().clone()),
216                    metadata,
217                ))
218            }
219            PipelineData::Value(Value::String { val, .. }, metadata) => {
220                Ok(PipelineData::byte_stream(
221                    ByteStream::read_string(val, span, engine_state.signals().clone()),
222                    metadata,
223                ))
224            }
225            PipelineData::Value(Value::Binary { val, .. }, metadata) => {
226                Ok(PipelineData::byte_stream(
227                    ByteStream::read_binary(val, span, engine_state.signals().clone()),
228                    metadata,
229                ))
230            }
231            _ => Err(self),
232        }
233    }
234
235    /// Drain and write this [`PipelineData`] to `dest`.
236    ///
237    /// Values are converted to bytes and separated by newlines if this is a `ListStream`.
238    pub fn write_to(self, mut dest: impl Write) -> Result<(), ShellError> {
239        match self {
240            PipelineData::Empty => Ok(()),
241            PipelineData::Value(value, ..) => {
242                let bytes = value_to_bytes(value)?;
243                dest.write_all(&bytes).map_err(|err| {
244                    IoError::new_internal(
245                        err,
246                        "Could not write PipelineData to dest",
247                        crate::location!(),
248                    )
249                })?;
250                dest.flush().map_err(|err| {
251                    IoError::new_internal(
252                        err,
253                        "Could not flush PipelineData to dest",
254                        crate::location!(),
255                    )
256                })?;
257                Ok(())
258            }
259            PipelineData::ListStream(stream, ..) => {
260                for value in stream {
261                    let bytes = value_to_bytes(value)?;
262                    dest.write_all(&bytes).map_err(|err| {
263                        IoError::new_internal(
264                            err,
265                            "Could not write PipelineData to dest",
266                            crate::location!(),
267                        )
268                    })?;
269                    dest.write_all(b"\n").map_err(|err| {
270                        IoError::new_internal(
271                            err,
272                            "Could not write linebreak after PipelineData to dest",
273                            crate::location!(),
274                        )
275                    })?;
276                }
277                dest.flush().map_err(|err| {
278                    IoError::new_internal(
279                        err,
280                        "Could not flush PipelineData to dest",
281                        crate::location!(),
282                    )
283                })?;
284                Ok(())
285            }
286            PipelineData::ByteStream(stream, ..) => stream.write_to(dest),
287        }
288    }
289
290    /// Drain this [`PipelineData`] according to the current stdout [`OutDest`]s in `stack`.
291    ///
292    /// For [`OutDest::Pipe`] and [`OutDest::PipeSeparate`], this will return the [`PipelineData`]
293    /// as is. For [`OutDest::Value`], this will collect into a value and return it. For
294    /// [`OutDest::Print`], the [`PipelineData`] is drained and printed. Otherwise, the
295    /// [`PipelineData`] is drained, but only printed if it is the output of an external command.
296    pub fn drain_to_out_dests(
297        self,
298        engine_state: &EngineState,
299        stack: &mut Stack,
300    ) -> Result<Self, ShellError> {
301        match stack.pipe_stdout().unwrap_or(&OutDest::Inherit) {
302            OutDest::Print => {
303                self.print_table(engine_state, stack, false, false)?;
304                Ok(Self::Empty)
305            }
306            OutDest::Pipe | OutDest::PipeSeparate => Ok(self),
307            OutDest::Value => {
308                let metadata = self.metadata();
309                let span = self.span().unwrap_or(Span::unknown());
310                self.into_value(span).map(|val| Self::Value(val, metadata))
311            }
312            OutDest::File(file) => {
313                self.write_to(file.as_ref())?;
314                Ok(Self::Empty)
315            }
316            OutDest::Null | OutDest::Inherit => {
317                self.drain()?;
318                Ok(Self::Empty)
319            }
320        }
321    }
322
323    pub fn drain(self) -> Result<(), ShellError> {
324        match self {
325            Self::Empty => Ok(()),
326            Self::Value(Value::Error { error, .. }, ..) => Err(*error),
327            Self::Value(..) => Ok(()),
328            Self::ListStream(stream, ..) => stream.drain(),
329            Self::ByteStream(stream, ..) => stream.drain(),
330        }
331    }
332
333    /// Try convert from self into iterator
334    ///
335    /// It returns Err if the `self` cannot be converted to an iterator.
336    ///
337    /// The `span` should be the span of the command or operation that would raise an error.
338    pub fn into_iter_strict(self, span: Span) -> Result<PipelineIterator, ShellError> {
339        Ok(PipelineIterator(match self {
340            PipelineData::Value(value, ..) => {
341                let val_span = value.span();
342                match value {
343                    Value::List { vals, .. } => PipelineIteratorInner::ListStream(
344                        ListStream::new(vals.into_iter(), val_span, Signals::empty()).into_iter(),
345                    ),
346                    Value::Binary { val, .. } => PipelineIteratorInner::ListStream(
347                        ListStream::new(
348                            val.into_iter().map(move |x| Value::int(x as i64, val_span)),
349                            val_span,
350                            Signals::empty(),
351                        )
352                        .into_iter(),
353                    ),
354                    Value::Range { val, .. } => PipelineIteratorInner::ListStream(
355                        ListStream::new(
356                            val.into_range_iter(val_span, Signals::empty()),
357                            val_span,
358                            Signals::empty(),
359                        )
360                        .into_iter(),
361                    ),
362                    // Propagate errors by explicitly matching them before the final case.
363                    Value::Error { error, .. } => return Err(*error),
364                    other => {
365                        return Err(ShellError::OnlySupportsThisInputType {
366                            exp_input_type: "list, binary, range, or byte stream".into(),
367                            wrong_type: other.get_type().to_string(),
368                            dst_span: span,
369                            src_span: val_span,
370                        });
371                    }
372                }
373            }
374            PipelineData::ListStream(stream, ..) => {
375                PipelineIteratorInner::ListStream(stream.into_iter())
376            }
377            PipelineData::Empty => {
378                return Err(ShellError::OnlySupportsThisInputType {
379                    exp_input_type: "list, binary, range, or byte stream".into(),
380                    wrong_type: "null".into(),
381                    dst_span: span,
382                    src_span: span,
383                });
384            }
385            PipelineData::ByteStream(stream, ..) => {
386                if let Some(chunks) = stream.chunks() {
387                    PipelineIteratorInner::ByteStream(chunks)
388                } else {
389                    PipelineIteratorInner::Empty
390                }
391            }
392        }))
393    }
394
395    pub fn collect_string(self, separator: &str, config: &Config) -> Result<String, ShellError> {
396        match self {
397            PipelineData::Empty => Ok(String::new()),
398            PipelineData::Value(value, ..) => Ok(value.to_expanded_string(separator, config)),
399            PipelineData::ListStream(stream, ..) => Ok(stream.into_string(separator, config)),
400            PipelineData::ByteStream(stream, ..) => stream.into_string(),
401        }
402    }
403
404    /// Retrieves string from pipeline data.
405    ///
406    /// As opposed to `collect_string` this raises error rather than converting non-string values.
407    /// The `span` will be used if `ListStream` is encountered since it doesn't carry a span.
408    pub fn collect_string_strict(
409        self,
410        span: Span,
411    ) -> Result<(String, Span, Option<PipelineMetadata>), ShellError> {
412        match self {
413            PipelineData::Empty => Ok((String::new(), span, None)),
414            PipelineData::Value(Value::String { val, .. }, metadata) => Ok((val, span, metadata)),
415            PipelineData::Value(val, ..) => Err(ShellError::TypeMismatch {
416                err_message: "string".into(),
417                span: val.span(),
418            }),
419            PipelineData::ListStream(..) => Err(ShellError::TypeMismatch {
420                err_message: "string".into(),
421                span,
422            }),
423            PipelineData::ByteStream(stream, metadata) => {
424                let span = stream.span();
425                Ok((stream.into_string()?, span, metadata))
426            }
427        }
428    }
429
430    pub fn follow_cell_path(
431        self,
432        cell_path: &[PathMember],
433        head: Span,
434    ) -> Result<Value, ShellError> {
435        match self {
436            // FIXME: there are probably better ways of doing this
437            PipelineData::ListStream(stream, ..) => Value::list(stream.into_iter().collect(), head)
438                .follow_cell_path(cell_path)
439                .map(Cow::into_owned),
440            PipelineData::Value(v, ..) => v.follow_cell_path(cell_path).map(Cow::into_owned),
441            PipelineData::Empty => Err(ShellError::IncompatiblePathAccess {
442                type_name: "empty pipeline".to_string(),
443                span: head,
444            }),
445            PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess {
446                type_name: stream.type_().describe().to_owned(),
447                span: stream.span(),
448            }),
449        }
450    }
451
452    /// Simplified mapper to help with simple values also. For full iterator support use `.into_iter()` instead
453    pub fn map<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
454    where
455        Self: Sized,
456        F: FnMut(Value) -> Value + 'static + Send,
457    {
458        match self {
459            PipelineData::Value(value, metadata) => {
460                let span = value.span();
461                let pipeline = match value {
462                    Value::List { vals, .. } => vals
463                        .into_iter()
464                        .map(f)
465                        .into_pipeline_data(span, signals.clone()),
466                    Value::Range { val, .. } => val
467                        .into_range_iter(span, Signals::empty())
468                        .map(f)
469                        .into_pipeline_data(span, signals.clone()),
470                    value => match f(value) {
471                        Value::Error { error, .. } => return Err(*error),
472                        v => v.into_pipeline_data(),
473                    },
474                };
475                Ok(pipeline.set_metadata(metadata))
476            }
477            PipelineData::Empty => Ok(PipelineData::empty()),
478            PipelineData::ListStream(stream, metadata) => {
479                Ok(PipelineData::list_stream(stream.map(f), metadata))
480            }
481            PipelineData::ByteStream(stream, metadata) => {
482                Ok(f(stream.into_value()?).into_pipeline_data_with_metadata(metadata))
483            }
484        }
485    }
486
487    /// Simplified flatmapper. For full iterator support use `.into_iter()` instead
488    pub fn flat_map<U, F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
489    where
490        Self: Sized,
491        U: IntoIterator<Item = Value> + 'static,
492        <U as IntoIterator>::IntoIter: 'static + Send,
493        F: FnMut(Value) -> U + 'static + Send,
494    {
495        match self {
496            PipelineData::Empty => Ok(PipelineData::empty()),
497            PipelineData::Value(value, metadata) => {
498                let span = value.span();
499                let pipeline = match value {
500                    Value::List { vals, .. } => vals
501                        .into_iter()
502                        .flat_map(f)
503                        .into_pipeline_data(span, signals.clone()),
504                    Value::Range { val, .. } => val
505                        .into_range_iter(span, Signals::empty())
506                        .flat_map(f)
507                        .into_pipeline_data(span, signals.clone()),
508                    value => f(value)
509                        .into_iter()
510                        .into_pipeline_data(span, signals.clone()),
511                };
512                Ok(pipeline.set_metadata(metadata))
513            }
514            PipelineData::ListStream(stream, metadata) => Ok(PipelineData::list_stream(
515                stream.modify(|iter| iter.flat_map(f)),
516                metadata,
517            )),
518            PipelineData::ByteStream(stream, metadata) => {
519                // TODO: is this behavior desired / correct ?
520                let span = stream.span();
521                let iter = match String::from_utf8(stream.into_bytes()?) {
522                    Ok(mut str) => {
523                        str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
524                        f(Value::string(str, span))
525                    }
526                    Err(err) => f(Value::binary(err.into_bytes(), span)),
527                };
528                Ok(iter.into_iter().into_pipeline_data_with_metadata(
529                    span,
530                    signals.clone(),
531                    metadata,
532                ))
533            }
534        }
535    }
536
537    pub fn filter<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
538    where
539        Self: Sized,
540        F: FnMut(&Value) -> bool + 'static + Send,
541    {
542        match self {
543            PipelineData::Empty => Ok(PipelineData::empty()),
544            PipelineData::Value(value, metadata) => {
545                let span = value.span();
546                let pipeline = match value {
547                    Value::List { vals, .. } => vals
548                        .into_iter()
549                        .filter(f)
550                        .into_pipeline_data(span, signals.clone()),
551                    Value::Range { val, .. } => val
552                        .into_range_iter(span, Signals::empty())
553                        .filter(f)
554                        .into_pipeline_data(span, signals.clone()),
555                    value => {
556                        if f(&value) {
557                            value.into_pipeline_data()
558                        } else {
559                            Value::nothing(span).into_pipeline_data()
560                        }
561                    }
562                };
563                Ok(pipeline.set_metadata(metadata))
564            }
565            PipelineData::ListStream(stream, metadata) => Ok(PipelineData::list_stream(
566                stream.modify(|iter| iter.filter(f)),
567                metadata,
568            )),
569            PipelineData::ByteStream(stream, metadata) => {
570                // TODO: is this behavior desired / correct ?
571                let span = stream.span();
572                let value = match String::from_utf8(stream.into_bytes()?) {
573                    Ok(mut str) => {
574                        str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
575                        Value::string(str, span)
576                    }
577                    Err(err) => Value::binary(err.into_bytes(), span),
578                };
579                let value = if f(&value) {
580                    value
581                } else {
582                    Value::nothing(span)
583                };
584                Ok(value.into_pipeline_data_with_metadata(metadata))
585            }
586        }
587    }
588
589    /// Try to convert Value from Value::Range to Value::List.
590    /// This is useful to expand Value::Range into array notation, specifically when
591    /// converting `to json` or `to nuon`.
592    /// `1..3 | to XX -> [1,2,3]`
593    pub fn try_expand_range(self) -> Result<PipelineData, ShellError> {
594        match self {
595            PipelineData::Value(v, metadata) => {
596                let span = v.span();
597                match v {
598                    Value::Range { val, .. } => {
599                        match *val {
600                            Range::IntRange(range) => {
601                                if range.is_unbounded() {
602                                    return Err(ShellError::GenericError {
603                                        error: "Cannot create range".into(),
604                                        msg: "Unbounded ranges are not allowed when converting to this format".into(),
605                                        span: Some(span),
606                                        help: Some("Consider using ranges with valid start and end point.".into()),
607                                        inner: vec![],
608                                    });
609                                }
610                            }
611                            Range::FloatRange(range) => {
612                                if range.is_unbounded() {
613                                    return Err(ShellError::GenericError {
614                                        error: "Cannot create range".into(),
615                                        msg: "Unbounded ranges are not allowed when converting to this format".into(),
616                                        span: Some(span),
617                                        help: Some("Consider using ranges with valid start and end point.".into()),
618                                        inner: vec![],
619                                    });
620                                }
621                            }
622                        }
623                        let range_values: Vec<Value> =
624                            val.into_range_iter(span, Signals::empty()).collect();
625                        Ok(PipelineData::value(Value::list(range_values, span), None))
626                    }
627                    x => Ok(PipelineData::value(x, metadata)),
628                }
629            }
630            _ => Ok(self),
631        }
632    }
633
634    /// Consume and print self data immediately, formatted using table command.
635    ///
636    /// This does not respect the display_output hook. If a value is being printed out by a command,
637    /// this function should be used. Otherwise, `nu_cli::util::print_pipeline` should be preferred.
638    ///
639    /// `no_newline` controls if we need to attach newline character to output.
640    /// `to_stderr` controls if data is output to stderr, when the value is false, the data is output to stdout.
641    pub fn print_table(
642        self,
643        engine_state: &EngineState,
644        stack: &mut Stack,
645        no_newline: bool,
646        to_stderr: bool,
647    ) -> Result<(), ShellError> {
648        match self {
649            // Print byte streams directly as long as they aren't binary.
650            PipelineData::ByteStream(stream, ..) if stream.type_() != ByteStreamType::Binary => {
651                stream.print(to_stderr)
652            }
653            _ => {
654                // If the table function is in the declarations, then we can use it
655                // to create the table value that will be printed in the terminal
656                if let Some(decl_id) = engine_state.table_decl_id {
657                    let command = engine_state.get_decl(decl_id);
658                    if command.block_id().is_some() {
659                        self.write_all_and_flush(engine_state, no_newline, to_stderr)
660                    } else {
661                        let call = Call::new(Span::new(0, 0));
662                        let table = command.run(engine_state, stack, &(&call).into(), self)?;
663                        table.write_all_and_flush(engine_state, no_newline, to_stderr)
664                    }
665                } else {
666                    self.write_all_and_flush(engine_state, no_newline, to_stderr)
667                }
668            }
669        }
670    }
671
672    /// Consume and print self data without any extra formatting.
673    ///
674    /// This does not use the `table` command to format data, and also prints binary values and
675    /// streams in their raw format without generating a hexdump first.
676    ///
677    /// `no_newline` controls if we need to attach newline character to output.
678    /// `to_stderr` controls if data is output to stderr, when the value is false, the data is output to stdout.
679    pub fn print_raw(
680        self,
681        engine_state: &EngineState,
682        no_newline: bool,
683        to_stderr: bool,
684    ) -> Result<(), ShellError> {
685        let span = self.span();
686        if let PipelineData::Value(Value::Binary { val: bytes, .. }, _) = self {
687            if to_stderr {
688                write_all_and_flush(
689                    bytes,
690                    &mut std::io::stderr().lock(),
691                    "stderr",
692                    span,
693                    engine_state.signals(),
694                )?;
695            } else {
696                write_all_and_flush(
697                    bytes,
698                    &mut std::io::stdout().lock(),
699                    "stdout",
700                    span,
701                    engine_state.signals(),
702                )?;
703            }
704            Ok(())
705        } else {
706            self.write_all_and_flush(engine_state, no_newline, to_stderr)
707        }
708    }
709
710    fn write_all_and_flush(
711        self,
712        engine_state: &EngineState,
713        no_newline: bool,
714        to_stderr: bool,
715    ) -> Result<(), ShellError> {
716        let span = self.span();
717        if let PipelineData::ByteStream(stream, ..) = self {
718            // Copy ByteStreams directly
719            stream.print(to_stderr)
720        } else {
721            let config = engine_state.get_config();
722            for item in self {
723                let mut out = if let Value::Error { error, .. } = item {
724                    return Err(*error);
725                } else {
726                    item.to_expanded_string("\n", config)
727                };
728
729                if !no_newline {
730                    out.push('\n');
731                }
732
733                if to_stderr {
734                    write_all_and_flush(
735                        out,
736                        &mut std::io::stderr().lock(),
737                        "stderr",
738                        span,
739                        engine_state.signals(),
740                    )?;
741                } else {
742                    write_all_and_flush(
743                        out,
744                        &mut std::io::stdout().lock(),
745                        "stdout",
746                        span,
747                        engine_state.signals(),
748                    )?;
749                }
750            }
751
752            Ok(())
753        }
754    }
755
756    pub fn unsupported_input_error(
757        self,
758        expected_type: impl Into<String>,
759        span: Span,
760    ) -> ShellError {
761        match self {
762            PipelineData::Empty => ShellError::PipelineEmpty { dst_span: span },
763            PipelineData::Value(value, ..) => ShellError::OnlySupportsThisInputType {
764                exp_input_type: expected_type.into(),
765                wrong_type: value.get_type().get_non_specified_string(),
766                dst_span: span,
767                src_span: value.span(),
768            },
769            PipelineData::ListStream(stream, ..) => ShellError::OnlySupportsThisInputType {
770                exp_input_type: expected_type.into(),
771                wrong_type: "list (stream)".into(),
772                dst_span: span,
773                src_span: stream.span(),
774            },
775            PipelineData::ByteStream(stream, ..) => ShellError::OnlySupportsThisInputType {
776                exp_input_type: expected_type.into(),
777                wrong_type: stream.type_().describe().into(),
778                dst_span: span,
779                src_span: stream.span(),
780            },
781        }
782    }
783
784    // PipelineData might connect to a running process which has an exit status future
785    // Use this method to retrieve that future, it's useful for implementing `pipefail` feature.
786    #[cfg(feature = "os")]
787    pub fn clone_exit_status_future(&self) -> Option<Arc<Mutex<ExitStatusFuture>>> {
788        match self {
789            PipelineData::Empty | PipelineData::Value(..) | PipelineData::ListStream(..) => None,
790            PipelineData::ByteStream(stream, ..) => match stream.source() {
791                ByteStreamSource::Read(..) | ByteStreamSource::File(..) => None,
792                ByteStreamSource::Child(c) => Some(c.clone_exit_status_future()),
793            },
794        }
795    }
796}
797
798pub fn write_all_and_flush<T>(
799    data: T,
800    destination: &mut impl Write,
801    destination_name: &str,
802    span: Option<Span>,
803    signals: &Signals,
804) -> Result<(), ShellError>
805where
806    T: AsRef<[u8]>,
807{
808    let io_error_map = |err: std::io::Error, location: Location| {
809        let context = format!("Writing to {destination_name} failed");
810        match span {
811            None => IoError::new_internal(err, context, location),
812            Some(span) if span == Span::unknown() => IoError::new_internal(err, context, location),
813            Some(span) => IoError::new_with_additional_context(err, span, None, context),
814        }
815    };
816
817    let span = span.unwrap_or(Span::unknown());
818    const OUTPUT_CHUNK_SIZE: usize = 8192;
819    for chunk in data.as_ref().chunks(OUTPUT_CHUNK_SIZE) {
820        signals.check(&span)?;
821        destination
822            .write_all(chunk)
823            .map_err(|err| io_error_map(err, location!()))?;
824    }
825    destination
826        .flush()
827        .map_err(|err| io_error_map(err, location!()))?;
828    Ok(())
829}
830
/// Internal state of a [`PipelineIterator`], one variant per kind of source being iterated.
enum PipelineIteratorInner {
    /// Nothing (left) to yield; also used when a byte stream provides no chunks.
    Empty,
    /// A single value, yielded once. A `Nothing` value yields no items at all.
    Value(Value),
    /// Elements of a list stream. `Value::List` and `Value::Range` inputs are also iterated
    /// through this variant.
    ListStream(crate::list_stream::IntoIter),
    /// Chunks of a byte stream, each yielded as a `Value`. Read errors become error values.
    ByteStream(crate::byte_stream::Chunks),
}

/// Iterator over the values of a [`PipelineData`], created by its `IntoIterator` impl.
pub struct PipelineIterator(PipelineIteratorInner);
839
840impl IntoIterator for PipelineData {
841    type Item = Value;
842
843    type IntoIter = PipelineIterator;
844
845    fn into_iter(self) -> Self::IntoIter {
846        PipelineIterator(match self {
847            PipelineData::Empty => PipelineIteratorInner::Empty,
848            PipelineData::Value(value, ..) => {
849                let span = value.span();
850                match value {
851                    Value::List { vals, .. } => PipelineIteratorInner::ListStream(
852                        ListStream::new(vals.into_iter(), span, Signals::empty()).into_iter(),
853                    ),
854                    Value::Range { val, .. } => PipelineIteratorInner::ListStream(
855                        ListStream::new(
856                            val.into_range_iter(span, Signals::empty()),
857                            span,
858                            Signals::empty(),
859                        )
860                        .into_iter(),
861                    ),
862                    x => PipelineIteratorInner::Value(x),
863                }
864            }
865            PipelineData::ListStream(stream, ..) => {
866                PipelineIteratorInner::ListStream(stream.into_iter())
867            }
868            PipelineData::ByteStream(stream, ..) => stream.chunks().map_or(
869                PipelineIteratorInner::Empty,
870                PipelineIteratorInner::ByteStream,
871            ),
872        })
873    }
874}
875
876impl Iterator for PipelineIterator {
877    type Item = Value;
878
879    fn next(&mut self) -> Option<Self::Item> {
880        match &mut self.0 {
881            PipelineIteratorInner::Empty => None,
882            PipelineIteratorInner::Value(Value::Nothing { .. }, ..) => None,
883            PipelineIteratorInner::Value(v, ..) => Some(std::mem::take(v)),
884            PipelineIteratorInner::ListStream(stream, ..) => stream.next(),
885            PipelineIteratorInner::ByteStream(stream) => stream.next().map(|x| match x {
886                Ok(x) => x,
887                Err(err) => Value::error(
888                    err,
889                    Span::unknown(), //FIXME: unclear where this span should come from
890                ),
891            }),
892        }
893    }
894}
895
896pub trait IntoPipelineData {
897    fn into_pipeline_data(self) -> PipelineData;
898
899    fn into_pipeline_data_with_metadata(
900        self,
901        metadata: impl Into<Option<PipelineMetadata>>,
902    ) -> PipelineData;
903}
904
905impl<V> IntoPipelineData for V
906where
907    V: Into<Value>,
908{
909    fn into_pipeline_data(self) -> PipelineData {
910        PipelineData::value(self.into(), None)
911    }
912
913    fn into_pipeline_data_with_metadata(
914        self,
915        metadata: impl Into<Option<PipelineMetadata>>,
916    ) -> PipelineData {
917        PipelineData::value(self.into(), metadata.into())
918    }
919}
920
921pub trait IntoInterruptiblePipelineData {
922    fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData;
923    fn into_pipeline_data_with_metadata(
924        self,
925        span: Span,
926        signals: Signals,
927        metadata: impl Into<Option<PipelineMetadata>>,
928    ) -> PipelineData;
929}
930
931impl<I> IntoInterruptiblePipelineData for I
932where
933    I: IntoIterator + Send + 'static,
934    I::IntoIter: Send + 'static,
935    <I::IntoIter as Iterator>::Item: Into<Value>,
936{
937    fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData {
938        ListStream::new(self.into_iter().map(Into::into), span, signals).into()
939    }
940
941    fn into_pipeline_data_with_metadata(
942        self,
943        span: Span,
944        signals: Signals,
945        metadata: impl Into<Option<PipelineMetadata>>,
946    ) -> PipelineData {
947        PipelineData::list_stream(
948            ListStream::new(self.into_iter().map(Into::into), span, signals),
949            metadata.into(),
950        )
951    }
952}
953
954fn value_to_bytes(value: Value) -> Result<Vec<u8>, ShellError> {
955    let bytes = match value {
956        Value::String { val, .. } => val.into_bytes(),
957        Value::Binary { val, .. } => val,
958        Value::List { vals, .. } => {
959            let val = vals
960                .into_iter()
961                .map(Value::coerce_into_string)
962                .collect::<Result<Vec<String>, ShellError>>()?
963                .join("\n")
964                + "\n";
965
966            val.into_bytes()
967        }
968        // Propagate errors by explicitly matching them before the final case.
969        Value::Error { error, .. } => return Err(*error),
970        value => value.coerce_into_string()?.into_bytes(),
971    };
972    Ok(bytes)
973}
974
/// A wrapper around [`PipelineData`] that can also track exit statuses.
///
/// We use exit status tracking to implement the `pipefail` feature.
pub struct PipelineExecutionData {
    /// The wrapped pipeline data.
    pub body: PipelineData,
    /// Tracked exit-status futures, each paired with a span. An entry is `None` when the
    /// corresponding data had no child process exit status to wait on.
    #[cfg(feature = "os")]
    pub exit: Vec<Option<(Arc<Mutex<ExitStatusFuture>>, Span)>>,
}
983
/// Lets a [`PipelineExecutionData`] be used where a `&PipelineData` is expected by borrowing
/// its `body`.
impl Deref for PipelineExecutionData {
    type Target = PipelineData;

    fn deref(&self) -> &Self::Target {
        &self.body
    }
}
991
992impl PipelineExecutionData {
993    pub fn empty() -> Self {
994        Self {
995            body: PipelineData::empty(),
996            #[cfg(feature = "os")]
997            exit: vec![],
998        }
999    }
1000}
1001
1002impl From<PipelineData> for PipelineExecutionData {
1003    #[cfg(feature = "os")]
1004    fn from(value: PipelineData) -> Self {
1005        let value_span = value.span().unwrap_or_else(Span::unknown);
1006        let exit_status_future = value.clone_exit_status_future().map(|f| (f, value_span));
1007        Self {
1008            body: value,
1009            exit: vec![exit_status_future],
1010        }
1011    }
1012
1013    #[cfg(not(feature = "os"))]
1014    fn from(value: PipelineData) -> Self {
1015        Self { body: value }
1016    }
1017}