1#[cfg(feature = "os")]
2use crate::process::ExitStatusGuard;
3use crate::{
4 ByteStream, ByteStreamSource, ByteStreamType, Config, ListStream, OutDest, PipelineMetadata,
5 Range, ShellError, Signals, Span, Type, Value,
6 ast::{Call, PathMember},
7 engine::{EngineState, Stack},
8 shell_error::{generic::GenericError, io::IoError},
9};
10use std::{
11 borrow::Cow,
12 io::Write,
13 ops::{Deref, DerefMut},
14 panic::Location,
15};
16
17const LINE_ENDING_PATTERN: &[char] = &['\r', '\n'];
18
19#[derive(Debug)]
49pub enum PipelineData {
50 Empty,
51 Value(Value, Option<PipelineMetadata>),
52 ListStream(ListStream, Option<PipelineMetadata>),
53 ByteStream(ByteStream, Option<PipelineMetadata>),
54}
55
56impl PipelineData {
57 pub const fn empty() -> PipelineData {
58 PipelineData::Empty
59 }
60
61 pub fn value(val: Value, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
62 PipelineData::Value(val, metadata.into())
63 }
64
65 pub fn list_stream(stream: ListStream, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
66 PipelineData::ListStream(stream, metadata.into())
67 }
68
69 pub fn byte_stream(stream: ByteStream, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
70 PipelineData::ByteStream(stream, metadata.into())
71 }
72
73 #[deprecated(
79 since = "0.111.1",
80 note = "Use .metadata_ref(), .metadata_mut() or .take_metadata() instead"
81 )]
82 pub fn metadata(&self) -> Option<PipelineMetadata> {
83 self.metadata_ref().cloned()
84 }
85
86 pub fn metadata_ref(&self) -> Option<&PipelineMetadata> {
88 match self {
89 PipelineData::Empty => None,
90 PipelineData::Value(_, meta)
91 | PipelineData::ListStream(_, meta)
92 | PipelineData::ByteStream(_, meta) => meta.as_ref(),
93 }
94 }
95
96 pub fn metadata_mut(&mut self) -> Option<&mut PipelineMetadata> {
98 match self {
99 PipelineData::Empty => None,
100 PipelineData::Value(_, meta)
101 | PipelineData::ListStream(_, meta)
102 | PipelineData::ByteStream(_, meta) => meta.as_mut(),
103 }
104 }
105
106 pub fn take_metadata(&mut self) -> Option<PipelineMetadata> {
108 match self {
109 PipelineData::Empty => None,
110 PipelineData::Value(_, meta)
111 | PipelineData::ListStream(_, meta)
112 | PipelineData::ByteStream(_, meta) => meta.take(),
113 }
114 }
115
116 pub fn set_metadata(mut self, metadata: Option<PipelineMetadata>) -> Self {
117 match &mut self {
118 PipelineData::Empty => {}
119 PipelineData::Value(_, meta)
120 | PipelineData::ListStream(_, meta)
121 | PipelineData::ByteStream(_, meta) => *meta = metadata,
122 }
123 self
124 }
125
126 pub fn is_nothing(&self) -> bool {
127 matches!(self, PipelineData::Value(Value::Nothing { .. }, ..))
128 || matches!(self, PipelineData::Empty)
129 }
130
131 pub fn span(&self) -> Option<Span> {
133 match self {
134 PipelineData::Empty => None,
135 PipelineData::Value(value, ..) => Some(value.span()),
136 PipelineData::ListStream(stream, ..) => Some(stream.span()),
137 PipelineData::ByteStream(stream, ..) => Some(stream.span()),
138 }
139 }
140
141 pub fn with_span(self, span: Span) -> Self {
145 match self {
146 PipelineData::Empty => PipelineData::value(Value::nothing(span), None),
147 PipelineData::Value(value, metadata) => {
148 PipelineData::value(value.with_span(span), metadata)
149 }
150 PipelineData::ListStream(stream, metadata) => {
151 PipelineData::list_stream(stream.with_span(span), metadata)
152 }
153 PipelineData::ByteStream(stream, metadata) => {
154 PipelineData::byte_stream(stream.with_span(span), metadata)
155 }
156 }
157 }
158
159 pub fn get_type(&self) -> Type {
170 match self {
171 PipelineData::Empty => Type::Nothing,
172 PipelineData::Value(value, _) => value.get_type(),
173 PipelineData::ListStream(_, _) => Type::list(Type::Any),
174 PipelineData::ByteStream(stream, _) => stream.type_().into(),
175 }
176 }
177
178 pub fn is_subtype_of(&self, other: &Type) -> bool {
193 match (self, other) {
194 (_, Type::Any) => true,
195 (data, Type::OneOf(oneof)) => oneof.iter().any(|t| data.is_subtype_of(t)),
196 (PipelineData::Empty, Type::Nothing) => true,
197 (PipelineData::Value(val, ..), ty) => val.is_subtype_of(ty),
198
199 (PipelineData::ListStream(..), Type::List(..) | Type::Table(..)) => true,
201
202 (PipelineData::ByteStream(stream, ..), Type::String)
203 if stream.type_().is_string_coercible() =>
204 {
205 true
206 }
207 (PipelineData::ByteStream(stream, ..), Type::Binary)
208 if stream.type_().is_binary_coercible() =>
209 {
210 true
211 }
212
213 (PipelineData::Empty, _) => false,
214 (PipelineData::ListStream(..), _) => false,
215 (PipelineData::ByteStream(..), _) => false,
216 }
217 }
218
219 pub fn into_value(self, span: Span) -> Result<Value, ShellError> {
220 match self {
221 PipelineData::Empty => Ok(Value::nothing(span)),
222 PipelineData::Value(value, ..) => {
223 if value.span() == Span::unknown() {
224 Ok(value.with_span(span))
225 } else {
226 Ok(value)
227 }
228 }
229 PipelineData::ListStream(stream, ..) => stream.into_value(),
230 PipelineData::ByteStream(stream, ..) => stream.into_value(),
231 }
232 }
233
234 pub fn try_into_stream(self, engine_state: &EngineState) -> Result<PipelineData, PipelineData> {
242 let span = self.span().unwrap_or(Span::unknown());
243 match self {
244 PipelineData::ListStream(..) | PipelineData::ByteStream(..) => Ok(self),
245 PipelineData::Value(Value::List { .. } | Value::Range { .. }, ref metadata) => {
246 let metadata = metadata.clone();
247 Ok(PipelineData::list_stream(
248 ListStream::new(self.into_iter(), span, engine_state.signals().clone()),
249 metadata,
250 ))
251 }
252 PipelineData::Value(Value::String { val, .. }, metadata) => {
253 Ok(PipelineData::byte_stream(
254 ByteStream::read_string(val, span, engine_state.signals().clone()),
255 metadata,
256 ))
257 }
258 PipelineData::Value(Value::Binary { val, .. }, metadata) => {
259 Ok(PipelineData::byte_stream(
260 ByteStream::read_binary(val, span, engine_state.signals().clone()),
261 metadata,
262 ))
263 }
264 PipelineData::Value(Value::Custom { val, internal_span }, metadata) => {
265 match val.to_base_value(internal_span) {
266 Ok(Value::List { vals, .. }) => Ok(PipelineData::list_stream(
267 ListStream::new(vals.into_iter(), span, engine_state.signals().clone()),
268 metadata,
269 )),
270 Ok(Value::Range { val, .. }) => Ok(PipelineData::list_stream(
271 ListStream::new(
272 val.into_range_iter(span, Signals::empty()),
273 span,
274 engine_state.signals().clone(),
275 ),
276 metadata,
277 )),
278 Ok(other) => Err(PipelineData::value(other, metadata)),
279 Err(_) => Err(PipelineData::Value(
280 Value::Custom { val, internal_span },
281 metadata,
282 )),
283 }
284 }
285 _ => Err(self),
286 }
287 }
288
289 #[must_use]
294 pub fn into_stream_or_original(self, engine_state: &EngineState) -> PipelineData {
295 self.try_into_stream(engine_state)
296 .unwrap_or_else(|original| original)
297 }
298
299 pub fn write_to(self, mut dest: impl Write) -> Result<(), ShellError> {
303 match self {
304 PipelineData::Empty => Ok(()),
305 PipelineData::Value(value, ..) => {
306 let bytes = value_to_bytes(value)?;
307 dest.write_all(&bytes).map_err(|err| {
308 IoError::new_internal(err, "Could not write PipelineData to dest")
309 })?;
310 dest.flush().map_err(|err| {
311 IoError::new_internal(err, "Could not flush PipelineData to dest")
312 })?;
313 Ok(())
314 }
315 PipelineData::ListStream(stream, ..) => {
316 for value in stream {
317 let bytes = value_to_bytes(value)?;
318 dest.write_all(&bytes).map_err(|err| {
319 IoError::new_internal(err, "Could not write PipelineData to dest")
320 })?;
321 dest.write_all(b"\n").map_err(|err| {
322 IoError::new_internal(
323 err,
324 "Could not write linebreak after PipelineData to dest",
325 )
326 })?;
327 }
328 dest.flush().map_err(|err| {
329 IoError::new_internal(err, "Could not flush PipelineData to dest")
330 })?;
331 Ok(())
332 }
333 PipelineData::ByteStream(stream, ..) => stream.write_to(dest),
334 }
335 }
336
337 pub fn drain_to_out_dests(
344 mut self,
345 engine_state: &EngineState,
346 stack: &mut Stack,
347 ) -> Result<Self, ShellError> {
348 match stack.pipe_stdout().unwrap_or(&OutDest::Inherit) {
349 OutDest::Print => {
350 self.print_table(engine_state, stack, false, false)?;
351 Ok(Self::Empty)
352 }
353 OutDest::Pipe | OutDest::PipeSeparate => Ok(self),
354 OutDest::Value => {
355 let metadata = self.take_metadata();
356 let span = self.span().unwrap_or(Span::unknown());
357 self.into_value(span).map(|val| Self::Value(val, metadata))
358 }
359 OutDest::File(file) => {
360 self.write_to(file.as_ref())?;
361 Ok(Self::Empty)
362 }
363 OutDest::Null | OutDest::Inherit => {
364 self.drain()?;
365 Ok(Self::Empty)
366 }
367 }
368 }
369
370 pub fn drain(self) -> Result<(), ShellError> {
371 match self {
372 Self::Empty => Ok(()),
373 Self::Value(Value::Error { error, .. }, ..) => Err(*error),
374 Self::Value(..) => Ok(()),
375 Self::ListStream(stream, ..) => stream.drain(),
376 Self::ByteStream(stream, ..) => stream.drain(),
377 }
378 }
379
380 pub fn into_iter_strict(self, span: Span) -> Result<PipelineIterator, ShellError> {
386 Ok(PipelineIterator(match self {
387 PipelineData::Value(value, ..) => {
388 let val_span = value.span();
389 match value {
390 Value::List { vals, .. } => PipelineIteratorInner::ListStream(
391 ListStream::new(vals.into_iter(), val_span, Signals::empty()).into_iter(),
392 ),
393 Value::Binary { val, .. } => PipelineIteratorInner::ListStream(
394 ListStream::new(
395 val.into_iter().map(move |x| Value::int(x as i64, val_span)),
396 val_span,
397 Signals::empty(),
398 )
399 .into_iter(),
400 ),
401 Value::Range { val, .. } => PipelineIteratorInner::ListStream(
402 ListStream::new(
403 val.into_range_iter(val_span, Signals::empty()),
404 val_span,
405 Signals::empty(),
406 )
407 .into_iter(),
408 ),
409 #[expect(deprecated)]
411 Value::Custom { ref val, .. } if val.is_iterable() => {
412 match val.to_base_value(val_span) {
413 Ok(Value::List { vals, .. }) => PipelineIteratorInner::ListStream(
414 ListStream::new(vals.into_iter(), val_span, Signals::empty())
415 .into_iter(),
416 ),
417 Ok(other) => {
418 return Err(ShellError::OnlySupportsThisInputType {
419 exp_input_type: "list, binary, range, or byte stream".into(),
420 wrong_type: other.get_type().to_string(),
421 dst_span: span,
422 src_span: val_span,
423 });
424 }
425 Err(err) => return Err(err),
426 }
427 }
428 Value::Error { error, .. } => return Err(*error),
430 other => {
431 return Err(ShellError::OnlySupportsThisInputType {
432 exp_input_type: "list, binary, range, or byte stream".into(),
433 wrong_type: other.get_type().to_string(),
434 dst_span: span,
435 src_span: val_span,
436 });
437 }
438 }
439 }
440 PipelineData::ListStream(stream, ..) => {
441 PipelineIteratorInner::ListStream(stream.into_iter())
442 }
443 PipelineData::Empty => {
444 return Err(ShellError::OnlySupportsThisInputType {
445 exp_input_type: "list, binary, range, or byte stream".into(),
446 wrong_type: "null".into(),
447 dst_span: span,
448 src_span: span,
449 });
450 }
451 PipelineData::ByteStream(stream, ..) => {
452 if let Some(chunks) = stream.chunks() {
453 PipelineIteratorInner::ByteStream(chunks)
454 } else {
455 PipelineIteratorInner::Empty
456 }
457 }
458 }))
459 }
460
461 pub fn collect_string(self, separator: &str, config: &Config) -> Result<String, ShellError> {
462 match self {
463 PipelineData::Empty => Ok(String::new()),
464 PipelineData::Value(value, ..) => Ok(value.to_expanded_string(separator, config)),
465 PipelineData::ListStream(stream, ..) => Ok(stream.into_string(separator, config)),
466 PipelineData::ByteStream(stream, ..) => stream.into_string(),
467 }
468 }
469
470 pub fn collect_string_strict(
475 self,
476 span: Span,
477 ) -> Result<(String, Span, Option<PipelineMetadata>), ShellError> {
478 match self {
479 PipelineData::Empty => Ok((String::new(), span, None)),
480 PipelineData::Value(Value::String { val, .. }, metadata) => Ok((val, span, metadata)),
481 PipelineData::Value(val, ..) => Err(ShellError::TypeMismatch {
482 err_message: "string".into(),
483 span: val.span(),
484 }),
485 PipelineData::ListStream(..) => Err(ShellError::TypeMismatch {
486 err_message: "string".into(),
487 span,
488 }),
489 PipelineData::ByteStream(stream, metadata) => {
490 let span = stream.span();
491 Ok((stream.into_string()?, span, metadata))
492 }
493 }
494 }
495
496 pub fn follow_cell_path(
497 self,
498 cell_path: &[PathMember],
499 head: Span,
500 ) -> Result<Value, ShellError> {
501 match self {
502 PipelineData::ListStream(stream, ..) => Value::list(stream.into_iter().collect(), head)
504 .follow_cell_path(cell_path)
505 .map(Cow::into_owned),
506 PipelineData::Value(v, ..) => v.follow_cell_path(cell_path).map(Cow::into_owned),
507 PipelineData::Empty => Err(ShellError::IncompatiblePathAccess {
508 type_name: "empty pipeline".to_string(),
509 span: head,
510 }),
511 PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess {
512 type_name: stream.type_().describe().to_owned(),
513 span: stream.span(),
514 }),
515 }
516 }
517
518 pub fn map<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
520 where
521 Self: Sized,
522 F: FnMut(Value) -> Value + 'static + Send,
523 {
524 match self {
525 PipelineData::Value(value, metadata) => {
526 let span = value.span();
527 let pipeline = match value {
528 Value::List { vals, .. } => vals
529 .into_iter()
530 .map(f)
531 .into_pipeline_data(span, signals.clone()),
532 Value::Range { val, .. } => val
533 .into_range_iter(span, Signals::empty())
534 .map(f)
535 .into_pipeline_data(span, signals.clone()),
536 #[expect(deprecated)]
537 Value::Custom { ref val, .. } if val.is_iterable() => {
538 match val.to_base_value(span)? {
539 Value::List { vals, .. } => vals
540 .into_iter()
541 .map(f)
542 .into_pipeline_data(span, signals.clone()),
543 Value::Range { val, .. } => val
544 .into_range_iter(span, Signals::empty())
545 .map(f)
546 .into_pipeline_data(span, signals.clone()),
547 value => match f(value) {
548 Value::Error { error, .. } => return Err(*error),
549 v => v.into_pipeline_data(),
550 },
551 }
552 }
553 value => match f(value) {
554 Value::Error { error, .. } => return Err(*error),
555 v => v.into_pipeline_data(),
556 },
557 };
558 Ok(pipeline.set_metadata(metadata))
559 }
560 PipelineData::Empty => Ok(PipelineData::empty()),
561 PipelineData::ListStream(stream, metadata) => {
562 Ok(PipelineData::list_stream(stream.map(f), metadata))
563 }
564 PipelineData::ByteStream(stream, metadata) => {
565 Ok(f(stream.into_value()?).into_pipeline_data_with_metadata(metadata))
566 }
567 }
568 }
569
570 pub fn flat_map<U, F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
572 where
573 Self: Sized,
574 U: IntoIterator<Item = Value> + 'static,
575 <U as IntoIterator>::IntoIter: 'static + Send,
576 F: FnMut(Value) -> U + 'static + Send,
577 {
578 match self {
579 PipelineData::Empty => Ok(PipelineData::empty()),
580 PipelineData::Value(value, metadata) => {
581 let span = value.span();
582 let pipeline = match value {
583 Value::List { vals, .. } => vals
584 .into_iter()
585 .flat_map(f)
586 .into_pipeline_data(span, signals.clone()),
587 Value::Range { val, .. } => val
588 .into_range_iter(span, Signals::empty())
589 .flat_map(f)
590 .into_pipeline_data(span, signals.clone()),
591 #[expect(deprecated)]
592 Value::Custom { ref val, .. } if val.is_iterable() => {
593 match val.to_base_value(span)? {
594 Value::List { vals, .. } => vals
595 .into_iter()
596 .flat_map(f)
597 .into_pipeline_data(span, signals.clone()),
598 Value::Range { val, .. } => val
599 .into_range_iter(span, Signals::empty())
600 .flat_map(f)
601 .into_pipeline_data(span, signals.clone()),
602 value => f(value)
603 .into_iter()
604 .into_pipeline_data(span, signals.clone()),
605 }
606 }
607 value => f(value)
608 .into_iter()
609 .into_pipeline_data(span, signals.clone()),
610 };
611 Ok(pipeline.set_metadata(metadata))
612 }
613 PipelineData::ListStream(stream, metadata) => Ok(PipelineData::list_stream(
614 stream.modify(|iter| iter.flat_map(f)),
615 metadata,
616 )),
617 PipelineData::ByteStream(stream, metadata) => {
618 let span = stream.span();
620 let iter = match String::from_utf8(stream.into_bytes()?) {
621 Ok(mut str) => {
622 str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
623 f(Value::string(str, span))
624 }
625 Err(err) => f(Value::binary(err.into_bytes(), span)),
626 };
627 Ok(iter.into_iter().into_pipeline_data_with_metadata(
628 span,
629 signals.clone(),
630 metadata,
631 ))
632 }
633 }
634 }
635
636 pub fn filter<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
637 where
638 Self: Sized,
639 F: FnMut(&Value) -> bool + 'static + Send,
640 {
641 match self {
642 PipelineData::Empty => Ok(PipelineData::empty()),
643 PipelineData::Value(value, metadata) => {
644 let span = value.span();
645 let pipeline = match value {
646 Value::List { vals, .. } => vals
647 .into_iter()
648 .filter(f)
649 .into_pipeline_data(span, signals.clone()),
650 Value::Range { val, .. } => val
651 .into_range_iter(span, Signals::empty())
652 .filter(f)
653 .into_pipeline_data(span, signals.clone()),
654 #[expect(deprecated)]
655 Value::Custom { ref val, .. } if val.is_iterable() => {
656 match val.to_base_value(span)? {
657 Value::List { vals, .. } => vals
658 .into_iter()
659 .filter(f)
660 .into_pipeline_data(span, signals.clone()),
661 Value::Range { val, .. } => val
662 .into_range_iter(span, Signals::empty())
663 .filter(f)
664 .into_pipeline_data(span, signals.clone()),
665 value => {
666 if f(&value) {
667 value.into_pipeline_data()
668 } else {
669 Value::nothing(span).into_pipeline_data()
670 }
671 }
672 }
673 }
674 value => {
675 if f(&value) {
676 value.into_pipeline_data()
677 } else {
678 Value::nothing(span).into_pipeline_data()
679 }
680 }
681 };
682 Ok(pipeline.set_metadata(metadata))
683 }
684 PipelineData::ListStream(stream, metadata) => Ok(PipelineData::list_stream(
685 stream.modify(|iter| iter.filter(f)),
686 metadata,
687 )),
688 PipelineData::ByteStream(stream, metadata) => {
689 let span = stream.span();
691 let value = match String::from_utf8(stream.into_bytes()?) {
692 Ok(mut str) => {
693 str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
694 Value::string(str, span)
695 }
696 Err(err) => Value::binary(err.into_bytes(), span),
697 };
698 let value = if f(&value) {
699 value
700 } else {
701 Value::nothing(span)
702 };
703 Ok(value.into_pipeline_data_with_metadata(metadata))
704 }
705 }
706 }
707
708 pub fn try_expand_range(self) -> Result<PipelineData, ShellError> {
713 match self {
714 PipelineData::Value(v, metadata) => {
715 let span = v.span();
716 match v {
717 Value::Range { val, .. } => {
718 match *val {
719 Range::IntRange(range) => {
720 if range.is_unbounded() {
721 return Err(ShellError::Generic(
722 GenericError::new(
723 "Cannot create range",
724 "Unbounded ranges are not allowed when converting to this format",
725 span,
726 )
727 .with_help(
728 "Consider using ranges with valid start and end point.",
729 ),
730 ));
731 }
732 }
733 Range::FloatRange(range) => {
734 if range.is_unbounded() {
735 return Err(ShellError::Generic(
736 GenericError::new(
737 "Cannot create range",
738 "Unbounded ranges are not allowed when converting to this format",
739 span,
740 )
741 .with_help(
742 "Consider using ranges with valid start and end point.",
743 ),
744 ));
745 }
746 }
747 }
748 let range_values: Vec<Value> =
749 val.into_range_iter(span, Signals::empty()).collect();
750 Ok(PipelineData::value(Value::list(range_values, span), None))
751 }
752 x => Ok(PipelineData::value(x, metadata)),
753 }
754 }
755 _ => Ok(self),
756 }
757 }
758
759 pub fn print_table(
767 self,
768 engine_state: &EngineState,
769 stack: &mut Stack,
770 no_newline: bool,
771 to_stderr: bool,
772 ) -> Result<(), ShellError> {
773 match self {
774 PipelineData::ByteStream(stream, ..) if stream.type_() != ByteStreamType::Binary => {
776 stream.print(to_stderr)
777 }
778 _ => {
779 if let Some(decl_id) = engine_state.table_decl_id {
782 let command = engine_state.get_decl(decl_id);
783 if command.block_id().is_some() {
784 self.write_all_and_flush(engine_state, no_newline, to_stderr)
785 } else {
786 let call = Call::new(Span::new(0, 0));
787 let table = command.run(engine_state, stack, &(&call).into(), self)?;
788 table.write_all_and_flush(engine_state, no_newline, to_stderr)
789 }
790 } else {
791 self.write_all_and_flush(engine_state, no_newline, to_stderr)
792 }
793 }
794 }
795 }
796
797 pub fn print_raw(
805 self,
806 engine_state: &EngineState,
807 no_newline: bool,
808 to_stderr: bool,
809 ) -> Result<(), ShellError> {
810 let span = self.span();
811 if let PipelineData::Value(Value::Binary { val: bytes, .. }, _) = self {
812 if to_stderr {
813 write_all_and_flush(
814 bytes,
815 &mut std::io::stderr().lock(),
816 "stderr",
817 span,
818 engine_state.signals(),
819 )?;
820 } else {
821 write_all_and_flush(
822 bytes,
823 &mut std::io::stdout().lock(),
824 "stdout",
825 span,
826 engine_state.signals(),
827 )?;
828 }
829 Ok(())
830 } else {
831 self.write_all_and_flush(engine_state, no_newline, to_stderr)
832 }
833 }
834
835 fn write_all_and_flush(
836 self,
837 engine_state: &EngineState,
838 no_newline: bool,
839 to_stderr: bool,
840 ) -> Result<(), ShellError> {
841 let span = self.span();
842 if let PipelineData::ByteStream(stream, ..) = self {
843 stream.print(to_stderr)
845 } else {
846 let config = engine_state.get_config();
847 for item in self {
848 let mut out = if let Value::Error { error, .. } = item {
849 return Err(*error);
850 } else {
851 item.to_expanded_string("\n", config)
852 };
853
854 if !no_newline {
855 out.push('\n');
856 }
857
858 if to_stderr {
859 write_all_and_flush(
860 out,
861 &mut std::io::stderr().lock(),
862 "stderr",
863 span,
864 engine_state.signals(),
865 )?;
866 } else {
867 write_all_and_flush(
868 out,
869 &mut std::io::stdout().lock(),
870 "stdout",
871 span,
872 engine_state.signals(),
873 )?;
874 }
875 }
876
877 Ok(())
878 }
879 }
880
881 pub fn unsupported_input_error(
882 self,
883 expected_type: impl Into<String>,
884 span: Span,
885 ) -> ShellError {
886 match self {
887 PipelineData::Empty => ShellError::PipelineEmpty { dst_span: span },
888 PipelineData::Value(value, ..) => ShellError::OnlySupportsThisInputType {
889 exp_input_type: expected_type.into(),
890 wrong_type: value.get_type().get_non_specified_string(),
891 dst_span: span,
892 src_span: value.span(),
893 },
894 PipelineData::ListStream(stream, ..) => ShellError::OnlySupportsThisInputType {
895 exp_input_type: expected_type.into(),
896 wrong_type: "list (stream)".into(),
897 dst_span: span,
898 src_span: stream.span(),
899 },
900 PipelineData::ByteStream(stream, ..) => ShellError::OnlySupportsThisInputType {
901 exp_input_type: expected_type.into(),
902 wrong_type: stream.type_().describe().into(),
903 dst_span: span,
904 src_span: stream.span(),
905 },
906 }
907 }
908
909 #[cfg(feature = "os")]
912 pub fn clone_exit_status_future(&self) -> Option<ExitStatusGuard> {
913 match self {
914 PipelineData::Empty | PipelineData::Value(..) | PipelineData::ListStream(..) => None,
915 PipelineData::ByteStream(stream, ..) => match stream.source() {
916 ByteStreamSource::Read(..) | ByteStreamSource::File(..) => None,
917 ByteStreamSource::Child(c) => {
918 let exit_future = c.clone_exit_status_future();
919 let ignore_error = c.clone_ignore_error();
920 Some(ExitStatusGuard::new(exit_future, ignore_error))
921 }
922 },
923 }
924 }
925}
926
927pub fn write_all_and_flush<T>(
928 data: T,
929 destination: &mut impl Write,
930 destination_name: &str,
931 span: Option<Span>,
932 signals: &Signals,
933) -> Result<(), ShellError>
934where
935 T: AsRef<[u8]>,
936{
937 let io_error_map = |err: std::io::Error, location: &Location<'_>| {
938 let context = format!("Writing to {destination_name} failed");
939 match span {
940 None => IoError::new_internal_with_location(err, context, location),
941 Some(span) if span == Span::unknown() => {
942 IoError::new_internal_with_location(err, context, location)
943 }
944 Some(span) => IoError::new_with_additional_context(err, span, None, context),
945 }
946 };
947
948 let span = span.unwrap_or(Span::unknown());
949 const OUTPUT_CHUNK_SIZE: usize = 8192;
950 for chunk in data.as_ref().chunks(OUTPUT_CHUNK_SIZE) {
951 signals.check(&span)?;
952 destination
953 .write_all(chunk)
954 .map_err(|err| io_error_map(err, Location::caller()))?;
955 }
956 destination
957 .flush()
958 .map_err(|err| io_error_map(err, Location::caller()))?;
959 Ok(())
960}
961
962enum PipelineIteratorInner {
963 Empty,
964 Value(Value),
965 ListStream(crate::list_stream::IntoIter),
966 ByteStream(crate::byte_stream::Chunks),
967}
968
969pub struct PipelineIterator(PipelineIteratorInner);
970
971impl IntoIterator for PipelineData {
972 type Item = Value;
973
974 type IntoIter = PipelineIterator;
975
976 fn into_iter(self) -> Self::IntoIter {
977 PipelineIterator(match self {
978 PipelineData::Empty => PipelineIteratorInner::Empty,
979 PipelineData::Value(value, ..) => {
980 let span = value.span();
981 match value {
982 Value::List { vals, signals, .. } => PipelineIteratorInner::ListStream(
983 ListStream::new(
984 vals.into_iter(),
985 span,
986 signals.unwrap_or_else(Signals::empty),
987 )
988 .into_iter(),
989 ),
990 Value::Range { val, signals, .. } => PipelineIteratorInner::ListStream(
991 ListStream::new(
992 val.into_range_iter(span, signals.unwrap_or_else(Signals::empty)),
993 span,
994 Signals::empty(),
995 )
996 .into_iter(),
997 ),
998 #[expect(deprecated)]
1000 Value::Custom { ref val, .. } if val.is_iterable() => {
1001 match val.to_base_value(span) {
1002 Ok(Value::List { vals, signals, .. }) => {
1003 PipelineIteratorInner::ListStream(
1004 ListStream::new(
1005 vals.into_iter(),
1006 span,
1007 signals.unwrap_or_else(Signals::empty),
1008 )
1009 .into_iter(),
1010 )
1011 }
1012 Ok(other) => PipelineIteratorInner::Value(other),
1013 Err(err) => PipelineIteratorInner::Value(Value::error(err, span)),
1014 }
1015 }
1016 x => PipelineIteratorInner::Value(x),
1017 }
1018 }
1019 PipelineData::ListStream(stream, ..) => {
1020 PipelineIteratorInner::ListStream(stream.into_iter())
1021 }
1022 PipelineData::ByteStream(stream, ..) => stream.chunks().map_or(
1023 PipelineIteratorInner::Empty,
1024 PipelineIteratorInner::ByteStream,
1025 ),
1026 })
1027 }
1028}
1029
1030impl Iterator for PipelineIterator {
1031 type Item = Value;
1032
1033 fn next(&mut self) -> Option<Self::Item> {
1034 match &mut self.0 {
1035 PipelineIteratorInner::Empty => None,
1036 PipelineIteratorInner::Value(Value::Nothing { .. }, ..) => None,
1037 PipelineIteratorInner::Value(v, ..) => Some(std::mem::take(v)),
1038 PipelineIteratorInner::ListStream(stream, ..) => stream.next(),
1039 PipelineIteratorInner::ByteStream(stream) => stream.next().map(|x| match x {
1040 Ok(x) => x,
1041 Err(err) => Value::error(
1042 err,
1043 Span::unknown(), ),
1045 }),
1046 }
1047 }
1048}
1049
1050pub trait IntoPipelineData {
1051 fn into_pipeline_data(self) -> PipelineData;
1052
1053 fn into_pipeline_data_with_metadata(
1054 self,
1055 metadata: impl Into<Option<PipelineMetadata>>,
1056 ) -> PipelineData;
1057}
1058
1059impl<V> IntoPipelineData for V
1060where
1061 V: Into<Value>,
1062{
1063 fn into_pipeline_data(self) -> PipelineData {
1064 PipelineData::value(self.into(), None)
1065 }
1066
1067 fn into_pipeline_data_with_metadata(
1068 self,
1069 metadata: impl Into<Option<PipelineMetadata>>,
1070 ) -> PipelineData {
1071 PipelineData::value(self.into(), metadata.into())
1072 }
1073}
1074
1075pub trait IntoInterruptiblePipelineData {
1076 fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData;
1077 fn into_pipeline_data_with_metadata(
1078 self,
1079 span: Span,
1080 signals: Signals,
1081 metadata: impl Into<Option<PipelineMetadata>>,
1082 ) -> PipelineData;
1083}
1084
1085impl<I> IntoInterruptiblePipelineData for I
1086where
1087 I: IntoIterator + Send + 'static,
1088 I::IntoIter: Send + 'static,
1089 <I::IntoIter as Iterator>::Item: Into<Value>,
1090{
1091 fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData {
1092 ListStream::new(self.into_iter().map(Into::into), span, signals).into()
1093 }
1094
1095 fn into_pipeline_data_with_metadata(
1096 self,
1097 span: Span,
1098 signals: Signals,
1099 metadata: impl Into<Option<PipelineMetadata>>,
1100 ) -> PipelineData {
1101 PipelineData::list_stream(
1102 ListStream::new(self.into_iter().map(Into::into), span, signals),
1103 metadata.into(),
1104 )
1105 }
1106}
1107
1108fn value_to_bytes(value: Value) -> Result<Vec<u8>, ShellError> {
1109 let bytes = match value {
1110 Value::String { val, .. } => val.into_bytes(),
1111 Value::Binary { val, .. } => val,
1112 Value::List { vals, .. } => {
1113 let val = vals
1114 .into_iter()
1115 .map(Value::coerce_into_string)
1116 .collect::<Result<Vec<String>, ShellError>>()?
1117 .join("\n")
1118 + "\n";
1119
1120 val.into_bytes()
1121 }
1122 Value::Error { error, .. } => return Err(*error),
1124 value => value.coerce_into_string()?.into_bytes(),
1125 };
1126 Ok(bytes)
1127}
1128
1129#[derive(Debug)]
1133pub struct PipelineExecutionData {
1134 pub body: PipelineData,
1135 #[cfg(feature = "os")]
1136 pub exit: Vec<Option<ExitStatusGuard>>,
1137}
1138
1139impl Deref for PipelineExecutionData {
1140 type Target = PipelineData;
1141
1142 fn deref(&self) -> &Self::Target {
1143 &self.body
1144 }
1145}
1146
1147impl DerefMut for PipelineExecutionData {
1148 fn deref_mut(&mut self) -> &mut Self::Target {
1149 &mut self.body
1150 }
1151}
1152
1153impl PipelineExecutionData {
1154 pub fn empty() -> Self {
1155 Self {
1156 body: PipelineData::empty(),
1157 #[cfg(feature = "os")]
1158 exit: vec![],
1159 }
1160 }
1161}
1162
1163impl From<PipelineData> for PipelineExecutionData {
1164 #[cfg(feature = "os")]
1165 fn from(value: PipelineData) -> Self {
1166 let value_span = value.span().unwrap_or_else(Span::unknown);
1167 let exit_status_future = value
1168 .clone_exit_status_future()
1169 .map(|f| f.with_span(value_span));
1170 Self {
1171 body: value,
1172 exit: vec![exit_status_future],
1173 }
1174 }
1175
1176 #[cfg(not(feature = "os"))]
1177 fn from(value: PipelineData) -> Self {
1178 Self { body: value }
1179 }
1180}