Skip to main content

nu_protocol/pipeline/
pipeline_data.rs

1#[cfg(feature = "os")]
2use crate::process::ExitStatusGuard;
3use crate::{
4    ByteStream, ByteStreamSource, ByteStreamType, Config, ListStream, OutDest, PipelineMetadata,
5    Range, ShellError, Signals, Span, Type, Value,
6    ast::{Call, PathMember},
7    engine::{EngineState, Stack},
8    location,
9    shell_error::{io::IoError, location::Location},
10};
11use std::{borrow::Cow, io::Write, ops::Deref};
12
/// Characters treated as line endings.
///
/// Used as a `trim_end_matches` pattern to strip trailing `\r` / `\n` characters from byte
/// streams that were decoded as UTF-8 strings.
const LINE_ENDING_PATTERN: &[char] = &['\r', '\n'];
14
/// The foundational abstraction for input and output to commands
///
/// This represents either a single Value or a stream of values coming into the command or leaving a command.
///
/// A note on implementation:
///
/// We've tried a few variations of this structure. Listing these below so we have a record.
///
/// * We tried always assuming a stream in Nushell. This was a great 80% solution, but it had some rough edges.
///   Namely, how do you know the difference between a single string and a list of one string? How do you know
///   when to flatten the data given to you from a data source into the stream or to keep it as an unflattened
///   list?
///
/// * We tried putting the stream into Value. This had some interesting properties as now commands "just worked
///   on values", but led to a few unfortunate issues.
///
/// The first is that you can't easily clone Values in a way that felt largely immutable. For example, if
/// you cloned a Value which contained a stream, and in one variable drained some part of it, then the second
/// variable would see different values based on what you did to the first.
///
/// To make this kind of mutation thread-safe, we would have had to produce a lock for the stream, which in
/// practice would have meant always locking the stream before reading from it. But more fundamentally, it
/// felt wrong in practice that observation of a value at runtime could affect other values which happen to
/// alias the same stream. By separating these, we don't have this effect. Instead, variables could get
/// concrete list values rather than streams, and be able to view them without non-local effects.
///
/// * A balance of the two approaches is what we've landed on: Values are thread-safe to pass, and we can stream
///   them into any sources. Streams are still available to model the infinite streams approach of original
///   Nushell.
#[derive(Debug)]
pub enum PipelineData {
    /// No data at all; it has no span and no metadata.
    Empty,
    /// A single [`Value`], with optional metadata.
    Value(Value, Option<PipelineMetadata>),
    /// A [`ListStream`] (a stream of values), with optional metadata.
    ListStream(ListStream, Option<PipelineMetadata>),
    /// A [`ByteStream`], with optional metadata.
    ByteStream(ByteStream, Option<PipelineMetadata>),
}
51
52impl PipelineData {
53    pub const fn empty() -> PipelineData {
54        PipelineData::Empty
55    }
56
57    pub fn value(val: Value, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
58        PipelineData::Value(val, metadata.into())
59    }
60
61    pub fn list_stream(stream: ListStream, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
62        PipelineData::ListStream(stream, metadata.into())
63    }
64
65    pub fn byte_stream(stream: ByteStream, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
66        PipelineData::ByteStream(stream, metadata.into())
67    }
68
69    pub fn metadata(&self) -> Option<PipelineMetadata> {
70        match self {
71            PipelineData::Empty => None,
72            PipelineData::Value(_, meta)
73            | PipelineData::ListStream(_, meta)
74            | PipelineData::ByteStream(_, meta) => meta.clone(),
75        }
76    }
77
78    pub fn set_metadata(mut self, metadata: Option<PipelineMetadata>) -> Self {
79        match &mut self {
80            PipelineData::Empty => {}
81            PipelineData::Value(_, meta)
82            | PipelineData::ListStream(_, meta)
83            | PipelineData::ByteStream(_, meta) => *meta = metadata,
84        }
85        self
86    }
87
88    pub fn is_nothing(&self) -> bool {
89        matches!(self, PipelineData::Value(Value::Nothing { .. }, ..))
90            || matches!(self, PipelineData::Empty)
91    }
92
93    /// PipelineData doesn't always have a Span, but we can try!
94    pub fn span(&self) -> Option<Span> {
95        match self {
96            PipelineData::Empty => None,
97            PipelineData::Value(value, ..) => Some(value.span()),
98            PipelineData::ListStream(stream, ..) => Some(stream.span()),
99            PipelineData::ByteStream(stream, ..) => Some(stream.span()),
100        }
101    }
102
103    /// Change the span of the [`PipelineData`].
104    ///
105    /// Returns `Value(Nothing)` with the given span if it was [`PipelineData::empty()`].
106    pub fn with_span(self, span: Span) -> Self {
107        match self {
108            PipelineData::Empty => PipelineData::value(Value::nothing(span), None),
109            PipelineData::Value(value, metadata) => {
110                PipelineData::value(value.with_span(span), metadata)
111            }
112            PipelineData::ListStream(stream, metadata) => {
113                PipelineData::list_stream(stream.with_span(span), metadata)
114            }
115            PipelineData::ByteStream(stream, metadata) => {
116                PipelineData::byte_stream(stream.with_span(span), metadata)
117            }
118        }
119    }
120
121    /// Get a type that is representative of the `PipelineData`.
122    ///
123    /// The type returned here makes no effort to collect a stream, so it may be a different type
124    /// than would be returned by [`Value::get_type()`] on the result of
125    /// [`.into_value()`](Self::into_value).
126    ///
127    /// Specifically, a `ListStream` results in `list<any>` rather than
128    /// the fully complete [`list`](Type::List) type (which would require knowing the contents),
129    /// and a `ByteStream` with [unknown](crate::ByteStreamType::Unknown) type results in
130    /// [`any`](Type::Any) rather than [`string`](Type::String) or [`binary`](Type::Binary).
131    pub fn get_type(&self) -> Type {
132        match self {
133            PipelineData::Empty => Type::Nothing,
134            PipelineData::Value(value, _) => value.get_type(),
135            PipelineData::ListStream(_, _) => Type::list(Type::Any),
136            PipelineData::ByteStream(stream, _) => stream.type_().into(),
137        }
138    }
139
140    /// Determine if the `PipelineData` is a [subtype](https://en.wikipedia.org/wiki/Subtyping) of `other`.
141    ///
142    /// This check makes no effort to collect a stream, so it may be a different result
143    /// than would be returned by calling [`Value::is_subtype_of()`] on the result of
144    /// [`.into_value()`](Self::into_value).
145    ///
146    /// A `ListStream` acts the same as an empty list type: it is a subtype of any [`list`](Type::List)
147    /// or [`table`](Type::Table) type. After converting to a value, it may become a more specific type.
148    /// For example, a `ListStream` is a subtype of `list<int>` and `list<string>`.
149    /// If calling [`.into_value()`](Self::into_value) results in a `list<int>`,
150    /// then the value would not be a subtype of `list<string>`, in contrast to the original `ListStream`.
151    ///
152    /// A `ByteStream` is a subtype of [`string`](Type::String) if it is coercible into a string.
153    /// Likewise, a `ByteStream` is a subtype of [`binary`](Type::Binary) if it is coercible into a binary value.
154    pub fn is_subtype_of(&self, other: &Type) -> bool {
155        match (self, other) {
156            (_, Type::Any) => true,
157            (data, Type::OneOf(oneof)) => oneof.iter().any(|t| data.is_subtype_of(t)),
158            (PipelineData::Empty, Type::Nothing) => true,
159            (PipelineData::Value(val, ..), ty) => val.is_subtype_of(ty),
160
161            // a list stream could be a list with any type, including a table
162            (PipelineData::ListStream(..), Type::List(..) | Type::Table(..)) => true,
163
164            (PipelineData::ByteStream(stream, ..), Type::String)
165                if stream.type_().is_string_coercible() =>
166            {
167                true
168            }
169            (PipelineData::ByteStream(stream, ..), Type::Binary)
170                if stream.type_().is_binary_coercible() =>
171            {
172                true
173            }
174
175            (PipelineData::Empty, _) => false,
176            (PipelineData::ListStream(..), _) => false,
177            (PipelineData::ByteStream(..), _) => false,
178        }
179    }
180
181    pub fn into_value(self, span: Span) -> Result<Value, ShellError> {
182        match self {
183            PipelineData::Empty => Ok(Value::nothing(span)),
184            PipelineData::Value(value, ..) => {
185                if value.span() == Span::unknown() {
186                    Ok(value.with_span(span))
187                } else {
188                    Ok(value)
189                }
190            }
191            PipelineData::ListStream(stream, ..) => stream.into_value(),
192            PipelineData::ByteStream(stream, ..) => stream.into_value(),
193        }
194    }
195
    /// Converts any `Value` variant that can be represented as a stream into its stream variant.
    ///
    /// This means that lists and ranges are converted into list streams, and strings and binary are
    /// converted into byte streams. Custom values are first converted to their base value; if that
    /// base value is a list or a range, it is converted into a list stream.
    ///
    /// Returns an `Err` with the original data if the variant couldn't be converted to a stream
    /// variant. The one exception is a custom value whose base value is neither a list nor a
    /// range: in that case the `Err` carries the base value rather than the custom value.
    /// If the variant is already a stream variant, it is returned as-is.
    pub fn try_into_stream(self, engine_state: &EngineState) -> Result<PipelineData, PipelineData> {
        // Streams created here reuse the span of the original data (unknown span for `Empty`).
        let span = self.span().unwrap_or(Span::unknown());
        match self {
            PipelineData::ListStream(..) | PipelineData::ByteStream(..) => Ok(self),
            PipelineData::Value(Value::List { .. } | Value::Range { .. }, ref metadata) => {
                // `self` is moved into `into_iter()` below, so the metadata is cloned out first.
                let metadata = metadata.clone();
                Ok(PipelineData::list_stream(
                    ListStream::new(self.into_iter(), span, engine_state.signals().clone()),
                    metadata,
                ))
            }
            PipelineData::Value(Value::String { val, .. }, metadata) => {
                Ok(PipelineData::byte_stream(
                    ByteStream::read_string(val, span, engine_state.signals().clone()),
                    metadata,
                ))
            }
            PipelineData::Value(Value::Binary { val, .. }, metadata) => {
                Ok(PipelineData::byte_stream(
                    ByteStream::read_binary(val, span, engine_state.signals().clone()),
                    metadata,
                ))
            }
            PipelineData::Value(Value::Custom { val, internal_span }, metadata) => {
                // Custom values are streamable only if their base value is a list or a range.
                match val.to_base_value(internal_span) {
                    Ok(Value::List { vals, .. }) => Ok(PipelineData::list_stream(
                        ListStream::new(vals.into_iter(), span, engine_state.signals().clone()),
                        metadata,
                    )),
                    Ok(Value::Range { val, .. }) => Ok(PipelineData::list_stream(
                        ListStream::new(
                            // The range iterator itself gets empty signals; the engine's signals
                            // are attached to the wrapping list stream.
                            val.into_range_iter(span, Signals::empty()),
                            span,
                            engine_state.signals().clone(),
                        ),
                        metadata,
                    )),
                    // Not streamable: the already-converted base value is handed back.
                    Ok(other) => Err(PipelineData::value(other, metadata)),
                    // Conversion failed: the error is dropped and the custom value is rebuilt
                    // and handed back unchanged.
                    Err(_) => Err(PipelineData::Value(
                        Value::Custom { val, internal_span },
                        metadata,
                    )),
                }
            }
            _ => Err(self),
        }
    }
250
251    /// Drain and write this [`PipelineData`] to `dest`.
252    ///
253    /// Values are converted to bytes and separated by newlines if this is a `ListStream`.
254    pub fn write_to(self, mut dest: impl Write) -> Result<(), ShellError> {
255        match self {
256            PipelineData::Empty => Ok(()),
257            PipelineData::Value(value, ..) => {
258                let bytes = value_to_bytes(value)?;
259                dest.write_all(&bytes).map_err(|err| {
260                    IoError::new_internal(
261                        err,
262                        "Could not write PipelineData to dest",
263                        crate::location!(),
264                    )
265                })?;
266                dest.flush().map_err(|err| {
267                    IoError::new_internal(
268                        err,
269                        "Could not flush PipelineData to dest",
270                        crate::location!(),
271                    )
272                })?;
273                Ok(())
274            }
275            PipelineData::ListStream(stream, ..) => {
276                for value in stream {
277                    let bytes = value_to_bytes(value)?;
278                    dest.write_all(&bytes).map_err(|err| {
279                        IoError::new_internal(
280                            err,
281                            "Could not write PipelineData to dest",
282                            crate::location!(),
283                        )
284                    })?;
285                    dest.write_all(b"\n").map_err(|err| {
286                        IoError::new_internal(
287                            err,
288                            "Could not write linebreak after PipelineData to dest",
289                            crate::location!(),
290                        )
291                    })?;
292                }
293                dest.flush().map_err(|err| {
294                    IoError::new_internal(
295                        err,
296                        "Could not flush PipelineData to dest",
297                        crate::location!(),
298                    )
299                })?;
300                Ok(())
301            }
302            PipelineData::ByteStream(stream, ..) => stream.write_to(dest),
303        }
304    }
305
306    /// Drain this [`PipelineData`] according to the current stdout [`OutDest`]s in `stack`.
307    ///
308    /// For [`OutDest::Pipe`] and [`OutDest::PipeSeparate`], this will return the [`PipelineData`]
309    /// as is. For [`OutDest::Value`], this will collect into a value and return it. For
310    /// [`OutDest::Print`], the [`PipelineData`] is drained and printed. Otherwise, the
311    /// [`PipelineData`] is drained, but only printed if it is the output of an external command.
312    pub fn drain_to_out_dests(
313        self,
314        engine_state: &EngineState,
315        stack: &mut Stack,
316    ) -> Result<Self, ShellError> {
317        match stack.pipe_stdout().unwrap_or(&OutDest::Inherit) {
318            OutDest::Print => {
319                self.print_table(engine_state, stack, false, false)?;
320                Ok(Self::Empty)
321            }
322            OutDest::Pipe | OutDest::PipeSeparate => Ok(self),
323            OutDest::Value => {
324                let metadata = self.metadata();
325                let span = self.span().unwrap_or(Span::unknown());
326                self.into_value(span).map(|val| Self::Value(val, metadata))
327            }
328            OutDest::File(file) => {
329                self.write_to(file.as_ref())?;
330                Ok(Self::Empty)
331            }
332            OutDest::Null | OutDest::Inherit => {
333                self.drain()?;
334                Ok(Self::Empty)
335            }
336        }
337    }
338
339    pub fn drain(self) -> Result<(), ShellError> {
340        match self {
341            Self::Empty => Ok(()),
342            Self::Value(Value::Error { error, .. }, ..) => Err(*error),
343            Self::Value(..) => Ok(()),
344            Self::ListStream(stream, ..) => stream.drain(),
345            Self::ByteStream(stream, ..) => stream.drain(),
346        }
347    }
348
349    /// Try convert from self into iterator
350    ///
351    /// It returns Err if the `self` cannot be converted to an iterator.
352    ///
353    /// The `span` should be the span of the command or operation that would raise an error.
354    pub fn into_iter_strict(self, span: Span) -> Result<PipelineIterator, ShellError> {
355        Ok(PipelineIterator(match self {
356            PipelineData::Value(value, ..) => {
357                let val_span = value.span();
358                match value {
359                    Value::List { vals, .. } => PipelineIteratorInner::ListStream(
360                        ListStream::new(vals.into_iter(), val_span, Signals::empty()).into_iter(),
361                    ),
362                    Value::Binary { val, .. } => PipelineIteratorInner::ListStream(
363                        ListStream::new(
364                            val.into_iter().map(move |x| Value::int(x as i64, val_span)),
365                            val_span,
366                            Signals::empty(),
367                        )
368                        .into_iter(),
369                    ),
370                    Value::Range { val, .. } => PipelineIteratorInner::ListStream(
371                        ListStream::new(
372                            val.into_range_iter(val_span, Signals::empty()),
373                            val_span,
374                            Signals::empty(),
375                        )
376                        .into_iter(),
377                    ),
378                    // Handle iterable custom values by converting to base value first
379                    Value::Custom { ref val, .. } if val.is_iterable() => {
380                        match val.to_base_value(val_span) {
381                            Ok(Value::List { vals, .. }) => PipelineIteratorInner::ListStream(
382                                ListStream::new(vals.into_iter(), val_span, Signals::empty())
383                                    .into_iter(),
384                            ),
385                            Ok(other) => {
386                                return Err(ShellError::OnlySupportsThisInputType {
387                                    exp_input_type: "list, binary, range, or byte stream".into(),
388                                    wrong_type: other.get_type().to_string(),
389                                    dst_span: span,
390                                    src_span: val_span,
391                                });
392                            }
393                            Err(err) => return Err(err),
394                        }
395                    }
396                    // Propagate errors by explicitly matching them before the final case.
397                    Value::Error { error, .. } => return Err(*error),
398                    other => {
399                        return Err(ShellError::OnlySupportsThisInputType {
400                            exp_input_type: "list, binary, range, or byte stream".into(),
401                            wrong_type: other.get_type().to_string(),
402                            dst_span: span,
403                            src_span: val_span,
404                        });
405                    }
406                }
407            }
408            PipelineData::ListStream(stream, ..) => {
409                PipelineIteratorInner::ListStream(stream.into_iter())
410            }
411            PipelineData::Empty => {
412                return Err(ShellError::OnlySupportsThisInputType {
413                    exp_input_type: "list, binary, range, or byte stream".into(),
414                    wrong_type: "null".into(),
415                    dst_span: span,
416                    src_span: span,
417                });
418            }
419            PipelineData::ByteStream(stream, ..) => {
420                if let Some(chunks) = stream.chunks() {
421                    PipelineIteratorInner::ByteStream(chunks)
422                } else {
423                    PipelineIteratorInner::Empty
424                }
425            }
426        }))
427    }
428
429    pub fn collect_string(self, separator: &str, config: &Config) -> Result<String, ShellError> {
430        match self {
431            PipelineData::Empty => Ok(String::new()),
432            PipelineData::Value(value, ..) => Ok(value.to_expanded_string(separator, config)),
433            PipelineData::ListStream(stream, ..) => Ok(stream.into_string(separator, config)),
434            PipelineData::ByteStream(stream, ..) => stream.into_string(),
435        }
436    }
437
438    /// Retrieves string from pipeline data.
439    ///
440    /// As opposed to `collect_string` this raises error rather than converting non-string values.
441    /// The `span` will be used if `ListStream` is encountered since it doesn't carry a span.
442    pub fn collect_string_strict(
443        self,
444        span: Span,
445    ) -> Result<(String, Span, Option<PipelineMetadata>), ShellError> {
446        match self {
447            PipelineData::Empty => Ok((String::new(), span, None)),
448            PipelineData::Value(Value::String { val, .. }, metadata) => Ok((val, span, metadata)),
449            PipelineData::Value(val, ..) => Err(ShellError::TypeMismatch {
450                err_message: "string".into(),
451                span: val.span(),
452            }),
453            PipelineData::ListStream(..) => Err(ShellError::TypeMismatch {
454                err_message: "string".into(),
455                span,
456            }),
457            PipelineData::ByteStream(stream, metadata) => {
458                let span = stream.span();
459                Ok((stream.into_string()?, span, metadata))
460            }
461        }
462    }
463
464    pub fn follow_cell_path(
465        self,
466        cell_path: &[PathMember],
467        head: Span,
468    ) -> Result<Value, ShellError> {
469        match self {
470            // FIXME: there are probably better ways of doing this
471            PipelineData::ListStream(stream, ..) => Value::list(stream.into_iter().collect(), head)
472                .follow_cell_path(cell_path)
473                .map(Cow::into_owned),
474            PipelineData::Value(v, ..) => v.follow_cell_path(cell_path).map(Cow::into_owned),
475            PipelineData::Empty => Err(ShellError::IncompatiblePathAccess {
476                type_name: "empty pipeline".to_string(),
477                span: head,
478            }),
479            PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess {
480                type_name: stream.type_().describe().to_owned(),
481                span: stream.span(),
482            }),
483        }
484    }
485
486    /// Simplified mapper to help with simple values also. For full iterator support use `.into_iter()` instead
487    pub fn map<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
488    where
489        Self: Sized,
490        F: FnMut(Value) -> Value + 'static + Send,
491    {
492        match self {
493            PipelineData::Value(value, metadata) => {
494                let span = value.span();
495                let pipeline = match value {
496                    Value::List { vals, .. } => vals
497                        .into_iter()
498                        .map(f)
499                        .into_pipeline_data(span, signals.clone()),
500                    Value::Range { val, .. } => val
501                        .into_range_iter(span, Signals::empty())
502                        .map(f)
503                        .into_pipeline_data(span, signals.clone()),
504                    Value::Custom { ref val, .. } if val.is_iterable() => {
505                        match val.to_base_value(span)? {
506                            Value::List { vals, .. } => vals
507                                .into_iter()
508                                .map(f)
509                                .into_pipeline_data(span, signals.clone()),
510                            Value::Range { val, .. } => val
511                                .into_range_iter(span, Signals::empty())
512                                .map(f)
513                                .into_pipeline_data(span, signals.clone()),
514                            value => match f(value) {
515                                Value::Error { error, .. } => return Err(*error),
516                                v => v.into_pipeline_data(),
517                            },
518                        }
519                    }
520                    value => match f(value) {
521                        Value::Error { error, .. } => return Err(*error),
522                        v => v.into_pipeline_data(),
523                    },
524                };
525                Ok(pipeline.set_metadata(metadata))
526            }
527            PipelineData::Empty => Ok(PipelineData::empty()),
528            PipelineData::ListStream(stream, metadata) => {
529                Ok(PipelineData::list_stream(stream.map(f), metadata))
530            }
531            PipelineData::ByteStream(stream, metadata) => {
532                Ok(f(stream.into_value()?).into_pipeline_data_with_metadata(metadata))
533            }
534        }
535    }
536
537    /// Simplified flatmapper. For full iterator support use `.into_iter()` instead
538    pub fn flat_map<U, F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
539    where
540        Self: Sized,
541        U: IntoIterator<Item = Value> + 'static,
542        <U as IntoIterator>::IntoIter: 'static + Send,
543        F: FnMut(Value) -> U + 'static + Send,
544    {
545        match self {
546            PipelineData::Empty => Ok(PipelineData::empty()),
547            PipelineData::Value(value, metadata) => {
548                let span = value.span();
549                let pipeline = match value {
550                    Value::List { vals, .. } => vals
551                        .into_iter()
552                        .flat_map(f)
553                        .into_pipeline_data(span, signals.clone()),
554                    Value::Range { val, .. } => val
555                        .into_range_iter(span, Signals::empty())
556                        .flat_map(f)
557                        .into_pipeline_data(span, signals.clone()),
558                    Value::Custom { ref val, .. } if val.is_iterable() => {
559                        match val.to_base_value(span)? {
560                            Value::List { vals, .. } => vals
561                                .into_iter()
562                                .flat_map(f)
563                                .into_pipeline_data(span, signals.clone()),
564                            Value::Range { val, .. } => val
565                                .into_range_iter(span, Signals::empty())
566                                .flat_map(f)
567                                .into_pipeline_data(span, signals.clone()),
568                            value => f(value)
569                                .into_iter()
570                                .into_pipeline_data(span, signals.clone()),
571                        }
572                    }
573                    value => f(value)
574                        .into_iter()
575                        .into_pipeline_data(span, signals.clone()),
576                };
577                Ok(pipeline.set_metadata(metadata))
578            }
579            PipelineData::ListStream(stream, metadata) => Ok(PipelineData::list_stream(
580                stream.modify(|iter| iter.flat_map(f)),
581                metadata,
582            )),
583            PipelineData::ByteStream(stream, metadata) => {
584                // TODO: is this behavior desired / correct ?
585                let span = stream.span();
586                let iter = match String::from_utf8(stream.into_bytes()?) {
587                    Ok(mut str) => {
588                        str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
589                        f(Value::string(str, span))
590                    }
591                    Err(err) => f(Value::binary(err.into_bytes(), span)),
592                };
593                Ok(iter.into_iter().into_pipeline_data_with_metadata(
594                    span,
595                    signals.clone(),
596                    metadata,
597                ))
598            }
599        }
600    }
601
602    pub fn filter<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
603    where
604        Self: Sized,
605        F: FnMut(&Value) -> bool + 'static + Send,
606    {
607        match self {
608            PipelineData::Empty => Ok(PipelineData::empty()),
609            PipelineData::Value(value, metadata) => {
610                let span = value.span();
611                let pipeline = match value {
612                    Value::List { vals, .. } => vals
613                        .into_iter()
614                        .filter(f)
615                        .into_pipeline_data(span, signals.clone()),
616                    Value::Range { val, .. } => val
617                        .into_range_iter(span, Signals::empty())
618                        .filter(f)
619                        .into_pipeline_data(span, signals.clone()),
620                    Value::Custom { ref val, .. } if val.is_iterable() => {
621                        match val.to_base_value(span)? {
622                            Value::List { vals, .. } => vals
623                                .into_iter()
624                                .filter(f)
625                                .into_pipeline_data(span, signals.clone()),
626                            Value::Range { val, .. } => val
627                                .into_range_iter(span, Signals::empty())
628                                .filter(f)
629                                .into_pipeline_data(span, signals.clone()),
630                            value => {
631                                if f(&value) {
632                                    value.into_pipeline_data()
633                                } else {
634                                    Value::nothing(span).into_pipeline_data()
635                                }
636                            }
637                        }
638                    }
639                    value => {
640                        if f(&value) {
641                            value.into_pipeline_data()
642                        } else {
643                            Value::nothing(span).into_pipeline_data()
644                        }
645                    }
646                };
647                Ok(pipeline.set_metadata(metadata))
648            }
649            PipelineData::ListStream(stream, metadata) => Ok(PipelineData::list_stream(
650                stream.modify(|iter| iter.filter(f)),
651                metadata,
652            )),
653            PipelineData::ByteStream(stream, metadata) => {
654                // TODO: is this behavior desired / correct ?
655                let span = stream.span();
656                let value = match String::from_utf8(stream.into_bytes()?) {
657                    Ok(mut str) => {
658                        str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
659                        Value::string(str, span)
660                    }
661                    Err(err) => Value::binary(err.into_bytes(), span),
662                };
663                let value = if f(&value) {
664                    value
665                } else {
666                    Value::nothing(span)
667                };
668                Ok(value.into_pipeline_data_with_metadata(metadata))
669            }
670        }
671    }
672
673    /// Try to convert Value from Value::Range to Value::List.
674    /// This is useful to expand Value::Range into array notation, specifically when
675    /// converting `to json` or `to nuon`.
676    /// `1..3 | to XX -> [1,2,3]`
677    pub fn try_expand_range(self) -> Result<PipelineData, ShellError> {
678        match self {
679            PipelineData::Value(v, metadata) => {
680                let span = v.span();
681                match v {
682                    Value::Range { val, .. } => {
683                        match *val {
684                            Range::IntRange(range) => {
685                                if range.is_unbounded() {
686                                    return Err(ShellError::GenericError {
687                                        error: "Cannot create range".into(),
688                                        msg: "Unbounded ranges are not allowed when converting to this format".into(),
689                                        span: Some(span),
690                                        help: Some("Consider using ranges with valid start and end point.".into()),
691                                        inner: vec![],
692                                    });
693                                }
694                            }
695                            Range::FloatRange(range) => {
696                                if range.is_unbounded() {
697                                    return Err(ShellError::GenericError {
698                                        error: "Cannot create range".into(),
699                                        msg: "Unbounded ranges are not allowed when converting to this format".into(),
700                                        span: Some(span),
701                                        help: Some("Consider using ranges with valid start and end point.".into()),
702                                        inner: vec![],
703                                    });
704                                }
705                            }
706                        }
707                        let range_values: Vec<Value> =
708                            val.into_range_iter(span, Signals::empty()).collect();
709                        Ok(PipelineData::value(Value::list(range_values, span), None))
710                    }
711                    x => Ok(PipelineData::value(x, metadata)),
712                }
713            }
714            _ => Ok(self),
715        }
716    }
717
718    /// Consume and print self data immediately, formatted using table command.
719    ///
720    /// This does not respect the display_output hook. If a value is being printed out by a command,
721    /// this function should be used. Otherwise, `nu_cli::util::print_pipeline` should be preferred.
722    ///
723    /// `no_newline` controls if we need to attach newline character to output.
724    /// `to_stderr` controls if data is output to stderr, when the value is false, the data is output to stdout.
725    pub fn print_table(
726        self,
727        engine_state: &EngineState,
728        stack: &mut Stack,
729        no_newline: bool,
730        to_stderr: bool,
731    ) -> Result<(), ShellError> {
732        match self {
733            // Print byte streams directly as long as they aren't binary.
734            PipelineData::ByteStream(stream, ..) if stream.type_() != ByteStreamType::Binary => {
735                stream.print(to_stderr)
736            }
737            _ => {
738                // If the table function is in the declarations, then we can use it
739                // to create the table value that will be printed in the terminal
740                if let Some(decl_id) = engine_state.table_decl_id {
741                    let command = engine_state.get_decl(decl_id);
742                    if command.block_id().is_some() {
743                        self.write_all_and_flush(engine_state, no_newline, to_stderr)
744                    } else {
745                        let call = Call::new(Span::new(0, 0));
746                        let table = command.run(engine_state, stack, &(&call).into(), self)?;
747                        table.write_all_and_flush(engine_state, no_newline, to_stderr)
748                    }
749                } else {
750                    self.write_all_and_flush(engine_state, no_newline, to_stderr)
751                }
752            }
753        }
754    }
755
756    /// Consume and print self data without any extra formatting.
757    ///
758    /// This does not use the `table` command to format data, and also prints binary values and
759    /// streams in their raw format without generating a hexdump first.
760    ///
761    /// `no_newline` controls if we need to attach newline character to output.
762    /// `to_stderr` controls if data is output to stderr, when the value is false, the data is output to stdout.
763    pub fn print_raw(
764        self,
765        engine_state: &EngineState,
766        no_newline: bool,
767        to_stderr: bool,
768    ) -> Result<(), ShellError> {
769        let span = self.span();
770        if let PipelineData::Value(Value::Binary { val: bytes, .. }, _) = self {
771            if to_stderr {
772                write_all_and_flush(
773                    bytes,
774                    &mut std::io::stderr().lock(),
775                    "stderr",
776                    span,
777                    engine_state.signals(),
778                )?;
779            } else {
780                write_all_and_flush(
781                    bytes,
782                    &mut std::io::stdout().lock(),
783                    "stdout",
784                    span,
785                    engine_state.signals(),
786                )?;
787            }
788            Ok(())
789        } else {
790            self.write_all_and_flush(engine_state, no_newline, to_stderr)
791        }
792    }
793
794    fn write_all_and_flush(
795        self,
796        engine_state: &EngineState,
797        no_newline: bool,
798        to_stderr: bool,
799    ) -> Result<(), ShellError> {
800        let span = self.span();
801        if let PipelineData::ByteStream(stream, ..) = self {
802            // Copy ByteStreams directly
803            stream.print(to_stderr)
804        } else {
805            let config = engine_state.get_config();
806            for item in self {
807                let mut out = if let Value::Error { error, .. } = item {
808                    return Err(*error);
809                } else {
810                    item.to_expanded_string("\n", config)
811                };
812
813                if !no_newline {
814                    out.push('\n');
815                }
816
817                if to_stderr {
818                    write_all_and_flush(
819                        out,
820                        &mut std::io::stderr().lock(),
821                        "stderr",
822                        span,
823                        engine_state.signals(),
824                    )?;
825                } else {
826                    write_all_and_flush(
827                        out,
828                        &mut std::io::stdout().lock(),
829                        "stdout",
830                        span,
831                        engine_state.signals(),
832                    )?;
833                }
834            }
835
836            Ok(())
837        }
838    }
839
840    pub fn unsupported_input_error(
841        self,
842        expected_type: impl Into<String>,
843        span: Span,
844    ) -> ShellError {
845        match self {
846            PipelineData::Empty => ShellError::PipelineEmpty { dst_span: span },
847            PipelineData::Value(value, ..) => ShellError::OnlySupportsThisInputType {
848                exp_input_type: expected_type.into(),
849                wrong_type: value.get_type().get_non_specified_string(),
850                dst_span: span,
851                src_span: value.span(),
852            },
853            PipelineData::ListStream(stream, ..) => ShellError::OnlySupportsThisInputType {
854                exp_input_type: expected_type.into(),
855                wrong_type: "list (stream)".into(),
856                dst_span: span,
857                src_span: stream.span(),
858            },
859            PipelineData::ByteStream(stream, ..) => ShellError::OnlySupportsThisInputType {
860                exp_input_type: expected_type.into(),
861                wrong_type: stream.type_().describe().into(),
862                dst_span: span,
863                src_span: stream.span(),
864            },
865        }
866    }
867
868    // PipelineData might connect to a running process which has an exit status future
869    // Use this method to retrieve that future, it's useful for implementing `pipefail` feature.
870    #[cfg(feature = "os")]
871    pub fn clone_exit_status_future(&self) -> Option<ExitStatusGuard> {
872        match self {
873            PipelineData::Empty | PipelineData::Value(..) | PipelineData::ListStream(..) => None,
874            PipelineData::ByteStream(stream, ..) => match stream.source() {
875                ByteStreamSource::Read(..) | ByteStreamSource::File(..) => None,
876                ByteStreamSource::Child(c) => {
877                    let exit_future = c.clone_exit_status_future();
878                    let ignore_error = c.clone_ignore_error();
879                    Some(ExitStatusGuard::new(exit_future, ignore_error))
880                }
881            },
882        }
883    }
884}
885
886pub fn write_all_and_flush<T>(
887    data: T,
888    destination: &mut impl Write,
889    destination_name: &str,
890    span: Option<Span>,
891    signals: &Signals,
892) -> Result<(), ShellError>
893where
894    T: AsRef<[u8]>,
895{
896    let io_error_map = |err: std::io::Error, location: Location| {
897        let context = format!("Writing to {destination_name} failed");
898        match span {
899            None => IoError::new_internal(err, context, location),
900            Some(span) if span == Span::unknown() => IoError::new_internal(err, context, location),
901            Some(span) => IoError::new_with_additional_context(err, span, None, context),
902        }
903    };
904
905    let span = span.unwrap_or(Span::unknown());
906    const OUTPUT_CHUNK_SIZE: usize = 8192;
907    for chunk in data.as_ref().chunks(OUTPUT_CHUNK_SIZE) {
908        signals.check(&span)?;
909        destination
910            .write_all(chunk)
911            .map_err(|err| io_error_map(err, location!()))?;
912    }
913    destination
914        .flush()
915        .map_err(|err| io_error_map(err, location!()))?;
916    Ok(())
917}
918
/// Internal state of a [`PipelineIterator`]: one variant per way the source data is iterated.
enum PipelineIteratorInner {
    /// There is nothing to yield.
    Empty,
    /// A single value that is handed out as one item.
    Value(Value),
    /// Items are pulled from a list stream's iterator.
    ListStream(crate::list_stream::IntoIter),
    /// Items are pulled from the chunks of a byte stream.
    ByteStream(crate::byte_stream::Chunks),
}
925
/// Iterator over the [`Value`]s of a [`PipelineData`], produced by its `IntoIterator` impl.
pub struct PipelineIterator(PipelineIteratorInner);
927
928impl IntoIterator for PipelineData {
929    type Item = Value;
930
931    type IntoIter = PipelineIterator;
932
933    fn into_iter(self) -> Self::IntoIter {
934        PipelineIterator(match self {
935            PipelineData::Empty => PipelineIteratorInner::Empty,
936            PipelineData::Value(value, ..) => {
937                let span = value.span();
938                match value {
939                    Value::List { vals, signals, .. } => PipelineIteratorInner::ListStream(
940                        ListStream::new(
941                            vals.into_iter(),
942                            span,
943                            signals.unwrap_or_else(Signals::empty),
944                        )
945                        .into_iter(),
946                    ),
947                    Value::Range { val, signals, .. } => PipelineIteratorInner::ListStream(
948                        ListStream::new(
949                            val.into_range_iter(span, signals.unwrap_or_else(Signals::empty)),
950                            span,
951                            Signals::empty(),
952                        )
953                        .into_iter(),
954                    ),
955                    // Handle iterable custom values by converting to base value first
956                    Value::Custom { ref val, .. } if val.is_iterable() => {
957                        match val.to_base_value(span) {
958                            Ok(Value::List { vals, signals, .. }) => {
959                                PipelineIteratorInner::ListStream(
960                                    ListStream::new(
961                                        vals.into_iter(),
962                                        span,
963                                        signals.unwrap_or_else(Signals::empty),
964                                    )
965                                    .into_iter(),
966                                )
967                            }
968                            Ok(other) => PipelineIteratorInner::Value(other),
969                            Err(err) => PipelineIteratorInner::Value(Value::error(err, span)),
970                        }
971                    }
972                    x => PipelineIteratorInner::Value(x),
973                }
974            }
975            PipelineData::ListStream(stream, ..) => {
976                PipelineIteratorInner::ListStream(stream.into_iter())
977            }
978            PipelineData::ByteStream(stream, ..) => stream.chunks().map_or(
979                PipelineIteratorInner::Empty,
980                PipelineIteratorInner::ByteStream,
981            ),
982        })
983    }
984}
985
986impl Iterator for PipelineIterator {
987    type Item = Value;
988
989    fn next(&mut self) -> Option<Self::Item> {
990        match &mut self.0 {
991            PipelineIteratorInner::Empty => None,
992            PipelineIteratorInner::Value(Value::Nothing { .. }, ..) => None,
993            PipelineIteratorInner::Value(v, ..) => Some(std::mem::take(v)),
994            PipelineIteratorInner::ListStream(stream, ..) => stream.next(),
995            PipelineIteratorInner::ByteStream(stream) => stream.next().map(|x| match x {
996                Ok(x) => x,
997                Err(err) => Value::error(
998                    err,
999                    Span::unknown(), //FIXME: unclear where this span should come from
1000                ),
1001            }),
1002        }
1003    }
1004}
1005
/// Conversion of a single item into [`PipelineData`].
pub trait IntoPipelineData {
    /// Convert `self` into pipeline data.
    fn into_pipeline_data(self) -> PipelineData;

    /// Convert `self` into pipeline data, attaching the given (optional) metadata.
    fn into_pipeline_data_with_metadata(
        self,
        metadata: impl Into<Option<PipelineMetadata>>,
    ) -> PipelineData;
}
1014
1015impl<V> IntoPipelineData for V
1016where
1017    V: Into<Value>,
1018{
1019    fn into_pipeline_data(self) -> PipelineData {
1020        PipelineData::value(self.into(), None)
1021    }
1022
1023    fn into_pipeline_data_with_metadata(
1024        self,
1025        metadata: impl Into<Option<PipelineMetadata>>,
1026    ) -> PipelineData {
1027        PipelineData::value(self.into(), metadata.into())
1028    }
1029}
1030
/// Conversion of a collection of items into [`PipelineData`], given the span and
/// [`Signals`] to associate with the resulting data.
pub trait IntoInterruptiblePipelineData {
    /// Convert `self` into pipeline data using `span` and `signals`.
    fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData;
    /// Convert `self` into pipeline data using `span` and `signals`, attaching the given
    /// (optional) metadata.
    fn into_pipeline_data_with_metadata(
        self,
        span: Span,
        signals: Signals,
        metadata: impl Into<Option<PipelineMetadata>>,
    ) -> PipelineData;
}
1040
1041impl<I> IntoInterruptiblePipelineData for I
1042where
1043    I: IntoIterator + Send + 'static,
1044    I::IntoIter: Send + 'static,
1045    <I::IntoIter as Iterator>::Item: Into<Value>,
1046{
1047    fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData {
1048        ListStream::new(self.into_iter().map(Into::into), span, signals).into()
1049    }
1050
1051    fn into_pipeline_data_with_metadata(
1052        self,
1053        span: Span,
1054        signals: Signals,
1055        metadata: impl Into<Option<PipelineMetadata>>,
1056    ) -> PipelineData {
1057        PipelineData::list_stream(
1058            ListStream::new(self.into_iter().map(Into::into), span, signals),
1059            metadata.into(),
1060        )
1061    }
1062}
1063
1064fn value_to_bytes(value: Value) -> Result<Vec<u8>, ShellError> {
1065    let bytes = match value {
1066        Value::String { val, .. } => val.into_bytes(),
1067        Value::Binary { val, .. } => val,
1068        Value::List { vals, .. } => {
1069            let val = vals
1070                .into_iter()
1071                .map(Value::coerce_into_string)
1072                .collect::<Result<Vec<String>, ShellError>>()?
1073                .join("\n")
1074                + "\n";
1075
1076            val.into_bytes()
1077        }
1078        // Propagate errors by explicitly matching them before the final case.
1079        Value::Error { error, .. } => return Err(*error),
1080        value => value.coerce_into_string()?.into_bytes(),
1081    };
1082    Ok(bytes)
1083}
1084
/// A wrapper to [`PipelineData`] which can also track exit status.
///
/// We use exit status tracking to implement the `pipefail` feature.
pub struct PipelineExecutionData {
    /// The wrapped pipeline data; also reachable through `Deref`.
    pub body: PipelineData,
    /// Exit status guards tracked alongside `body` (only with the `os` feature).
    /// An entry is `None` when no exit status future was available for it.
    #[cfg(feature = "os")]
    pub exit: Vec<Option<ExitStatusGuard>>,
}
1093
1094impl Deref for PipelineExecutionData {
1095    type Target = PipelineData;
1096
1097    fn deref(&self) -> &Self::Target {
1098        &self.body
1099    }
1100}
1101
1102impl PipelineExecutionData {
1103    pub fn empty() -> Self {
1104        Self {
1105            body: PipelineData::empty(),
1106            #[cfg(feature = "os")]
1107            exit: vec![],
1108        }
1109    }
1110}
1111
1112impl From<PipelineData> for PipelineExecutionData {
1113    #[cfg(feature = "os")]
1114    fn from(value: PipelineData) -> Self {
1115        let value_span = value.span().unwrap_or_else(Span::unknown);
1116        let exit_status_future = value
1117            .clone_exit_status_future()
1118            .map(|f| f.with_span(value_span));
1119        Self {
1120            body: value,
1121            exit: vec![exit_status_future],
1122        }
1123    }
1124
1125    #[cfg(not(feature = "os"))]
1126    fn from(value: PipelineData) -> Self {
1127        Self { body: value }
1128    }
1129}