Skip to main content

nu_protocol/pipeline/
pipeline_data.rs

1#[cfg(feature = "os")]
2use crate::process::ExitStatusGuard;
3use crate::{
4    ByteStream, ByteStreamSource, ByteStreamType, CompareTypes, Config, ListStream, OutDest,
5    PipelineMetadata, Range, ShellError, Signals, Span, Type, TypeRelation, Value,
6    ast::{Call, PathMember},
7    engine::{EngineState, Stack},
8    shell_error::{generic::GenericError, io::IoError},
9};
10use std::{
11    borrow::Cow,
12    io::Write,
13    ops::{Deref, DerefMut},
14    panic::Location,
15};
16
// Characters treated as line endings: `flat_map` and `filter` strip any trailing run of these
// from byte-stream content that decoded as UTF-8 before handing it to the callback.
17const LINE_ENDING_PATTERN: &[char] = &['\r', '\n'];
18
19/// The foundational abstraction for input and output to commands
20///
21/// This represents either a single Value or a stream of values coming into the command or leaving a command.
22///
23/// A note on implementation:
24///
25/// We've tried a few variations of this structure. Listing these below so we have a record.
26///
27/// * We tried always assuming a stream in Nushell. This was a great 80% solution, but it had some rough edges.
28///   Namely, how do you know the difference between a single string and a list of one string. How do you know
29///   when to flatten the data given to you from a data source into the stream or to keep it as an unflattened
30///   list?
31///
32/// * We tried putting the stream into Value. This had some interesting properties as now commands "just worked
33///   on values", but led to a few unfortunate issues.
34///
35/// The first is that you can't easily clone Values in a way that felt largely immutable. For example, if
36/// you cloned a Value which contained a stream, and in one variable drained some part of it, then the second
37/// variable would see different values based on what you did to the first.
38///
39/// To make this kind of mutation thread-safe, we would have had to produce a lock for the stream, which in
40/// practice would have meant always locking the stream before reading from it. But more fundamentally, it
41/// felt wrong in practice that observation of a value at runtime could affect other values which happen to
42/// alias the same stream. By separating these, we don't have this effect. Instead, variables could get
43/// concrete list values rather than streams, and be able to view them without non-local effects.
44///
45/// * A balance of the two approaches is what we've landed on: Values are thread-safe to pass, and we can stream
46///   them into any sources. Streams are still available to model the infinite streams approach of original
47///   Nushell.
48#[derive(Debug)]
49pub enum PipelineData {
    /// No data at all; `get_type()` reports it as `nothing` and it never carries metadata.
50    Empty,
    /// A single [`Value`], together with optional metadata.
51    Value(Value, Option<PipelineMetadata>),
    /// A [`ListStream`] of values, together with optional metadata.
52    ListStream(ListStream, Option<PipelineMetadata>),
    /// A [`ByteStream`], together with optional metadata.
53    ByteStream(ByteStream, Option<PipelineMetadata>),
54}
55
56impl PipelineData {
57    pub const fn empty() -> PipelineData {
58        PipelineData::Empty
59    }
60
61    pub fn value(val: Value, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
62        PipelineData::Value(val, metadata.into())
63    }
64
65    pub fn list_stream(stream: ListStream, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
66        PipelineData::ListStream(stream, metadata.into())
67    }
68
69    pub fn byte_stream(stream: ByteStream, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
70        PipelineData::ByteStream(stream, metadata.into())
71    }
72
73    /// Returns a clone of the metadata if it exists.
74    ///
75    /// Note: This performs a deep clone of heap-allocated structures.
76    /// Use [`.metadata_ref()`](Self::metadata_ref), [`.metadata_mut()`](Self::metadata_mut)
77    /// or [`.take_metadata()`](Self::take_metadata) to avoid unnecessary allocations.
78    #[deprecated(
79        since = "0.111.1",
80        note = "Use .metadata_ref(), .metadata_mut() or .take_metadata() instead"
81    )]
82    pub fn metadata(&self) -> Option<PipelineMetadata> {
83        self.metadata_ref().cloned()
84    }
85
86    /// Returns a reference to the metadata if it exists.
87    pub fn metadata_ref(&self) -> Option<&PipelineMetadata> {
88        match self {
89            PipelineData::Empty => None,
90            PipelineData::Value(_, meta)
91            | PipelineData::ListStream(_, meta)
92            | PipelineData::ByteStream(_, meta) => meta.as_ref(),
93        }
94    }
95
96    /// Returns a mutable reference to the metadata if it exists.
97    pub fn metadata_mut(&mut self) -> Option<&mut PipelineMetadata> {
98        match self {
99            PipelineData::Empty => None,
100            PipelineData::Value(_, meta)
101            | PipelineData::ListStream(_, meta)
102            | PipelineData::ByteStream(_, meta) => meta.as_mut(),
103        }
104    }
105
106    /// Take the metadata out of pipeline if it exists.
107    pub fn take_metadata(&mut self) -> Option<PipelineMetadata> {
108        match self {
109            PipelineData::Empty => None,
110            PipelineData::Value(_, meta)
111            | PipelineData::ListStream(_, meta)
112            | PipelineData::ByteStream(_, meta) => meta.take(),
113        }
114    }
115
116    pub fn set_metadata(mut self, metadata: Option<PipelineMetadata>) -> Self {
117        match &mut self {
118            PipelineData::Empty => {}
119            PipelineData::Value(_, meta)
120            | PipelineData::ListStream(_, meta)
121            | PipelineData::ByteStream(_, meta) => *meta = metadata,
122        }
123        self
124    }
125
126    pub fn is_nothing(&self) -> bool {
127        matches!(self, PipelineData::Value(Value::Nothing { .. }, ..))
128            || matches!(self, PipelineData::Empty)
129    }
130
131    /// PipelineData doesn't always have a Span, but we can try!
132    pub fn span(&self) -> Option<Span> {
133        match self {
134            PipelineData::Empty => None,
135            PipelineData::Value(value, ..) => Some(value.span()),
136            PipelineData::ListStream(stream, ..) => Some(stream.span()),
137            PipelineData::ByteStream(stream, ..) => Some(stream.span()),
138        }
139    }
140
141    /// Change the span of the [`PipelineData`].
142    ///
143    /// Returns `Value(Nothing)` with the given span if it was [`PipelineData::empty()`].
144    pub fn with_span(self, span: Span) -> Self {
145        match self {
146            PipelineData::Empty => PipelineData::value(Value::nothing(span), None),
147            PipelineData::Value(value, metadata) => {
148                PipelineData::value(value.with_span(span), metadata)
149            }
150            PipelineData::ListStream(stream, metadata) => {
151                PipelineData::list_stream(stream.with_span(span), metadata)
152            }
153            PipelineData::ByteStream(stream, metadata) => {
154                PipelineData::byte_stream(stream.with_span(span), metadata)
155            }
156        }
157    }
158
159    /// Get a type that is representative of the `PipelineData`.
160    ///
161    /// The type returned here makes no effort to collect a stream, so it may be a different type
162    /// than would be returned by [`Value::get_type()`] on the result of
163    /// [`.into_value()`](Self::into_value).
164    ///
165    /// Specifically, a `ListStream` results in `list<any>` rather than
166    /// the fully complete [`list`](Type::List) type (which would require knowing the contents),
167    /// and a `ByteStream` with [unknown](crate::ByteStreamType::Unknown) type results in
168    /// [`any`](Type::Any) rather than [`string`](Type::String) or [`binary`](Type::Binary).
169    pub fn get_type(&self) -> Type {
170        match self {
171            PipelineData::Empty => Type::Nothing,
172            PipelineData::Value(value, _) => value.get_type(),
173            PipelineData::ListStream(_, _) => Type::list(Type::Any),
174            PipelineData::ByteStream(stream, _) => stream.type_().into(),
175        }
176    }
177
178    pub fn into_value(self, span: Span) -> Result<Value, ShellError> {
179        match self {
180            PipelineData::Empty => Ok(Value::nothing(span)),
181            PipelineData::Value(value, ..) => {
182                if value.span() == Span::unknown() {
183                    Ok(value.with_span(span))
184                } else {
185                    Ok(value)
186                }
187            }
188            PipelineData::ListStream(stream, ..) => stream.into_value(),
189            PipelineData::ByteStream(stream, ..) => stream.into_value(),
190        }
191    }
192
193    /// Converts any `Value` variant that can be represented as a stream into its stream variant.
194    ///
195    /// This means that lists and ranges are converted into list streams, and strings and binary are
196    /// converted into byte streams.
197    ///
198    /// Returns an `Err` with the original stream if the variant couldn't be converted to a stream
199    /// variant. If the variant is already a stream variant, it is returned as-is.
200    pub fn try_into_stream(self, engine_state: &EngineState) -> Result<PipelineData, PipelineData> {
201        let span = self.span().unwrap_or(Span::unknown());
202        match self {
203            PipelineData::ListStream(..) | PipelineData::ByteStream(..) => Ok(self),
204            PipelineData::Value(Value::List { .. } | Value::Range { .. }, ref metadata) => {
205                let metadata = metadata.clone();
206                Ok(PipelineData::list_stream(
207                    ListStream::new(self.into_iter(), span, engine_state.signals().clone()),
208                    metadata,
209                ))
210            }
211            PipelineData::Value(Value::String { val, .. }, metadata) => {
212                Ok(PipelineData::byte_stream(
213                    ByteStream::read_string(val, span, engine_state.signals().clone()),
214                    metadata,
215                ))
216            }
217            PipelineData::Value(Value::Binary { val, .. }, metadata) => {
218                Ok(PipelineData::byte_stream(
219                    ByteStream::read_binary(val, span, engine_state.signals().clone()),
220                    metadata,
221                ))
222            }
223            PipelineData::Value(Value::Custom { val, internal_span }, metadata) => {
224                match val.to_base_value(internal_span) {
225                    Ok(Value::List { vals, .. }) => Ok(PipelineData::list_stream(
226                        ListStream::new(vals.into_iter(), span, engine_state.signals().clone()),
227                        metadata,
228                    )),
229                    Ok(Value::Range { val, .. }) => Ok(PipelineData::list_stream(
230                        ListStream::new(
231                            val.into_range_iter(span, Signals::empty()),
232                            span,
233                            engine_state.signals().clone(),
234                        ),
235                        metadata,
236                    )),
237                    Ok(other) => Err(PipelineData::value(other, metadata)),
238                    Err(_) => Err(PipelineData::Value(
239                        Value::Custom { val, internal_span },
240                        metadata,
241                    )),
242                }
243            }
244            _ => Err(self),
245        }
246    }
247
248    /// Converts this value into a stream when possible, otherwise returns the original value.
249    ///
250    /// This is a convenience wrapper around [`PipelineData::try_into_stream`] for command code
251    /// paths that can operate on both stream and non-stream input without branching.
252    #[must_use]
253    pub fn into_stream_or_original(self, engine_state: &EngineState) -> PipelineData {
254        self.try_into_stream(engine_state)
255            .unwrap_or_else(|original| original)
256    }
257
258    /// Drain and write this [`PipelineData`] to `dest`.
259    ///
260    /// Values are converted to bytes and separated by newlines if this is a `ListStream`.
261    pub fn write_to(self, mut dest: impl Write) -> Result<(), ShellError> {
262        match self {
263            PipelineData::Empty => Ok(()),
264            PipelineData::Value(value, ..) => {
265                let bytes = value_to_bytes(value)?;
266                dest.write_all(&bytes).map_err(|err| {
267                    IoError::new_internal(err, "Could not write PipelineData to dest")
268                })?;
269                dest.flush().map_err(|err| {
270                    IoError::new_internal(err, "Could not flush PipelineData to dest")
271                })?;
272                Ok(())
273            }
274            PipelineData::ListStream(stream, ..) => {
275                for value in stream {
276                    let bytes = value_to_bytes(value)?;
277                    dest.write_all(&bytes).map_err(|err| {
278                        IoError::new_internal(err, "Could not write PipelineData to dest")
279                    })?;
280                    dest.write_all(b"\n").map_err(|err| {
281                        IoError::new_internal(
282                            err,
283                            "Could not write linebreak after PipelineData to dest",
284                        )
285                    })?;
286                }
287                dest.flush().map_err(|err| {
288                    IoError::new_internal(err, "Could not flush PipelineData to dest")
289                })?;
290                Ok(())
291            }
292            PipelineData::ByteStream(stream, ..) => stream.write_to(dest),
293        }
294    }
295
296    /// Drain this [`PipelineData`] according to the current stdout [`OutDest`]s in `stack`.
297    ///
298    /// For [`OutDest::Pipe`] and [`OutDest::PipeSeparate`], this will return the [`PipelineData`]
299    /// as is. For [`OutDest::Value`], this will collect into a value and return it. For
300    /// [`OutDest::Print`], the [`PipelineData`] is drained and printed. Otherwise, the
301    /// [`PipelineData`] is drained, but only printed if it is the output of an external command.
302    pub fn drain_to_out_dests(
303        mut self,
304        engine_state: &EngineState,
305        stack: &mut Stack,
306    ) -> Result<Self, ShellError> {
307        match stack.pipe_stdout().unwrap_or(&OutDest::Inherit) {
308            OutDest::Print => {
309                self.print_table(engine_state, stack, false, false)?;
310                Ok(Self::Empty)
311            }
312            OutDest::Pipe | OutDest::PipeSeparate => Ok(self),
313            OutDest::Value => {
314                let metadata = self.take_metadata();
315                let span = self.span().unwrap_or(Span::unknown());
316                self.into_value(span).map(|val| Self::Value(val, metadata))
317            }
318            OutDest::File(file) => {
319                self.write_to(file.as_ref())?;
320                Ok(Self::Empty)
321            }
322            OutDest::Null | OutDest::Inherit => {
323                self.drain()?;
324                Ok(Self::Empty)
325            }
326        }
327    }
328
329    pub fn drain(self) -> Result<(), ShellError> {
330        match self {
331            Self::Empty => Ok(()),
332            Self::Value(Value::Error { error, .. }, ..) => Err(*error),
333            Self::Value(..) => Ok(()),
334            Self::ListStream(stream, ..) => stream.drain(),
335            Self::ByteStream(stream, ..) => stream.drain(),
336        }
337    }
338
339    /// Try convert from self into iterator
340    ///
341    /// It returns Err if the `self` cannot be converted to an iterator.
342    ///
343    /// The `span` should be the span of the command or operation that would raise an error.
344    pub fn into_iter_strict(self, span: Span) -> Result<PipelineIterator, ShellError> {
345        Ok(PipelineIterator(match self {
346            PipelineData::Value(value, ..) => {
347                let val_span = value.span();
348                match value {
349                    Value::List { vals, .. } => PipelineIteratorInner::ListStream(
350                        ListStream::new(vals.into_iter(), val_span, Signals::empty()).into_iter(),
351                    ),
352                    Value::Binary { val, .. } => PipelineIteratorInner::ListStream(
353                        ListStream::new(
354                            val.into_iter().map(move |x| Value::int(x as i64, val_span)),
355                            val_span,
356                            Signals::empty(),
357                        )
358                        .into_iter(),
359                    ),
360                    Value::Range { val, .. } => PipelineIteratorInner::ListStream(
361                        ListStream::new(
362                            val.into_range_iter(val_span, Signals::empty()),
363                            val_span,
364                            Signals::empty(),
365                        )
366                        .into_iter(),
367                    ),
368                    // Handle iterable custom values by converting to base value first
369                    #[expect(deprecated)]
370                    Value::Custom { ref val, .. } if val.is_iterable() => {
371                        match val.to_base_value(val_span) {
372                            Ok(Value::List { vals, .. }) => PipelineIteratorInner::ListStream(
373                                ListStream::new(vals.into_iter(), val_span, Signals::empty())
374                                    .into_iter(),
375                            ),
376                            Ok(other) => {
377                                return Err(ShellError::OnlySupportsThisInputType {
378                                    exp_input_type: "list, binary, range, or byte stream".into(),
379                                    wrong_type: other.get_type().to_string(),
380                                    dst_span: span,
381                                    src_span: val_span,
382                                });
383                            }
384                            Err(err) => return Err(err),
385                        }
386                    }
387                    // Propagate errors by explicitly matching them before the final case.
388                    Value::Error { error, .. } => return Err(*error),
389                    other => {
390                        return Err(ShellError::OnlySupportsThisInputType {
391                            exp_input_type: "list, binary, range, or byte stream".into(),
392                            wrong_type: other.get_type().to_string(),
393                            dst_span: span,
394                            src_span: val_span,
395                        });
396                    }
397                }
398            }
399            PipelineData::ListStream(stream, ..) => {
400                PipelineIteratorInner::ListStream(stream.into_iter())
401            }
402            PipelineData::Empty => {
403                return Err(ShellError::OnlySupportsThisInputType {
404                    exp_input_type: "list, binary, range, or byte stream".into(),
405                    wrong_type: "null".into(),
406                    dst_span: span,
407                    src_span: span,
408                });
409            }
410            PipelineData::ByteStream(stream, ..) => {
411                if let Some(chunks) = stream.chunks() {
412                    PipelineIteratorInner::ByteStream(chunks)
413                } else {
414                    PipelineIteratorInner::Empty
415                }
416            }
417        }))
418    }
419
420    pub fn collect_string(self, separator: &str, config: &Config) -> Result<String, ShellError> {
421        match self {
422            PipelineData::Empty => Ok(String::new()),
423            PipelineData::Value(value, ..) => Ok(value.to_expanded_string(separator, config)),
424            PipelineData::ListStream(stream, ..) => Ok(stream.into_string(separator, config)),
425            PipelineData::ByteStream(stream, ..) => stream.into_string(),
426        }
427    }
428
429    /// Retrieves string from pipeline data.
430    ///
431    /// As opposed to `collect_string` this raises error rather than converting non-string values.
432    /// The `span` will be used if `ListStream` is encountered since it doesn't carry a span.
433    pub fn collect_string_strict(
434        self,
435        span: Span,
436    ) -> Result<(String, Span, Option<PipelineMetadata>), ShellError> {
437        match self {
438            PipelineData::Empty => Ok((String::new(), span, None)),
439            PipelineData::Value(Value::String { val, .. }, metadata) => Ok((val, span, metadata)),
440            PipelineData::Value(val, ..) => Err(ShellError::TypeMismatch {
441                err_message: "string".into(),
442                span: val.span(),
443            }),
444            PipelineData::ListStream(..) => Err(ShellError::TypeMismatch {
445                err_message: "string".into(),
446                span,
447            }),
448            PipelineData::ByteStream(stream, metadata) => {
449                let span = stream.span();
450                Ok((stream.into_string()?, span, metadata))
451            }
452        }
453    }
454
455    pub fn follow_cell_path(
456        self,
457        cell_path: &[PathMember],
458        head: Span,
459    ) -> Result<Value, ShellError> {
460        match self {
461            // FIXME: there are probably better ways of doing this
462            PipelineData::ListStream(stream, ..) => Value::list(stream.into_iter().collect(), head)
463                .follow_cell_path(cell_path)
464                .map(Cow::into_owned),
465            PipelineData::Value(v, ..) => v.follow_cell_path(cell_path).map(Cow::into_owned),
466            PipelineData::Empty => Err(ShellError::IncompatiblePathAccess {
467                type_name: "empty pipeline".to_string(),
468                span: head,
469            }),
470            PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess {
471                type_name: stream.type_().describe().to_owned(),
472                span: stream.span(),
473            }),
474        }
475    }
476
477    /// Simplified mapper to help with simple values also. For full iterator support use `.into_iter()` instead
478    pub fn map<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
479    where
480        Self: Sized,
481        F: FnMut(Value) -> Value + 'static + Send,
482    {
483        match self {
484            PipelineData::Value(value, metadata) => {
485                let span = value.span();
486                let pipeline = match value {
487                    Value::List { vals, .. } => vals
488                        .into_iter()
489                        .map(f)
490                        .into_pipeline_data(span, signals.clone()),
491                    Value::Range { val, .. } => val
492                        .into_range_iter(span, Signals::empty())
493                        .map(f)
494                        .into_pipeline_data(span, signals.clone()),
495                    #[expect(deprecated)]
496                    Value::Custom { ref val, .. } if val.is_iterable() => {
497                        match val.to_base_value(span)? {
498                            Value::List { vals, .. } => vals
499                                .into_iter()
500                                .map(f)
501                                .into_pipeline_data(span, signals.clone()),
502                            Value::Range { val, .. } => val
503                                .into_range_iter(span, Signals::empty())
504                                .map(f)
505                                .into_pipeline_data(span, signals.clone()),
506                            value => match f(value) {
507                                Value::Error { error, .. } => return Err(*error),
508                                v => v.into_pipeline_data(),
509                            },
510                        }
511                    }
512                    value => match f(value) {
513                        Value::Error { error, .. } => return Err(*error),
514                        v => v.into_pipeline_data(),
515                    },
516                };
517                Ok(pipeline.set_metadata(metadata))
518            }
519            PipelineData::Empty => Ok(PipelineData::empty()),
520            PipelineData::ListStream(stream, metadata) => {
521                Ok(PipelineData::list_stream(stream.map(f), metadata))
522            }
523            PipelineData::ByteStream(stream, metadata) => {
524                Ok(f(stream.into_value()?).into_pipeline_data_with_metadata(metadata))
525            }
526        }
527    }
528
529    /// Simplified flatmapper. For full iterator support use `.into_iter()` instead
530    pub fn flat_map<U, F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
531    where
532        Self: Sized,
533        U: IntoIterator<Item = Value> + 'static,
534        <U as IntoIterator>::IntoIter: 'static + Send,
535        F: FnMut(Value) -> U + 'static + Send,
536    {
537        match self {
538            PipelineData::Empty => Ok(PipelineData::empty()),
539            PipelineData::Value(value, metadata) => {
540                let span = value.span();
541                let pipeline = match value {
542                    Value::List { vals, .. } => vals
543                        .into_iter()
544                        .flat_map(f)
545                        .into_pipeline_data(span, signals.clone()),
546                    Value::Range { val, .. } => val
547                        .into_range_iter(span, Signals::empty())
548                        .flat_map(f)
549                        .into_pipeline_data(span, signals.clone()),
550                    #[expect(deprecated)]
551                    Value::Custom { ref val, .. } if val.is_iterable() => {
552                        match val.to_base_value(span)? {
553                            Value::List { vals, .. } => vals
554                                .into_iter()
555                                .flat_map(f)
556                                .into_pipeline_data(span, signals.clone()),
557                            Value::Range { val, .. } => val
558                                .into_range_iter(span, Signals::empty())
559                                .flat_map(f)
560                                .into_pipeline_data(span, signals.clone()),
561                            value => f(value)
562                                .into_iter()
563                                .into_pipeline_data(span, signals.clone()),
564                        }
565                    }
566                    value => f(value)
567                        .into_iter()
568                        .into_pipeline_data(span, signals.clone()),
569                };
570                Ok(pipeline.set_metadata(metadata))
571            }
572            PipelineData::ListStream(stream, metadata) => Ok(PipelineData::list_stream(
573                stream.modify(|iter| iter.flat_map(f)),
574                metadata,
575            )),
576            PipelineData::ByteStream(stream, metadata) => {
577                // TODO: is this behavior desired / correct ?
578                let span = stream.span();
579                let iter = match String::from_utf8(stream.into_bytes()?) {
580                    Ok(mut str) => {
581                        str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
582                        f(Value::string(str, span))
583                    }
584                    Err(err) => f(Value::binary(err.into_bytes(), span)),
585                };
586                Ok(iter.into_iter().into_pipeline_data_with_metadata(
587                    span,
588                    signals.clone(),
589                    metadata,
590                ))
591            }
592        }
593    }
594
595    pub fn filter<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
596    where
597        Self: Sized,
598        F: FnMut(&Value) -> bool + 'static + Send,
599    {
600        match self {
601            PipelineData::Empty => Ok(PipelineData::empty()),
602            PipelineData::Value(value, metadata) => {
603                let span = value.span();
604                let pipeline = match value {
605                    Value::List { vals, .. } => vals
606                        .into_iter()
607                        .filter(f)
608                        .into_pipeline_data(span, signals.clone()),
609                    Value::Range { val, .. } => val
610                        .into_range_iter(span, Signals::empty())
611                        .filter(f)
612                        .into_pipeline_data(span, signals.clone()),
613                    #[expect(deprecated)]
614                    Value::Custom { ref val, .. } if val.is_iterable() => {
615                        match val.to_base_value(span)? {
616                            Value::List { vals, .. } => vals
617                                .into_iter()
618                                .filter(f)
619                                .into_pipeline_data(span, signals.clone()),
620                            Value::Range { val, .. } => val
621                                .into_range_iter(span, Signals::empty())
622                                .filter(f)
623                                .into_pipeline_data(span, signals.clone()),
624                            value => {
625                                if f(&value) {
626                                    value.into_pipeline_data()
627                                } else {
628                                    Value::nothing(span).into_pipeline_data()
629                                }
630                            }
631                        }
632                    }
633                    value => {
634                        if f(&value) {
635                            value.into_pipeline_data()
636                        } else {
637                            Value::nothing(span).into_pipeline_data()
638                        }
639                    }
640                };
641                Ok(pipeline.set_metadata(metadata))
642            }
643            PipelineData::ListStream(stream, metadata) => Ok(PipelineData::list_stream(
644                stream.modify(|iter| iter.filter(f)),
645                metadata,
646            )),
647            PipelineData::ByteStream(stream, metadata) => {
648                // TODO: is this behavior desired / correct ?
649                let span = stream.span();
650                let value = match String::from_utf8(stream.into_bytes()?) {
651                    Ok(mut str) => {
652                        str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
653                        Value::string(str, span)
654                    }
655                    Err(err) => Value::binary(err.into_bytes(), span),
656                };
657                let value = if f(&value) {
658                    value
659                } else {
660                    Value::nothing(span)
661                };
662                Ok(value.into_pipeline_data_with_metadata(metadata))
663            }
664        }
665    }
666
667    /// Try to convert Value from Value::Range to Value::List.
668    /// This is useful to expand Value::Range into array notation, specifically when
669    /// converting `to json` or `to nuon`.
670    /// `1..3 | to XX -> [1,2,3]`
671    pub fn try_expand_range(self) -> Result<PipelineData, ShellError> {
672        match self {
673            PipelineData::Value(v, metadata) => {
674                let span = v.span();
675                match v {
676                    Value::Range { val, .. } => {
677                        match *val {
678                            Range::IntRange(range) => {
679                                if range.is_unbounded() {
680                                    return Err(ShellError::Generic(
681                                        GenericError::new(
682                                            "Cannot create range",
683                                            "Unbounded ranges are not allowed when converting to this format",
684                                            span,
685                                        )
686                                        .with_help(
687                                            "Consider using ranges with valid start and end point.",
688                                        ),
689                                    ));
690                                }
691                            }
692                            Range::FloatRange(range) => {
693                                if range.is_unbounded() {
694                                    return Err(ShellError::Generic(
695                                        GenericError::new(
696                                            "Cannot create range",
697                                            "Unbounded ranges are not allowed when converting to this format",
698                                            span,
699                                        )
700                                        .with_help(
701                                            "Consider using ranges with valid start and end point.",
702                                        ),
703                                    ));
704                                }
705                            }
706                        }
707                        let range_values: Vec<Value> =
708                            val.into_range_iter(span, Signals::empty()).collect();
709                        Ok(PipelineData::value(Value::list(range_values, span), None))
710                    }
711                    x => Ok(PipelineData::value(x, metadata)),
712                }
713            }
714            _ => Ok(self),
715        }
716    }
717
718    /// Consume and print self data immediately, formatted using table command.
719    ///
720    /// This does not respect the display_output hook. If a value is being printed out by a command,
721    /// this function should be used. Otherwise, `nu_cli::util::print_pipeline` should be preferred.
722    ///
723    /// `no_newline` controls if we need to attach newline character to output.
724    /// `to_stderr` controls if data is output to stderr, when the value is false, the data is output to stdout.
725    pub fn print_table(
726        self,
727        engine_state: &EngineState,
728        stack: &mut Stack,
729        no_newline: bool,
730        to_stderr: bool,
731    ) -> Result<(), ShellError> {
732        match self {
733            // Print byte streams directly as long as they aren't binary.
734            PipelineData::ByteStream(stream, ..) if stream.type_() != ByteStreamType::Binary => {
735                stream.print(to_stderr)
736            }
737            _ => {
738                // If the table function is in the declarations, then we can use it
739                // to create the table value that will be printed in the terminal
740                if let Some(decl_id) = engine_state.table_decl_id {
741                    let command = engine_state.get_decl(decl_id);
742                    if command.block_id().is_some() {
743                        self.write_all_and_flush(engine_state, no_newline, to_stderr)
744                    } else {
745                        let call = Call::new(Span::new(0, 0));
746                        let table = command.run(engine_state, stack, &(&call).into(), self)?;
747                        table.write_all_and_flush(engine_state, no_newline, to_stderr)
748                    }
749                } else {
750                    self.write_all_and_flush(engine_state, no_newline, to_stderr)
751                }
752            }
753        }
754    }
755
756    /// Consume and print self data without any extra formatting.
757    ///
758    /// This does not use the `table` command to format data, and also prints binary values and
759    /// streams in their raw format without generating a hexdump first.
760    ///
761    /// `no_newline` controls if we need to attach newline character to output.
762    /// `to_stderr` controls if data is output to stderr, when the value is false, the data is output to stdout.
763    pub fn print_raw(
764        self,
765        engine_state: &EngineState,
766        no_newline: bool,
767        to_stderr: bool,
768    ) -> Result<(), ShellError> {
769        let span = self.span();
770        if let PipelineData::Value(Value::Binary { val: bytes, .. }, _) = self {
771            if to_stderr {
772                write_all_and_flush(
773                    bytes,
774                    &mut std::io::stderr().lock(),
775                    "stderr",
776                    span,
777                    engine_state.signals(),
778                )?;
779            } else {
780                write_all_and_flush(
781                    bytes,
782                    &mut std::io::stdout().lock(),
783                    "stdout",
784                    span,
785                    engine_state.signals(),
786                )?;
787            }
788            Ok(())
789        } else {
790            self.write_all_and_flush(engine_state, no_newline, to_stderr)
791        }
792    }
793
794    fn write_all_and_flush(
795        self,
796        engine_state: &EngineState,
797        no_newline: bool,
798        to_stderr: bool,
799    ) -> Result<(), ShellError> {
800        let span = self.span();
801        if let PipelineData::ByteStream(stream, ..) = self {
802            // Copy ByteStreams directly
803            stream.print(to_stderr)
804        } else {
805            let config = engine_state.get_config();
806            for item in self {
807                let mut out = if let Value::Error { error, .. } = item {
808                    return Err(*error);
809                } else {
810                    item.to_expanded_string("\n", config)
811                };
812
813                if !no_newline {
814                    out.push('\n');
815                }
816
817                if to_stderr {
818                    write_all_and_flush(
819                        out,
820                        &mut std::io::stderr().lock(),
821                        "stderr",
822                        span,
823                        engine_state.signals(),
824                    )?;
825                } else {
826                    write_all_and_flush(
827                        out,
828                        &mut std::io::stdout().lock(),
829                        "stdout",
830                        span,
831                        engine_state.signals(),
832                    )?;
833                }
834            }
835
836            Ok(())
837        }
838    }
839
840    pub fn unsupported_input_error(
841        self,
842        expected_type: impl Into<String>,
843        span: Span,
844    ) -> ShellError {
845        match self {
846            PipelineData::Empty => ShellError::PipelineEmpty { dst_span: span },
847            PipelineData::Value(value, ..) => ShellError::OnlySupportsThisInputType {
848                exp_input_type: expected_type.into(),
849                wrong_type: value.get_type().get_non_specified_string(),
850                dst_span: span,
851                src_span: value.span(),
852            },
853            PipelineData::ListStream(stream, ..) => ShellError::OnlySupportsThisInputType {
854                exp_input_type: expected_type.into(),
855                wrong_type: "list (stream)".into(),
856                dst_span: span,
857                src_span: stream.span(),
858            },
859            PipelineData::ByteStream(stream, ..) => ShellError::OnlySupportsThisInputType {
860                exp_input_type: expected_type.into(),
861                wrong_type: stream.type_().describe().into(),
862                dst_span: span,
863                src_span: stream.span(),
864            },
865        }
866    }
867
868    // PipelineData might connect to a running process which has an exit status future
869    // Use this method to retrieve that future, it's useful for implementing `pipefail` feature.
870    #[cfg(feature = "os")]
871    pub fn clone_exit_status_future(&self) -> Option<ExitStatusGuard> {
872        match self {
873            PipelineData::Empty | PipelineData::Value(..) | PipelineData::ListStream(..) => None,
874            PipelineData::ByteStream(stream, ..) => match stream.source() {
875                ByteStreamSource::Read(..) | ByteStreamSource::File(..) => None,
876                ByteStreamSource::Child(c) => {
877                    let exit_future = c.clone_exit_status_future();
878                    let ignore_error = c.clone_ignore_error();
879                    Some(ExitStatusGuard::new(exit_future, ignore_error))
880                }
881            },
882        }
883    }
884}
885
886impl CompareTypes<Type> for PipelineData {
887    fn compare_types(&self, other: &Type) -> Option<TypeRelation> {
888        self.get_type().compare_types(other)
889    }
890
891    /// Determine if the `PipelineData` can be assigned to `other`.
892    ///
893    /// This check makes no effort to collect a stream, so it may be a different result
894    /// than would be returned by calling [`Value::is_subtype_of()`] on the result of
895    /// [`.into_value()`](Self::into_value).
896    ///
897    /// A `ListStream` acts the same as an empty list type: it is a subtype of any [`list`](Type::List)
898    /// or [`table`](Type::Table) type. After converting to a value, it may become a more specific type.
899    /// For example, a `ListStream` is a subtype of `list<int>` and `list<string>`.
900    /// If calling [`.into_value()`](Self::into_value) results in a `list<int>`,
901    /// then the value would not be a subtype of `list<string>`, in contrast to the original `ListStream`.
902    ///
903    /// A `ByteStream` is a subtype of [`string`](Type::String) if it is coercible into a string.
904    /// Likewise, a `ByteStream` is a subtype of [`binary`](Type::Binary) if it is coercible into a binary value.
905    fn is_assignable_to(&self, dst: &Type) -> bool {
906        self.get_type().is_assignable_to(dst)
907    }
908}
909
910pub fn write_all_and_flush<T>(
911    data: T,
912    destination: &mut impl Write,
913    destination_name: &str,
914    span: Option<Span>,
915    signals: &Signals,
916) -> Result<(), ShellError>
917where
918    T: AsRef<[u8]>,
919{
920    let io_error_map = |err: std::io::Error, location: &Location<'_>| {
921        let context = format!("Writing to {destination_name} failed");
922        match span {
923            None => IoError::new_internal_with_location(err, context, location),
924            Some(span) if span == Span::unknown() => {
925                IoError::new_internal_with_location(err, context, location)
926            }
927            Some(span) => IoError::new_with_additional_context(err, span, None, context),
928        }
929    };
930
931    let span = span.unwrap_or(Span::unknown());
932    const OUTPUT_CHUNK_SIZE: usize = 8192;
933    for chunk in data.as_ref().chunks(OUTPUT_CHUNK_SIZE) {
934        signals.check(&span)?;
935        destination
936            .write_all(chunk)
937            .map_err(|err| io_error_map(err, Location::caller()))?;
938    }
939    destination
940        .flush()
941        .map_err(|err| io_error_map(err, Location::caller()))?;
942    Ok(())
943}
944
/// The source a [`PipelineIterator`] draws its values from.
enum PipelineIteratorInner {
    /// Nothing to yield.
    Empty,
    /// A single value, handed out by the iterator unless it is `Value::Nothing`.
    Value(Value),
    /// Values pulled from a list stream's iterator.
    ListStream(crate::list_stream::IntoIter),
    /// Chunks pulled from a byte stream.
    ByteStream(crate::byte_stream::Chunks),
}
951
/// Iterator over the [`Value`]s of a [`PipelineData`], produced by its `IntoIterator` implementation.
pub struct PipelineIterator(PipelineIteratorInner);
953
954impl IntoIterator for PipelineData {
955    type Item = Value;
956
957    type IntoIter = PipelineIterator;
958
959    fn into_iter(self) -> Self::IntoIter {
960        PipelineIterator(match self {
961            PipelineData::Empty => PipelineIteratorInner::Empty,
962            PipelineData::Value(value, ..) => {
963                let span = value.span();
964                match value {
965                    Value::List { vals, signals, .. } => PipelineIteratorInner::ListStream(
966                        ListStream::new(
967                            vals.into_iter(),
968                            span,
969                            signals.unwrap_or_else(Signals::empty),
970                        )
971                        .into_iter(),
972                    ),
973                    Value::Range { val, signals, .. } => PipelineIteratorInner::ListStream(
974                        ListStream::new(
975                            val.into_range_iter(span, signals.unwrap_or_else(Signals::empty)),
976                            span,
977                            Signals::empty(),
978                        )
979                        .into_iter(),
980                    ),
981                    // Handle iterable custom values by converting to base value first
982                    #[expect(deprecated)]
983                    Value::Custom { ref val, .. } if val.is_iterable() => {
984                        match val.to_base_value(span) {
985                            Ok(Value::List { vals, signals, .. }) => {
986                                PipelineIteratorInner::ListStream(
987                                    ListStream::new(
988                                        vals.into_iter(),
989                                        span,
990                                        signals.unwrap_or_else(Signals::empty),
991                                    )
992                                    .into_iter(),
993                                )
994                            }
995                            Ok(other) => PipelineIteratorInner::Value(other),
996                            Err(err) => PipelineIteratorInner::Value(Value::error(err, span)),
997                        }
998                    }
999                    x => PipelineIteratorInner::Value(x),
1000                }
1001            }
1002            PipelineData::ListStream(stream, ..) => {
1003                PipelineIteratorInner::ListStream(stream.into_iter())
1004            }
1005            PipelineData::ByteStream(stream, ..) => stream.chunks().map_or(
1006                PipelineIteratorInner::Empty,
1007                PipelineIteratorInner::ByteStream,
1008            ),
1009        })
1010    }
1011}
1012
1013impl Iterator for PipelineIterator {
1014    type Item = Value;
1015
1016    fn next(&mut self) -> Option<Self::Item> {
1017        match &mut self.0 {
1018            PipelineIteratorInner::Empty => None,
1019            PipelineIteratorInner::Value(Value::Nothing { .. }, ..) => None,
1020            PipelineIteratorInner::Value(v, ..) => Some(std::mem::take(v)),
1021            PipelineIteratorInner::ListStream(stream, ..) => stream.next(),
1022            PipelineIteratorInner::ByteStream(stream) => stream.next().map(|x| match x {
1023                Ok(x) => x,
1024                Err(err) => Value::error(
1025                    err,
1026                    Span::unknown(), //FIXME: unclear where this span should come from
1027                ),
1028            }),
1029        }
1030    }
1031}
1032
/// Conversion of a single item into [`PipelineData`].
///
/// A blanket implementation covers every type that converts into a [`Value`].
pub trait IntoPipelineData {
    /// Convert `self` into [`PipelineData`].
    ///
    /// The blanket implementation attaches no metadata.
    fn into_pipeline_data(self) -> PipelineData;

    /// Convert `self` into [`PipelineData`], attaching `metadata`.
    ///
    /// `metadata` accepts anything convertible into an optional [`PipelineMetadata`].
    fn into_pipeline_data_with_metadata(
        self,
        metadata: impl Into<Option<PipelineMetadata>>,
    ) -> PipelineData;
}
1041
1042impl<V> IntoPipelineData for V
1043where
1044    V: Into<Value>,
1045{
1046    fn into_pipeline_data(self) -> PipelineData {
1047        PipelineData::value(self.into(), None)
1048    }
1049
1050    fn into_pipeline_data_with_metadata(
1051        self,
1052        metadata: impl Into<Option<PipelineMetadata>>,
1053    ) -> PipelineData {
1054        PipelineData::value(self.into(), metadata.into())
1055    }
1056}
1057
/// Conversion into [`PipelineData`] that is given a [`Span`] and [`Signals`].
///
/// A blanket implementation covers iterables whose items convert into a [`Value`]; it
/// builds a [`ListStream`] from the items together with `span` and `signals`.
pub trait IntoInterruptiblePipelineData {
    /// Convert `self` into [`PipelineData`] using `span` and `signals`.
    fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData;
    /// Convert `self` into [`PipelineData`] using `span` and `signals`, attaching `metadata`.
    ///
    /// `metadata` accepts anything convertible into an optional [`PipelineMetadata`].
    fn into_pipeline_data_with_metadata(
        self,
        span: Span,
        signals: Signals,
        metadata: impl Into<Option<PipelineMetadata>>,
    ) -> PipelineData;
}
1067
1068impl<I> IntoInterruptiblePipelineData for I
1069where
1070    I: IntoIterator + Send + 'static,
1071    I::IntoIter: Send + 'static,
1072    <I::IntoIter as Iterator>::Item: Into<Value>,
1073{
1074    fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData {
1075        ListStream::new(self.into_iter().map(Into::into), span, signals).into()
1076    }
1077
1078    fn into_pipeline_data_with_metadata(
1079        self,
1080        span: Span,
1081        signals: Signals,
1082        metadata: impl Into<Option<PipelineMetadata>>,
1083    ) -> PipelineData {
1084        PipelineData::list_stream(
1085            ListStream::new(self.into_iter().map(Into::into), span, signals),
1086            metadata.into(),
1087        )
1088    }
1089}
1090
1091fn value_to_bytes(value: Value) -> Result<Vec<u8>, ShellError> {
1092    let bytes = match value {
1093        Value::String { val, .. } => val.into_bytes(),
1094        Value::Binary { val, .. } => val,
1095        Value::List { vals, .. } => {
1096            let val = vals
1097                .into_iter()
1098                .map(Value::coerce_into_string)
1099                .collect::<Result<Vec<String>, ShellError>>()?
1100                .join("\n")
1101                + "\n";
1102
1103            val.into_bytes()
1104        }
1105        // Propagate errors by explicitly matching them before the final case.
1106        Value::Error { error, .. } => return Err(*error),
1107        value => value.coerce_into_string()?.into_bytes(),
1108    };
1109    Ok(bytes)
1110}
1111
/// A wrapper to [`PipelineData`] which can also track exit status.
///
/// We use exit status tracking to implement the `pipefail` feature.
#[derive(Debug)]
pub struct PipelineExecutionData {
    /// The wrapped pipeline data; also reachable through `Deref`/`DerefMut`.
    pub body: PipelineData,
    /// Exit status guards tracked alongside `body`.
    ///
    /// As built by `From<PipelineData>`, this holds a single entry, which is `None` when
    /// the data has no exit status future to offer.
    #[cfg(feature = "os")]
    pub exit: Vec<Option<ExitStatusGuard>>,
}
1121
1122impl Deref for PipelineExecutionData {
1123    type Target = PipelineData;
1124
1125    fn deref(&self) -> &Self::Target {
1126        &self.body
1127    }
1128}
1129
1130impl DerefMut for PipelineExecutionData {
1131    fn deref_mut(&mut self) -> &mut Self::Target {
1132        &mut self.body
1133    }
1134}
1135
1136impl PipelineExecutionData {
1137    pub fn empty() -> Self {
1138        Self {
1139            body: PipelineData::empty(),
1140            #[cfg(feature = "os")]
1141            exit: vec![],
1142        }
1143    }
1144}
1145
1146impl From<PipelineData> for PipelineExecutionData {
1147    #[cfg(feature = "os")]
1148    fn from(value: PipelineData) -> Self {
1149        let value_span = value.span().unwrap_or_else(Span::unknown);
1150        let exit_status_future = value
1151            .clone_exit_status_future()
1152            .map(|f| f.with_span(value_span));
1153        Self {
1154            body: value,
1155            exit: vec![exit_status_future],
1156        }
1157    }
1158
1159    #[cfg(not(feature = "os"))]
1160    fn from(value: PipelineData) -> Self {
1161        Self { body: value }
1162    }
1163}