Skip to main content

nu_protocol/pipeline/
pipeline_data.rs

1#[cfg(feature = "os")]
2use crate::process::ExitStatusGuard;
3use crate::{
4    ByteStream, ByteStreamSource, ByteStreamType, Config, ListStream, OutDest, PipelineMetadata,
5    Range, ShellError, Signals, Span, Type, Value,
6    ast::{Call, PathMember},
7    engine::{EngineState, Stack},
8    shell_error::{generic::GenericError, io::IoError},
9};
10use std::{
11    borrow::Cow,
12    io::Write,
13    ops::{Deref, DerefMut},
14    panic::Location,
15};
16
/// Line-ending characters stripped from the end of decoded byte-stream text (via
/// `trim_end_matches`) in `flat_map` and `filter`, so any trailing run of `\r` / `\n`
/// characters is not passed to the callback.
const LINE_ENDING_PATTERN: &[char] = &['\r', '\n'];
18
/// The foundational abstraction for input and output to commands.
///
/// This represents either a single Value or a stream of values coming into the command or leaving a command.
///
/// A note on implementation:
///
/// We've tried a few variations of this structure. Listing these below so we have a record.
///
/// * We tried always assuming a stream in Nushell. This was a great 80% solution, but it had some rough edges.
///   Namely, how do you know the difference between a single string and a list of one string? How do you know
///   when to flatten the data given to you from a data source into the stream or to keep it as an unflattened
///   list?
///
/// * We tried putting the stream into Value. This had some interesting properties as now commands "just worked
///   on values", but led to a few unfortunate issues.
///
///   The first is that you couldn't easily clone Values in a way that kept them largely immutable. For example,
///   if you cloned a Value which contained a stream, and in one variable drained some part of it, then the
///   second variable would see different values based on what you did to the first.
///
///   To make this kind of mutation thread-safe, we would have had to produce a lock for the stream, which in
///   practice would have meant always locking the stream before reading from it. But more fundamentally, it
///   felt wrong in practice that observation of a value at runtime could affect other values which happen to
///   alias the same stream. By separating these, we don't have this effect. Instead, variables could get
///   concrete list values rather than streams, and be able to view them without non-local effects.
///
/// * A balance of the two approaches is what we've landed on: Values are thread-safe to pass, and can be
///   streamed into any destination. Streams are still available to model the infinite streams approach of
///   original Nushell.
///
/// Every variant except `Empty` carries optional [`PipelineMetadata`].
#[derive(Debug)]
pub enum PipelineData {
    /// No data at all. Converting it into a value yields `nothing`.
    Empty,
    /// A single, already-collected [`Value`].
    Value(Value, Option<PipelineMetadata>),
    /// A stream of [`Value`]s.
    ListStream(ListStream, Option<PipelineMetadata>),
    /// A stream of raw bytes; see [`ByteStreamType`] for how its contents are typed.
    ByteStream(ByteStream, Option<PipelineMetadata>),
}
55
56impl PipelineData {
57    pub const fn empty() -> PipelineData {
58        PipelineData::Empty
59    }
60
61    pub fn value(val: Value, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
62        PipelineData::Value(val, metadata.into())
63    }
64
65    pub fn list_stream(stream: ListStream, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
66        PipelineData::ListStream(stream, metadata.into())
67    }
68
69    pub fn byte_stream(stream: ByteStream, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
70        PipelineData::ByteStream(stream, metadata.into())
71    }
72
73    /// Returns a clone of the metadata if it exists.
74    ///
75    /// Note: This performs a deep clone of heap-allocated structures.
76    /// Use [`.metadata_ref()`](Self::metadata_ref), [`.metadata_mut()`](Self::metadata_mut)
77    /// or [`.take_metadata()`](Self::take_metadata) to avoid unnecessary allocations.
78    #[deprecated(
79        since = "0.111.1",
80        note = "Use .metadata_ref(), .metadata_mut() or .take_metadata() instead"
81    )]
82    pub fn metadata(&self) -> Option<PipelineMetadata> {
83        self.metadata_ref().cloned()
84    }
85
86    /// Returns a reference to the metadata if it exists.
87    pub fn metadata_ref(&self) -> Option<&PipelineMetadata> {
88        match self {
89            PipelineData::Empty => None,
90            PipelineData::Value(_, meta)
91            | PipelineData::ListStream(_, meta)
92            | PipelineData::ByteStream(_, meta) => meta.as_ref(),
93        }
94    }
95
96    /// Returns a mutable reference to the metadata if it exists.
97    pub fn metadata_mut(&mut self) -> Option<&mut PipelineMetadata> {
98        match self {
99            PipelineData::Empty => None,
100            PipelineData::Value(_, meta)
101            | PipelineData::ListStream(_, meta)
102            | PipelineData::ByteStream(_, meta) => meta.as_mut(),
103        }
104    }
105
106    /// Take the metadata out of pipeline if it exists.
107    pub fn take_metadata(&mut self) -> Option<PipelineMetadata> {
108        match self {
109            PipelineData::Empty => None,
110            PipelineData::Value(_, meta)
111            | PipelineData::ListStream(_, meta)
112            | PipelineData::ByteStream(_, meta) => meta.take(),
113        }
114    }
115
116    pub fn set_metadata(mut self, metadata: Option<PipelineMetadata>) -> Self {
117        match &mut self {
118            PipelineData::Empty => {}
119            PipelineData::Value(_, meta)
120            | PipelineData::ListStream(_, meta)
121            | PipelineData::ByteStream(_, meta) => *meta = metadata,
122        }
123        self
124    }
125
126    pub fn is_nothing(&self) -> bool {
127        matches!(self, PipelineData::Value(Value::Nothing { .. }, ..))
128            || matches!(self, PipelineData::Empty)
129    }
130
131    /// PipelineData doesn't always have a Span, but we can try!
132    pub fn span(&self) -> Option<Span> {
133        match self {
134            PipelineData::Empty => None,
135            PipelineData::Value(value, ..) => Some(value.span()),
136            PipelineData::ListStream(stream, ..) => Some(stream.span()),
137            PipelineData::ByteStream(stream, ..) => Some(stream.span()),
138        }
139    }
140
141    /// Change the span of the [`PipelineData`].
142    ///
143    /// Returns `Value(Nothing)` with the given span if it was [`PipelineData::empty()`].
144    pub fn with_span(self, span: Span) -> Self {
145        match self {
146            PipelineData::Empty => PipelineData::value(Value::nothing(span), None),
147            PipelineData::Value(value, metadata) => {
148                PipelineData::value(value.with_span(span), metadata)
149            }
150            PipelineData::ListStream(stream, metadata) => {
151                PipelineData::list_stream(stream.with_span(span), metadata)
152            }
153            PipelineData::ByteStream(stream, metadata) => {
154                PipelineData::byte_stream(stream.with_span(span), metadata)
155            }
156        }
157    }
158
159    /// Get a type that is representative of the `PipelineData`.
160    ///
161    /// The type returned here makes no effort to collect a stream, so it may be a different type
162    /// than would be returned by [`Value::get_type()`] on the result of
163    /// [`.into_value()`](Self::into_value).
164    ///
165    /// Specifically, a `ListStream` results in `list<any>` rather than
166    /// the fully complete [`list`](Type::List) type (which would require knowing the contents),
167    /// and a `ByteStream` with [unknown](crate::ByteStreamType::Unknown) type results in
168    /// [`any`](Type::Any) rather than [`string`](Type::String) or [`binary`](Type::Binary).
169    pub fn get_type(&self) -> Type {
170        match self {
171            PipelineData::Empty => Type::Nothing,
172            PipelineData::Value(value, _) => value.get_type(),
173            PipelineData::ListStream(_, _) => Type::list(Type::Any),
174            PipelineData::ByteStream(stream, _) => stream.type_().into(),
175        }
176    }
177
178    /// Determine if the `PipelineData` is a [subtype](https://en.wikipedia.org/wiki/Subtyping) of `other`.
179    ///
180    /// This check makes no effort to collect a stream, so it may be a different result
181    /// than would be returned by calling [`Value::is_subtype_of()`] on the result of
182    /// [`.into_value()`](Self::into_value).
183    ///
184    /// A `ListStream` acts the same as an empty list type: it is a subtype of any [`list`](Type::List)
185    /// or [`table`](Type::Table) type. After converting to a value, it may become a more specific type.
186    /// For example, a `ListStream` is a subtype of `list<int>` and `list<string>`.
187    /// If calling [`.into_value()`](Self::into_value) results in a `list<int>`,
188    /// then the value would not be a subtype of `list<string>`, in contrast to the original `ListStream`.
189    ///
190    /// A `ByteStream` is a subtype of [`string`](Type::String) if it is coercible into a string.
191    /// Likewise, a `ByteStream` is a subtype of [`binary`](Type::Binary) if it is coercible into a binary value.
192    pub fn is_subtype_of(&self, other: &Type) -> bool {
193        match (self, other) {
194            (_, Type::Any) => true,
195            (data, Type::OneOf(oneof)) => oneof.iter().any(|t| data.is_subtype_of(t)),
196            (PipelineData::Empty, Type::Nothing) => true,
197            (PipelineData::Value(val, ..), ty) => val.is_subtype_of(ty),
198
199            // a list stream could be a list with any type, including a table
200            (PipelineData::ListStream(..), Type::List(..) | Type::Table(..)) => true,
201
202            (PipelineData::ByteStream(stream, ..), Type::String)
203                if stream.type_().is_string_coercible() =>
204            {
205                true
206            }
207            (PipelineData::ByteStream(stream, ..), Type::Binary)
208                if stream.type_().is_binary_coercible() =>
209            {
210                true
211            }
212
213            (PipelineData::Empty, _) => false,
214            (PipelineData::ListStream(..), _) => false,
215            (PipelineData::ByteStream(..), _) => false,
216        }
217    }
218
219    pub fn into_value(self, span: Span) -> Result<Value, ShellError> {
220        match self {
221            PipelineData::Empty => Ok(Value::nothing(span)),
222            PipelineData::Value(value, ..) => {
223                if value.span() == Span::unknown() {
224                    Ok(value.with_span(span))
225                } else {
226                    Ok(value)
227                }
228            }
229            PipelineData::ListStream(stream, ..) => stream.into_value(),
230            PipelineData::ByteStream(stream, ..) => stream.into_value(),
231        }
232    }
233
234    /// Converts any `Value` variant that can be represented as a stream into its stream variant.
235    ///
236    /// This means that lists and ranges are converted into list streams, and strings and binary are
237    /// converted into byte streams.
238    ///
239    /// Returns an `Err` with the original stream if the variant couldn't be converted to a stream
240    /// variant. If the variant is already a stream variant, it is returned as-is.
241    pub fn try_into_stream(self, engine_state: &EngineState) -> Result<PipelineData, PipelineData> {
242        let span = self.span().unwrap_or(Span::unknown());
243        match self {
244            PipelineData::ListStream(..) | PipelineData::ByteStream(..) => Ok(self),
245            PipelineData::Value(Value::List { .. } | Value::Range { .. }, ref metadata) => {
246                let metadata = metadata.clone();
247                Ok(PipelineData::list_stream(
248                    ListStream::new(self.into_iter(), span, engine_state.signals().clone()),
249                    metadata,
250                ))
251            }
252            PipelineData::Value(Value::String { val, .. }, metadata) => {
253                Ok(PipelineData::byte_stream(
254                    ByteStream::read_string(val, span, engine_state.signals().clone()),
255                    metadata,
256                ))
257            }
258            PipelineData::Value(Value::Binary { val, .. }, metadata) => {
259                Ok(PipelineData::byte_stream(
260                    ByteStream::read_binary(val, span, engine_state.signals().clone()),
261                    metadata,
262                ))
263            }
264            PipelineData::Value(Value::Custom { val, internal_span }, metadata) => {
265                match val.to_base_value(internal_span) {
266                    Ok(Value::List { vals, .. }) => Ok(PipelineData::list_stream(
267                        ListStream::new(vals.into_iter(), span, engine_state.signals().clone()),
268                        metadata,
269                    )),
270                    Ok(Value::Range { val, .. }) => Ok(PipelineData::list_stream(
271                        ListStream::new(
272                            val.into_range_iter(span, Signals::empty()),
273                            span,
274                            engine_state.signals().clone(),
275                        ),
276                        metadata,
277                    )),
278                    Ok(other) => Err(PipelineData::value(other, metadata)),
279                    Err(_) => Err(PipelineData::Value(
280                        Value::Custom { val, internal_span },
281                        metadata,
282                    )),
283                }
284            }
285            _ => Err(self),
286        }
287    }
288
289    /// Converts this value into a stream when possible, otherwise returns the original value.
290    ///
291    /// This is a convenience wrapper around [`PipelineData::try_into_stream`] for command code
292    /// paths that can operate on both stream and non-stream input without branching.
293    #[must_use]
294    pub fn into_stream_or_original(self, engine_state: &EngineState) -> PipelineData {
295        self.try_into_stream(engine_state)
296            .unwrap_or_else(|original| original)
297    }
298
299    /// Drain and write this [`PipelineData`] to `dest`.
300    ///
301    /// Values are converted to bytes and separated by newlines if this is a `ListStream`.
302    pub fn write_to(self, mut dest: impl Write) -> Result<(), ShellError> {
303        match self {
304            PipelineData::Empty => Ok(()),
305            PipelineData::Value(value, ..) => {
306                let bytes = value_to_bytes(value)?;
307                dest.write_all(&bytes).map_err(|err| {
308                    IoError::new_internal(err, "Could not write PipelineData to dest")
309                })?;
310                dest.flush().map_err(|err| {
311                    IoError::new_internal(err, "Could not flush PipelineData to dest")
312                })?;
313                Ok(())
314            }
315            PipelineData::ListStream(stream, ..) => {
316                for value in stream {
317                    let bytes = value_to_bytes(value)?;
318                    dest.write_all(&bytes).map_err(|err| {
319                        IoError::new_internal(err, "Could not write PipelineData to dest")
320                    })?;
321                    dest.write_all(b"\n").map_err(|err| {
322                        IoError::new_internal(
323                            err,
324                            "Could not write linebreak after PipelineData to dest",
325                        )
326                    })?;
327                }
328                dest.flush().map_err(|err| {
329                    IoError::new_internal(err, "Could not flush PipelineData to dest")
330                })?;
331                Ok(())
332            }
333            PipelineData::ByteStream(stream, ..) => stream.write_to(dest),
334        }
335    }
336
337    /// Drain this [`PipelineData`] according to the current stdout [`OutDest`]s in `stack`.
338    ///
339    /// For [`OutDest::Pipe`] and [`OutDest::PipeSeparate`], this will return the [`PipelineData`]
340    /// as is. For [`OutDest::Value`], this will collect into a value and return it. For
341    /// [`OutDest::Print`], the [`PipelineData`] is drained and printed. Otherwise, the
342    /// [`PipelineData`] is drained, but only printed if it is the output of an external command.
343    pub fn drain_to_out_dests(
344        mut self,
345        engine_state: &EngineState,
346        stack: &mut Stack,
347    ) -> Result<Self, ShellError> {
348        match stack.pipe_stdout().unwrap_or(&OutDest::Inherit) {
349            OutDest::Print => {
350                self.print_table(engine_state, stack, false, false)?;
351                Ok(Self::Empty)
352            }
353            OutDest::Pipe | OutDest::PipeSeparate => Ok(self),
354            OutDest::Value => {
355                let metadata = self.take_metadata();
356                let span = self.span().unwrap_or(Span::unknown());
357                self.into_value(span).map(|val| Self::Value(val, metadata))
358            }
359            OutDest::File(file) => {
360                self.write_to(file.as_ref())?;
361                Ok(Self::Empty)
362            }
363            OutDest::Null | OutDest::Inherit => {
364                self.drain()?;
365                Ok(Self::Empty)
366            }
367        }
368    }
369
370    pub fn drain(self) -> Result<(), ShellError> {
371        match self {
372            Self::Empty => Ok(()),
373            Self::Value(Value::Error { error, .. }, ..) => Err(*error),
374            Self::Value(..) => Ok(()),
375            Self::ListStream(stream, ..) => stream.drain(),
376            Self::ByteStream(stream, ..) => stream.drain(),
377        }
378    }
379
380    /// Try convert from self into iterator
381    ///
382    /// It returns Err if the `self` cannot be converted to an iterator.
383    ///
384    /// The `span` should be the span of the command or operation that would raise an error.
385    pub fn into_iter_strict(self, span: Span) -> Result<PipelineIterator, ShellError> {
386        Ok(PipelineIterator(match self {
387            PipelineData::Value(value, ..) => {
388                let val_span = value.span();
389                match value {
390                    Value::List { vals, .. } => PipelineIteratorInner::ListStream(
391                        ListStream::new(vals.into_iter(), val_span, Signals::empty()).into_iter(),
392                    ),
393                    Value::Binary { val, .. } => PipelineIteratorInner::ListStream(
394                        ListStream::new(
395                            val.into_iter().map(move |x| Value::int(x as i64, val_span)),
396                            val_span,
397                            Signals::empty(),
398                        )
399                        .into_iter(),
400                    ),
401                    Value::Range { val, .. } => PipelineIteratorInner::ListStream(
402                        ListStream::new(
403                            val.into_range_iter(val_span, Signals::empty()),
404                            val_span,
405                            Signals::empty(),
406                        )
407                        .into_iter(),
408                    ),
409                    // Handle iterable custom values by converting to base value first
410                    #[expect(deprecated)]
411                    Value::Custom { ref val, .. } if val.is_iterable() => {
412                        match val.to_base_value(val_span) {
413                            Ok(Value::List { vals, .. }) => PipelineIteratorInner::ListStream(
414                                ListStream::new(vals.into_iter(), val_span, Signals::empty())
415                                    .into_iter(),
416                            ),
417                            Ok(other) => {
418                                return Err(ShellError::OnlySupportsThisInputType {
419                                    exp_input_type: "list, binary, range, or byte stream".into(),
420                                    wrong_type: other.get_type().to_string(),
421                                    dst_span: span,
422                                    src_span: val_span,
423                                });
424                            }
425                            Err(err) => return Err(err),
426                        }
427                    }
428                    // Propagate errors by explicitly matching them before the final case.
429                    Value::Error { error, .. } => return Err(*error),
430                    other => {
431                        return Err(ShellError::OnlySupportsThisInputType {
432                            exp_input_type: "list, binary, range, or byte stream".into(),
433                            wrong_type: other.get_type().to_string(),
434                            dst_span: span,
435                            src_span: val_span,
436                        });
437                    }
438                }
439            }
440            PipelineData::ListStream(stream, ..) => {
441                PipelineIteratorInner::ListStream(stream.into_iter())
442            }
443            PipelineData::Empty => {
444                return Err(ShellError::OnlySupportsThisInputType {
445                    exp_input_type: "list, binary, range, or byte stream".into(),
446                    wrong_type: "null".into(),
447                    dst_span: span,
448                    src_span: span,
449                });
450            }
451            PipelineData::ByteStream(stream, ..) => {
452                if let Some(chunks) = stream.chunks() {
453                    PipelineIteratorInner::ByteStream(chunks)
454                } else {
455                    PipelineIteratorInner::Empty
456                }
457            }
458        }))
459    }
460
461    pub fn collect_string(self, separator: &str, config: &Config) -> Result<String, ShellError> {
462        match self {
463            PipelineData::Empty => Ok(String::new()),
464            PipelineData::Value(value, ..) => Ok(value.to_expanded_string(separator, config)),
465            PipelineData::ListStream(stream, ..) => Ok(stream.into_string(separator, config)),
466            PipelineData::ByteStream(stream, ..) => stream.into_string(),
467        }
468    }
469
470    /// Retrieves string from pipeline data.
471    ///
472    /// As opposed to `collect_string` this raises error rather than converting non-string values.
473    /// The `span` will be used if `ListStream` is encountered since it doesn't carry a span.
474    pub fn collect_string_strict(
475        self,
476        span: Span,
477    ) -> Result<(String, Span, Option<PipelineMetadata>), ShellError> {
478        match self {
479            PipelineData::Empty => Ok((String::new(), span, None)),
480            PipelineData::Value(Value::String { val, .. }, metadata) => Ok((val, span, metadata)),
481            PipelineData::Value(val, ..) => Err(ShellError::TypeMismatch {
482                err_message: "string".into(),
483                span: val.span(),
484            }),
485            PipelineData::ListStream(..) => Err(ShellError::TypeMismatch {
486                err_message: "string".into(),
487                span,
488            }),
489            PipelineData::ByteStream(stream, metadata) => {
490                let span = stream.span();
491                Ok((stream.into_string()?, span, metadata))
492            }
493        }
494    }
495
496    pub fn follow_cell_path(
497        self,
498        cell_path: &[PathMember],
499        head: Span,
500    ) -> Result<Value, ShellError> {
501        match self {
502            // FIXME: there are probably better ways of doing this
503            PipelineData::ListStream(stream, ..) => Value::list(stream.into_iter().collect(), head)
504                .follow_cell_path(cell_path)
505                .map(Cow::into_owned),
506            PipelineData::Value(v, ..) => v.follow_cell_path(cell_path).map(Cow::into_owned),
507            PipelineData::Empty => Err(ShellError::IncompatiblePathAccess {
508                type_name: "empty pipeline".to_string(),
509                span: head,
510            }),
511            PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess {
512                type_name: stream.type_().describe().to_owned(),
513                span: stream.span(),
514            }),
515        }
516    }
517
518    /// Simplified mapper to help with simple values also. For full iterator support use `.into_iter()` instead
519    pub fn map<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
520    where
521        Self: Sized,
522        F: FnMut(Value) -> Value + 'static + Send,
523    {
524        match self {
525            PipelineData::Value(value, metadata) => {
526                let span = value.span();
527                let pipeline = match value {
528                    Value::List { vals, .. } => vals
529                        .into_iter()
530                        .map(f)
531                        .into_pipeline_data(span, signals.clone()),
532                    Value::Range { val, .. } => val
533                        .into_range_iter(span, Signals::empty())
534                        .map(f)
535                        .into_pipeline_data(span, signals.clone()),
536                    #[expect(deprecated)]
537                    Value::Custom { ref val, .. } if val.is_iterable() => {
538                        match val.to_base_value(span)? {
539                            Value::List { vals, .. } => vals
540                                .into_iter()
541                                .map(f)
542                                .into_pipeline_data(span, signals.clone()),
543                            Value::Range { val, .. } => val
544                                .into_range_iter(span, Signals::empty())
545                                .map(f)
546                                .into_pipeline_data(span, signals.clone()),
547                            value => match f(value) {
548                                Value::Error { error, .. } => return Err(*error),
549                                v => v.into_pipeline_data(),
550                            },
551                        }
552                    }
553                    value => match f(value) {
554                        Value::Error { error, .. } => return Err(*error),
555                        v => v.into_pipeline_data(),
556                    },
557                };
558                Ok(pipeline.set_metadata(metadata))
559            }
560            PipelineData::Empty => Ok(PipelineData::empty()),
561            PipelineData::ListStream(stream, metadata) => {
562                Ok(PipelineData::list_stream(stream.map(f), metadata))
563            }
564            PipelineData::ByteStream(stream, metadata) => {
565                Ok(f(stream.into_value()?).into_pipeline_data_with_metadata(metadata))
566            }
567        }
568    }
569
570    /// Simplified flatmapper. For full iterator support use `.into_iter()` instead
571    pub fn flat_map<U, F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
572    where
573        Self: Sized,
574        U: IntoIterator<Item = Value> + 'static,
575        <U as IntoIterator>::IntoIter: 'static + Send,
576        F: FnMut(Value) -> U + 'static + Send,
577    {
578        match self {
579            PipelineData::Empty => Ok(PipelineData::empty()),
580            PipelineData::Value(value, metadata) => {
581                let span = value.span();
582                let pipeline = match value {
583                    Value::List { vals, .. } => vals
584                        .into_iter()
585                        .flat_map(f)
586                        .into_pipeline_data(span, signals.clone()),
587                    Value::Range { val, .. } => val
588                        .into_range_iter(span, Signals::empty())
589                        .flat_map(f)
590                        .into_pipeline_data(span, signals.clone()),
591                    #[expect(deprecated)]
592                    Value::Custom { ref val, .. } if val.is_iterable() => {
593                        match val.to_base_value(span)? {
594                            Value::List { vals, .. } => vals
595                                .into_iter()
596                                .flat_map(f)
597                                .into_pipeline_data(span, signals.clone()),
598                            Value::Range { val, .. } => val
599                                .into_range_iter(span, Signals::empty())
600                                .flat_map(f)
601                                .into_pipeline_data(span, signals.clone()),
602                            value => f(value)
603                                .into_iter()
604                                .into_pipeline_data(span, signals.clone()),
605                        }
606                    }
607                    value => f(value)
608                        .into_iter()
609                        .into_pipeline_data(span, signals.clone()),
610                };
611                Ok(pipeline.set_metadata(metadata))
612            }
613            PipelineData::ListStream(stream, metadata) => Ok(PipelineData::list_stream(
614                stream.modify(|iter| iter.flat_map(f)),
615                metadata,
616            )),
617            PipelineData::ByteStream(stream, metadata) => {
618                // TODO: is this behavior desired / correct ?
619                let span = stream.span();
620                let iter = match String::from_utf8(stream.into_bytes()?) {
621                    Ok(mut str) => {
622                        str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
623                        f(Value::string(str, span))
624                    }
625                    Err(err) => f(Value::binary(err.into_bytes(), span)),
626                };
627                Ok(iter.into_iter().into_pipeline_data_with_metadata(
628                    span,
629                    signals.clone(),
630                    metadata,
631                ))
632            }
633        }
634    }
635
636    pub fn filter<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
637    where
638        Self: Sized,
639        F: FnMut(&Value) -> bool + 'static + Send,
640    {
641        match self {
642            PipelineData::Empty => Ok(PipelineData::empty()),
643            PipelineData::Value(value, metadata) => {
644                let span = value.span();
645                let pipeline = match value {
646                    Value::List { vals, .. } => vals
647                        .into_iter()
648                        .filter(f)
649                        .into_pipeline_data(span, signals.clone()),
650                    Value::Range { val, .. } => val
651                        .into_range_iter(span, Signals::empty())
652                        .filter(f)
653                        .into_pipeline_data(span, signals.clone()),
654                    #[expect(deprecated)]
655                    Value::Custom { ref val, .. } if val.is_iterable() => {
656                        match val.to_base_value(span)? {
657                            Value::List { vals, .. } => vals
658                                .into_iter()
659                                .filter(f)
660                                .into_pipeline_data(span, signals.clone()),
661                            Value::Range { val, .. } => val
662                                .into_range_iter(span, Signals::empty())
663                                .filter(f)
664                                .into_pipeline_data(span, signals.clone()),
665                            value => {
666                                if f(&value) {
667                                    value.into_pipeline_data()
668                                } else {
669                                    Value::nothing(span).into_pipeline_data()
670                                }
671                            }
672                        }
673                    }
674                    value => {
675                        if f(&value) {
676                            value.into_pipeline_data()
677                        } else {
678                            Value::nothing(span).into_pipeline_data()
679                        }
680                    }
681                };
682                Ok(pipeline.set_metadata(metadata))
683            }
684            PipelineData::ListStream(stream, metadata) => Ok(PipelineData::list_stream(
685                stream.modify(|iter| iter.filter(f)),
686                metadata,
687            )),
688            PipelineData::ByteStream(stream, metadata) => {
689                // TODO: is this behavior desired / correct ?
690                let span = stream.span();
691                let value = match String::from_utf8(stream.into_bytes()?) {
692                    Ok(mut str) => {
693                        str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
694                        Value::string(str, span)
695                    }
696                    Err(err) => Value::binary(err.into_bytes(), span),
697                };
698                let value = if f(&value) {
699                    value
700                } else {
701                    Value::nothing(span)
702                };
703                Ok(value.into_pipeline_data_with_metadata(metadata))
704            }
705        }
706    }
707
708    /// Try to convert Value from Value::Range to Value::List.
709    /// This is useful to expand Value::Range into array notation, specifically when
710    /// converting `to json` or `to nuon`.
711    /// `1..3 | to XX -> [1,2,3]`
712    pub fn try_expand_range(self) -> Result<PipelineData, ShellError> {
713        match self {
714            PipelineData::Value(v, metadata) => {
715                let span = v.span();
716                match v {
717                    Value::Range { val, .. } => {
718                        match *val {
719                            Range::IntRange(range) => {
720                                if range.is_unbounded() {
721                                    return Err(ShellError::Generic(
722                                        GenericError::new(
723                                            "Cannot create range",
724                                            "Unbounded ranges are not allowed when converting to this format",
725                                            span,
726                                        )
727                                        .with_help(
728                                            "Consider using ranges with valid start and end point.",
729                                        ),
730                                    ));
731                                }
732                            }
733                            Range::FloatRange(range) => {
734                                if range.is_unbounded() {
735                                    return Err(ShellError::Generic(
736                                        GenericError::new(
737                                            "Cannot create range",
738                                            "Unbounded ranges are not allowed when converting to this format",
739                                            span,
740                                        )
741                                        .with_help(
742                                            "Consider using ranges with valid start and end point.",
743                                        ),
744                                    ));
745                                }
746                            }
747                        }
748                        let range_values: Vec<Value> =
749                            val.into_range_iter(span, Signals::empty()).collect();
750                        Ok(PipelineData::value(Value::list(range_values, span), None))
751                    }
752                    x => Ok(PipelineData::value(x, metadata)),
753                }
754            }
755            _ => Ok(self),
756        }
757    }
758
759    /// Consume and print self data immediately, formatted using table command.
760    ///
761    /// This does not respect the display_output hook. If a value is being printed out by a command,
762    /// this function should be used. Otherwise, `nu_cli::util::print_pipeline` should be preferred.
763    ///
764    /// `no_newline` controls if we need to attach newline character to output.
765    /// `to_stderr` controls if data is output to stderr, when the value is false, the data is output to stdout.
766    pub fn print_table(
767        self,
768        engine_state: &EngineState,
769        stack: &mut Stack,
770        no_newline: bool,
771        to_stderr: bool,
772    ) -> Result<(), ShellError> {
773        match self {
774            // Print byte streams directly as long as they aren't binary.
775            PipelineData::ByteStream(stream, ..) if stream.type_() != ByteStreamType::Binary => {
776                stream.print(to_stderr)
777            }
778            _ => {
779                // If the table function is in the declarations, then we can use it
780                // to create the table value that will be printed in the terminal
781                if let Some(decl_id) = engine_state.table_decl_id {
782                    let command = engine_state.get_decl(decl_id);
783                    if command.block_id().is_some() {
784                        self.write_all_and_flush(engine_state, no_newline, to_stderr)
785                    } else {
786                        let call = Call::new(Span::new(0, 0));
787                        let table = command.run(engine_state, stack, &(&call).into(), self)?;
788                        table.write_all_and_flush(engine_state, no_newline, to_stderr)
789                    }
790                } else {
791                    self.write_all_and_flush(engine_state, no_newline, to_stderr)
792                }
793            }
794        }
795    }
796
797    /// Consume and print self data without any extra formatting.
798    ///
799    /// This does not use the `table` command to format data, and also prints binary values and
800    /// streams in their raw format without generating a hexdump first.
801    ///
802    /// `no_newline` controls if we need to attach newline character to output.
803    /// `to_stderr` controls if data is output to stderr, when the value is false, the data is output to stdout.
804    pub fn print_raw(
805        self,
806        engine_state: &EngineState,
807        no_newline: bool,
808        to_stderr: bool,
809    ) -> Result<(), ShellError> {
810        let span = self.span();
811        if let PipelineData::Value(Value::Binary { val: bytes, .. }, _) = self {
812            if to_stderr {
813                write_all_and_flush(
814                    bytes,
815                    &mut std::io::stderr().lock(),
816                    "stderr",
817                    span,
818                    engine_state.signals(),
819                )?;
820            } else {
821                write_all_and_flush(
822                    bytes,
823                    &mut std::io::stdout().lock(),
824                    "stdout",
825                    span,
826                    engine_state.signals(),
827                )?;
828            }
829            Ok(())
830        } else {
831            self.write_all_and_flush(engine_state, no_newline, to_stderr)
832        }
833    }
834
835    fn write_all_and_flush(
836        self,
837        engine_state: &EngineState,
838        no_newline: bool,
839        to_stderr: bool,
840    ) -> Result<(), ShellError> {
841        let span = self.span();
842        if let PipelineData::ByteStream(stream, ..) = self {
843            // Copy ByteStreams directly
844            stream.print(to_stderr)
845        } else {
846            let config = engine_state.get_config();
847            for item in self {
848                let mut out = if let Value::Error { error, .. } = item {
849                    return Err(*error);
850                } else {
851                    item.to_expanded_string("\n", config)
852                };
853
854                if !no_newline {
855                    out.push('\n');
856                }
857
858                if to_stderr {
859                    write_all_and_flush(
860                        out,
861                        &mut std::io::stderr().lock(),
862                        "stderr",
863                        span,
864                        engine_state.signals(),
865                    )?;
866                } else {
867                    write_all_and_flush(
868                        out,
869                        &mut std::io::stdout().lock(),
870                        "stdout",
871                        span,
872                        engine_state.signals(),
873                    )?;
874                }
875            }
876
877            Ok(())
878        }
879    }
880
881    pub fn unsupported_input_error(
882        self,
883        expected_type: impl Into<String>,
884        span: Span,
885    ) -> ShellError {
886        match self {
887            PipelineData::Empty => ShellError::PipelineEmpty { dst_span: span },
888            PipelineData::Value(value, ..) => ShellError::OnlySupportsThisInputType {
889                exp_input_type: expected_type.into(),
890                wrong_type: value.get_type().get_non_specified_string(),
891                dst_span: span,
892                src_span: value.span(),
893            },
894            PipelineData::ListStream(stream, ..) => ShellError::OnlySupportsThisInputType {
895                exp_input_type: expected_type.into(),
896                wrong_type: "list (stream)".into(),
897                dst_span: span,
898                src_span: stream.span(),
899            },
900            PipelineData::ByteStream(stream, ..) => ShellError::OnlySupportsThisInputType {
901                exp_input_type: expected_type.into(),
902                wrong_type: stream.type_().describe().into(),
903                dst_span: span,
904                src_span: stream.span(),
905            },
906        }
907    }
908
909    // PipelineData might connect to a running process which has an exit status future
910    // Use this method to retrieve that future, it's useful for implementing `pipefail` feature.
911    #[cfg(feature = "os")]
912    pub fn clone_exit_status_future(&self) -> Option<ExitStatusGuard> {
913        match self {
914            PipelineData::Empty | PipelineData::Value(..) | PipelineData::ListStream(..) => None,
915            PipelineData::ByteStream(stream, ..) => match stream.source() {
916                ByteStreamSource::Read(..) | ByteStreamSource::File(..) => None,
917                ByteStreamSource::Child(c) => {
918                    let exit_future = c.clone_exit_status_future();
919                    let ignore_error = c.clone_ignore_error();
920                    Some(ExitStatusGuard::new(exit_future, ignore_error))
921                }
922            },
923        }
924    }
925}
926
927pub fn write_all_and_flush<T>(
928    data: T,
929    destination: &mut impl Write,
930    destination_name: &str,
931    span: Option<Span>,
932    signals: &Signals,
933) -> Result<(), ShellError>
934where
935    T: AsRef<[u8]>,
936{
937    let io_error_map = |err: std::io::Error, location: &Location<'_>| {
938        let context = format!("Writing to {destination_name} failed");
939        match span {
940            None => IoError::new_internal_with_location(err, context, location),
941            Some(span) if span == Span::unknown() => {
942                IoError::new_internal_with_location(err, context, location)
943            }
944            Some(span) => IoError::new_with_additional_context(err, span, None, context),
945        }
946    };
947
948    let span = span.unwrap_or(Span::unknown());
949    const OUTPUT_CHUNK_SIZE: usize = 8192;
950    for chunk in data.as_ref().chunks(OUTPUT_CHUNK_SIZE) {
951        signals.check(&span)?;
952        destination
953            .write_all(chunk)
954            .map_err(|err| io_error_map(err, Location::caller()))?;
955    }
956    destination
957        .flush()
958        .map_err(|err| io_error_map(err, Location::caller()))?;
959    Ok(())
960}
961
962enum PipelineIteratorInner {
963    Empty,
964    Value(Value),
965    ListStream(crate::list_stream::IntoIter),
966    ByteStream(crate::byte_stream::Chunks),
967}
968
969pub struct PipelineIterator(PipelineIteratorInner);
970
971impl IntoIterator for PipelineData {
972    type Item = Value;
973
974    type IntoIter = PipelineIterator;
975
976    fn into_iter(self) -> Self::IntoIter {
977        PipelineIterator(match self {
978            PipelineData::Empty => PipelineIteratorInner::Empty,
979            PipelineData::Value(value, ..) => {
980                let span = value.span();
981                match value {
982                    Value::List { vals, signals, .. } => PipelineIteratorInner::ListStream(
983                        ListStream::new(
984                            vals.into_iter(),
985                            span,
986                            signals.unwrap_or_else(Signals::empty),
987                        )
988                        .into_iter(),
989                    ),
990                    Value::Range { val, signals, .. } => PipelineIteratorInner::ListStream(
991                        ListStream::new(
992                            val.into_range_iter(span, signals.unwrap_or_else(Signals::empty)),
993                            span,
994                            Signals::empty(),
995                        )
996                        .into_iter(),
997                    ),
998                    // Handle iterable custom values by converting to base value first
999                    #[expect(deprecated)]
1000                    Value::Custom { ref val, .. } if val.is_iterable() => {
1001                        match val.to_base_value(span) {
1002                            Ok(Value::List { vals, signals, .. }) => {
1003                                PipelineIteratorInner::ListStream(
1004                                    ListStream::new(
1005                                        vals.into_iter(),
1006                                        span,
1007                                        signals.unwrap_or_else(Signals::empty),
1008                                    )
1009                                    .into_iter(),
1010                                )
1011                            }
1012                            Ok(other) => PipelineIteratorInner::Value(other),
1013                            Err(err) => PipelineIteratorInner::Value(Value::error(err, span)),
1014                        }
1015                    }
1016                    x => PipelineIteratorInner::Value(x),
1017                }
1018            }
1019            PipelineData::ListStream(stream, ..) => {
1020                PipelineIteratorInner::ListStream(stream.into_iter())
1021            }
1022            PipelineData::ByteStream(stream, ..) => stream.chunks().map_or(
1023                PipelineIteratorInner::Empty,
1024                PipelineIteratorInner::ByteStream,
1025            ),
1026        })
1027    }
1028}
1029
1030impl Iterator for PipelineIterator {
1031    type Item = Value;
1032
1033    fn next(&mut self) -> Option<Self::Item> {
1034        match &mut self.0 {
1035            PipelineIteratorInner::Empty => None,
1036            PipelineIteratorInner::Value(Value::Nothing { .. }, ..) => None,
1037            PipelineIteratorInner::Value(v, ..) => Some(std::mem::take(v)),
1038            PipelineIteratorInner::ListStream(stream, ..) => stream.next(),
1039            PipelineIteratorInner::ByteStream(stream) => stream.next().map(|x| match x {
1040                Ok(x) => x,
1041                Err(err) => Value::error(
1042                    err,
1043                    Span::unknown(), //FIXME: unclear where this span should come from
1044                ),
1045            }),
1046        }
1047    }
1048}
1049
1050pub trait IntoPipelineData {
1051    fn into_pipeline_data(self) -> PipelineData;
1052
1053    fn into_pipeline_data_with_metadata(
1054        self,
1055        metadata: impl Into<Option<PipelineMetadata>>,
1056    ) -> PipelineData;
1057}
1058
1059impl<V> IntoPipelineData for V
1060where
1061    V: Into<Value>,
1062{
1063    fn into_pipeline_data(self) -> PipelineData {
1064        PipelineData::value(self.into(), None)
1065    }
1066
1067    fn into_pipeline_data_with_metadata(
1068        self,
1069        metadata: impl Into<Option<PipelineMetadata>>,
1070    ) -> PipelineData {
1071        PipelineData::value(self.into(), metadata.into())
1072    }
1073}
1074
1075pub trait IntoInterruptiblePipelineData {
1076    fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData;
1077    fn into_pipeline_data_with_metadata(
1078        self,
1079        span: Span,
1080        signals: Signals,
1081        metadata: impl Into<Option<PipelineMetadata>>,
1082    ) -> PipelineData;
1083}
1084
1085impl<I> IntoInterruptiblePipelineData for I
1086where
1087    I: IntoIterator + Send + 'static,
1088    I::IntoIter: Send + 'static,
1089    <I::IntoIter as Iterator>::Item: Into<Value>,
1090{
1091    fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData {
1092        ListStream::new(self.into_iter().map(Into::into), span, signals).into()
1093    }
1094
1095    fn into_pipeline_data_with_metadata(
1096        self,
1097        span: Span,
1098        signals: Signals,
1099        metadata: impl Into<Option<PipelineMetadata>>,
1100    ) -> PipelineData {
1101        PipelineData::list_stream(
1102            ListStream::new(self.into_iter().map(Into::into), span, signals),
1103            metadata.into(),
1104        )
1105    }
1106}
1107
1108fn value_to_bytes(value: Value) -> Result<Vec<u8>, ShellError> {
1109    let bytes = match value {
1110        Value::String { val, .. } => val.into_bytes(),
1111        Value::Binary { val, .. } => val,
1112        Value::List { vals, .. } => {
1113            let val = vals
1114                .into_iter()
1115                .map(Value::coerce_into_string)
1116                .collect::<Result<Vec<String>, ShellError>>()?
1117                .join("\n")
1118                + "\n";
1119
1120            val.into_bytes()
1121        }
1122        // Propagate errors by explicitly matching them before the final case.
1123        Value::Error { error, .. } => return Err(*error),
1124        value => value.coerce_into_string()?.into_bytes(),
1125    };
1126    Ok(bytes)
1127}
1128
/// A wrapper to [`PipelineData`] which can also track exit status.
///
/// We use exit status tracking to implement the `pipefail` feature.
#[derive(Debug)]
pub struct PipelineExecutionData {
    /// The wrapped pipeline data; also reachable through `Deref`/`DerefMut`.
    pub body: PipelineData,
    /// Exit status guards tracked alongside `body`. An entry is `None` when the corresponding
    /// data is not a byte stream backed by a child process.
    // NOTE(review): presumably one entry per pipeline element whose status matters for
    // `pipefail` — confirm against the code that extends this vector.
    #[cfg(feature = "os")]
    pub exit: Vec<Option<ExitStatusGuard>>,
}
1138
1139impl Deref for PipelineExecutionData {
1140    type Target = PipelineData;
1141
1142    fn deref(&self) -> &Self::Target {
1143        &self.body
1144    }
1145}
1146
1147impl DerefMut for PipelineExecutionData {
1148    fn deref_mut(&mut self) -> &mut Self::Target {
1149        &mut self.body
1150    }
1151}
1152
1153impl PipelineExecutionData {
1154    pub fn empty() -> Self {
1155        Self {
1156            body: PipelineData::empty(),
1157            #[cfg(feature = "os")]
1158            exit: vec![],
1159        }
1160    }
1161}
1162
1163impl From<PipelineData> for PipelineExecutionData {
1164    #[cfg(feature = "os")]
1165    fn from(value: PipelineData) -> Self {
1166        let value_span = value.span().unwrap_or_else(Span::unknown);
1167        let exit_status_future = value
1168            .clone_exit_status_future()
1169            .map(|f| f.with_span(value_span));
1170        Self {
1171            body: value,
1172            exit: vec![exit_status_future],
1173        }
1174    }
1175
1176    #[cfg(not(feature = "os"))]
1177    fn from(value: PipelineData) -> Self {
1178        Self { body: value }
1179    }
1180}