1#[cfg(feature = "os")]
2use crate::process::ExitStatusGuard;
3use crate::{
4 ByteStream, ByteStreamSource, ByteStreamType, CompareTypes, Config, ListStream, OutDest,
5 PipelineMetadata, Range, ShellError, Signals, Span, Type, TypeRelation, Value,
6 ast::{Call, PathMember},
7 engine::{EngineState, Stack},
8 shell_error::{generic::GenericError, io::IoError},
9};
10use std::{
11 borrow::Cow,
12 io::Write,
13 ops::{Deref, DerefMut},
14 panic::Location,
15};
16
/// Characters trimmed from the end of UTF-8 byte-stream text before it is handed to the
/// closure in `flat_map` and `filter`.
const LINE_ENDING_PATTERN: &[char] = &['\r', '\n'];
18
/// The data passed between stages of a pipeline.
///
/// Every non-empty variant carries an optional [`PipelineMetadata`] next to its payload.
#[derive(Debug)]
pub enum PipelineData {
    /// No data at all.
    Empty,
    /// A single [`Value`].
    Value(Value, Option<PipelineMetadata>),
    /// A stream that yields [`Value`]s.
    ListStream(ListStream, Option<PipelineMetadata>),
    /// A byte stream.
    ByteStream(ByteStream, Option<PipelineMetadata>),
}
55
56impl PipelineData {
57 pub const fn empty() -> PipelineData {
58 PipelineData::Empty
59 }
60
61 pub fn value(val: Value, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
62 PipelineData::Value(val, metadata.into())
63 }
64
65 pub fn list_stream(stream: ListStream, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
66 PipelineData::ListStream(stream, metadata.into())
67 }
68
69 pub fn byte_stream(stream: ByteStream, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
70 PipelineData::ByteStream(stream, metadata.into())
71 }
72
73 #[deprecated(
79 since = "0.111.1",
80 note = "Use .metadata_ref(), .metadata_mut() or .take_metadata() instead"
81 )]
82 pub fn metadata(&self) -> Option<PipelineMetadata> {
83 self.metadata_ref().cloned()
84 }
85
86 pub fn metadata_ref(&self) -> Option<&PipelineMetadata> {
88 match self {
89 PipelineData::Empty => None,
90 PipelineData::Value(_, meta)
91 | PipelineData::ListStream(_, meta)
92 | PipelineData::ByteStream(_, meta) => meta.as_ref(),
93 }
94 }
95
96 pub fn metadata_mut(&mut self) -> Option<&mut PipelineMetadata> {
98 match self {
99 PipelineData::Empty => None,
100 PipelineData::Value(_, meta)
101 | PipelineData::ListStream(_, meta)
102 | PipelineData::ByteStream(_, meta) => meta.as_mut(),
103 }
104 }
105
106 pub fn take_metadata(&mut self) -> Option<PipelineMetadata> {
108 match self {
109 PipelineData::Empty => None,
110 PipelineData::Value(_, meta)
111 | PipelineData::ListStream(_, meta)
112 | PipelineData::ByteStream(_, meta) => meta.take(),
113 }
114 }
115
116 pub fn set_metadata(mut self, metadata: Option<PipelineMetadata>) -> Self {
117 match &mut self {
118 PipelineData::Empty => {}
119 PipelineData::Value(_, meta)
120 | PipelineData::ListStream(_, meta)
121 | PipelineData::ByteStream(_, meta) => *meta = metadata,
122 }
123 self
124 }
125
126 pub fn is_nothing(&self) -> bool {
127 matches!(self, PipelineData::Value(Value::Nothing { .. }, ..))
128 || matches!(self, PipelineData::Empty)
129 }
130
131 pub fn span(&self) -> Option<Span> {
133 match self {
134 PipelineData::Empty => None,
135 PipelineData::Value(value, ..) => Some(value.span()),
136 PipelineData::ListStream(stream, ..) => Some(stream.span()),
137 PipelineData::ByteStream(stream, ..) => Some(stream.span()),
138 }
139 }
140
141 pub fn with_span(self, span: Span) -> Self {
145 match self {
146 PipelineData::Empty => PipelineData::value(Value::nothing(span), None),
147 PipelineData::Value(value, metadata) => {
148 PipelineData::value(value.with_span(span), metadata)
149 }
150 PipelineData::ListStream(stream, metadata) => {
151 PipelineData::list_stream(stream.with_span(span), metadata)
152 }
153 PipelineData::ByteStream(stream, metadata) => {
154 PipelineData::byte_stream(stream.with_span(span), metadata)
155 }
156 }
157 }
158
159 pub fn get_type(&self) -> Type {
170 match self {
171 PipelineData::Empty => Type::Nothing,
172 PipelineData::Value(value, _) => value.get_type(),
173 PipelineData::ListStream(_, _) => Type::list(Type::Any),
174 PipelineData::ByteStream(stream, _) => stream.type_().into(),
175 }
176 }
177
178 pub fn into_value(self, span: Span) -> Result<Value, ShellError> {
179 match self {
180 PipelineData::Empty => Ok(Value::nothing(span)),
181 PipelineData::Value(value, ..) => {
182 if value.span() == Span::unknown() {
183 Ok(value.with_span(span))
184 } else {
185 Ok(value)
186 }
187 }
188 PipelineData::ListStream(stream, ..) => stream.into_value(),
189 PipelineData::ByteStream(stream, ..) => stream.into_value(),
190 }
191 }
192
193 pub fn try_into_stream(self, engine_state: &EngineState) -> Result<PipelineData, PipelineData> {
201 let span = self.span().unwrap_or(Span::unknown());
202 match self {
203 PipelineData::ListStream(..) | PipelineData::ByteStream(..) => Ok(self),
204 PipelineData::Value(Value::List { .. } | Value::Range { .. }, ref metadata) => {
205 let metadata = metadata.clone();
206 Ok(PipelineData::list_stream(
207 ListStream::new(self.into_iter(), span, engine_state.signals().clone()),
208 metadata,
209 ))
210 }
211 PipelineData::Value(Value::String { val, .. }, metadata) => {
212 Ok(PipelineData::byte_stream(
213 ByteStream::read_string(val, span, engine_state.signals().clone()),
214 metadata,
215 ))
216 }
217 PipelineData::Value(Value::Binary { val, .. }, metadata) => {
218 Ok(PipelineData::byte_stream(
219 ByteStream::read_binary(val, span, engine_state.signals().clone()),
220 metadata,
221 ))
222 }
223 PipelineData::Value(Value::Custom { val, internal_span }, metadata) => {
224 match val.to_base_value(internal_span) {
225 Ok(Value::List { vals, .. }) => Ok(PipelineData::list_stream(
226 ListStream::new(vals.into_iter(), span, engine_state.signals().clone()),
227 metadata,
228 )),
229 Ok(Value::Range { val, .. }) => Ok(PipelineData::list_stream(
230 ListStream::new(
231 val.into_range_iter(span, Signals::empty()),
232 span,
233 engine_state.signals().clone(),
234 ),
235 metadata,
236 )),
237 Ok(other) => Err(PipelineData::value(other, metadata)),
238 Err(_) => Err(PipelineData::Value(
239 Value::Custom { val, internal_span },
240 metadata,
241 )),
242 }
243 }
244 _ => Err(self),
245 }
246 }
247
248 #[must_use]
253 pub fn into_stream_or_original(self, engine_state: &EngineState) -> PipelineData {
254 self.try_into_stream(engine_state)
255 .unwrap_or_else(|original| original)
256 }
257
258 pub fn write_to(self, mut dest: impl Write) -> Result<(), ShellError> {
262 match self {
263 PipelineData::Empty => Ok(()),
264 PipelineData::Value(value, ..) => {
265 let bytes = value_to_bytes(value)?;
266 dest.write_all(&bytes).map_err(|err| {
267 IoError::new_internal(err, "Could not write PipelineData to dest")
268 })?;
269 dest.flush().map_err(|err| {
270 IoError::new_internal(err, "Could not flush PipelineData to dest")
271 })?;
272 Ok(())
273 }
274 PipelineData::ListStream(stream, ..) => {
275 for value in stream {
276 let bytes = value_to_bytes(value)?;
277 dest.write_all(&bytes).map_err(|err| {
278 IoError::new_internal(err, "Could not write PipelineData to dest")
279 })?;
280 dest.write_all(b"\n").map_err(|err| {
281 IoError::new_internal(
282 err,
283 "Could not write linebreak after PipelineData to dest",
284 )
285 })?;
286 }
287 dest.flush().map_err(|err| {
288 IoError::new_internal(err, "Could not flush PipelineData to dest")
289 })?;
290 Ok(())
291 }
292 PipelineData::ByteStream(stream, ..) => stream.write_to(dest),
293 }
294 }
295
296 pub fn drain_to_out_dests(
303 mut self,
304 engine_state: &EngineState,
305 stack: &mut Stack,
306 ) -> Result<Self, ShellError> {
307 match stack.pipe_stdout().unwrap_or(&OutDest::Inherit) {
308 OutDest::Print => {
309 self.print_table(engine_state, stack, false, false)?;
310 Ok(Self::Empty)
311 }
312 OutDest::Pipe | OutDest::PipeSeparate => Ok(self),
313 OutDest::Value => {
314 let metadata = self.take_metadata();
315 let span = self.span().unwrap_or(Span::unknown());
316 self.into_value(span).map(|val| Self::Value(val, metadata))
317 }
318 OutDest::File(file) => {
319 self.write_to(file.as_ref())?;
320 Ok(Self::Empty)
321 }
322 OutDest::Null | OutDest::Inherit => {
323 self.drain()?;
324 Ok(Self::Empty)
325 }
326 }
327 }
328
329 pub fn drain(self) -> Result<(), ShellError> {
330 match self {
331 Self::Empty => Ok(()),
332 Self::Value(Value::Error { error, .. }, ..) => Err(*error),
333 Self::Value(..) => Ok(()),
334 Self::ListStream(stream, ..) => stream.drain(),
335 Self::ByteStream(stream, ..) => stream.drain(),
336 }
337 }
338
339 pub fn into_iter_strict(self, span: Span) -> Result<PipelineIterator, ShellError> {
345 Ok(PipelineIterator(match self {
346 PipelineData::Value(value, ..) => {
347 let val_span = value.span();
348 match value {
349 Value::List { vals, .. } => PipelineIteratorInner::ListStream(
350 ListStream::new(vals.into_iter(), val_span, Signals::empty()).into_iter(),
351 ),
352 Value::Binary { val, .. } => PipelineIteratorInner::ListStream(
353 ListStream::new(
354 val.into_iter().map(move |x| Value::int(x as i64, val_span)),
355 val_span,
356 Signals::empty(),
357 )
358 .into_iter(),
359 ),
360 Value::Range { val, .. } => PipelineIteratorInner::ListStream(
361 ListStream::new(
362 val.into_range_iter(val_span, Signals::empty()),
363 val_span,
364 Signals::empty(),
365 )
366 .into_iter(),
367 ),
368 #[expect(deprecated)]
370 Value::Custom { ref val, .. } if val.is_iterable() => {
371 match val.to_base_value(val_span) {
372 Ok(Value::List { vals, .. }) => PipelineIteratorInner::ListStream(
373 ListStream::new(vals.into_iter(), val_span, Signals::empty())
374 .into_iter(),
375 ),
376 Ok(other) => {
377 return Err(ShellError::OnlySupportsThisInputType {
378 exp_input_type: "list, binary, range, or byte stream".into(),
379 wrong_type: other.get_type().to_string(),
380 dst_span: span,
381 src_span: val_span,
382 });
383 }
384 Err(err) => return Err(err),
385 }
386 }
387 Value::Error { error, .. } => return Err(*error),
389 other => {
390 return Err(ShellError::OnlySupportsThisInputType {
391 exp_input_type: "list, binary, range, or byte stream".into(),
392 wrong_type: other.get_type().to_string(),
393 dst_span: span,
394 src_span: val_span,
395 });
396 }
397 }
398 }
399 PipelineData::ListStream(stream, ..) => {
400 PipelineIteratorInner::ListStream(stream.into_iter())
401 }
402 PipelineData::Empty => {
403 return Err(ShellError::OnlySupportsThisInputType {
404 exp_input_type: "list, binary, range, or byte stream".into(),
405 wrong_type: "null".into(),
406 dst_span: span,
407 src_span: span,
408 });
409 }
410 PipelineData::ByteStream(stream, ..) => {
411 if let Some(chunks) = stream.chunks() {
412 PipelineIteratorInner::ByteStream(chunks)
413 } else {
414 PipelineIteratorInner::Empty
415 }
416 }
417 }))
418 }
419
420 pub fn collect_string(self, separator: &str, config: &Config) -> Result<String, ShellError> {
421 match self {
422 PipelineData::Empty => Ok(String::new()),
423 PipelineData::Value(value, ..) => Ok(value.to_expanded_string(separator, config)),
424 PipelineData::ListStream(stream, ..) => Ok(stream.into_string(separator, config)),
425 PipelineData::ByteStream(stream, ..) => stream.into_string(),
426 }
427 }
428
429 pub fn collect_string_strict(
434 self,
435 span: Span,
436 ) -> Result<(String, Span, Option<PipelineMetadata>), ShellError> {
437 match self {
438 PipelineData::Empty => Ok((String::new(), span, None)),
439 PipelineData::Value(Value::String { val, .. }, metadata) => Ok((val, span, metadata)),
440 PipelineData::Value(val, ..) => Err(ShellError::TypeMismatch {
441 err_message: "string".into(),
442 span: val.span(),
443 }),
444 PipelineData::ListStream(..) => Err(ShellError::TypeMismatch {
445 err_message: "string".into(),
446 span,
447 }),
448 PipelineData::ByteStream(stream, metadata) => {
449 let span = stream.span();
450 Ok((stream.into_string()?, span, metadata))
451 }
452 }
453 }
454
455 pub fn follow_cell_path(
456 self,
457 cell_path: &[PathMember],
458 head: Span,
459 ) -> Result<Value, ShellError> {
460 match self {
461 PipelineData::ListStream(stream, ..) => Value::list(stream.into_iter().collect(), head)
463 .follow_cell_path(cell_path)
464 .map(Cow::into_owned),
465 PipelineData::Value(v, ..) => v.follow_cell_path(cell_path).map(Cow::into_owned),
466 PipelineData::Empty => Err(ShellError::IncompatiblePathAccess {
467 type_name: "empty pipeline".to_string(),
468 span: head,
469 }),
470 PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess {
471 type_name: stream.type_().describe().to_owned(),
472 span: stream.span(),
473 }),
474 }
475 }
476
    /// Applies `f` to the contents of the pipeline data.
    ///
    /// * List and range values (and iterable custom values whose base value is a list or a
    ///   range) are mapped element by element into a list stream that uses a clone of
    ///   `signals`.
    /// * Any other single value is passed to `f` once; if `f` returns a `Value::Error`, that
    ///   error is returned as `Err`.
    /// * A list stream is mapped lazily through `ListStream::map`.
    /// * A byte stream is first collected into a single value, which is then passed to `f`.
    /// * Empty data stays empty.
    ///
    /// Metadata of the input is attached to the result.
    pub fn map<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
    where
        Self: Sized,
        F: FnMut(Value) -> Value + 'static + Send,
    {
        match self {
            PipelineData::Value(value, metadata) => {
                let span = value.span();
                let pipeline = match value {
                    Value::List { vals, .. } => vals
                        .into_iter()
                        .map(f)
                        .into_pipeline_data(span, signals.clone()),
                    Value::Range { val, .. } => val
                        .into_range_iter(span, Signals::empty())
                        .map(f)
                        .into_pipeline_data(span, signals.clone()),
                    #[expect(deprecated)]
                    Value::Custom { ref val, .. } if val.is_iterable() => {
                        // Iterate over the base value of the custom value.
                        match val.to_base_value(span)? {
                            Value::List { vals, .. } => vals
                                .into_iter()
                                .map(f)
                                .into_pipeline_data(span, signals.clone()),
                            Value::Range { val, .. } => val
                                .into_range_iter(span, Signals::empty())
                                .map(f)
                                .into_pipeline_data(span, signals.clone()),
                            value => match f(value) {
                                Value::Error { error, .. } => return Err(*error),
                                v => v.into_pipeline_data(),
                            },
                        }
                    }
                    // Non-iterable value: apply `f` once and surface an error result eagerly.
                    value => match f(value) {
                        Value::Error { error, .. } => return Err(*error),
                        v => v.into_pipeline_data(),
                    },
                };
                Ok(pipeline.set_metadata(metadata))
            }
            PipelineData::Empty => Ok(PipelineData::empty()),
            PipelineData::ListStream(stream, metadata) => {
                Ok(PipelineData::list_stream(stream.map(f), metadata))
            }
            PipelineData::ByteStream(stream, metadata) => {
                Ok(f(stream.into_value()?).into_pipeline_data_with_metadata(metadata))
            }
        }
    }
528
    /// Applies `f` to the contents of the pipeline data and flattens the produced iterables
    /// into a list stream.
    ///
    /// * List and range values (and iterable custom values whose base value is a list or a
    ///   range) are flat-mapped element by element.
    /// * Any other single value is passed to `f` once and the result is streamed.
    /// * A list stream is flat-mapped lazily.
    /// * A byte stream is read completely. Valid UTF-8 is passed to `f` as a string with
    ///   trailing `\r`/`\n` characters removed; otherwise the raw bytes are passed as binary.
    /// * Empty data stays empty.
    ///
    /// Metadata of the input is attached to the result.
    pub fn flat_map<U, F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
    where
        Self: Sized,
        U: IntoIterator<Item = Value> + 'static,
        <U as IntoIterator>::IntoIter: 'static + Send,
        F: FnMut(Value) -> U + 'static + Send,
    {
        match self {
            PipelineData::Empty => Ok(PipelineData::empty()),
            PipelineData::Value(value, metadata) => {
                let span = value.span();
                let pipeline = match value {
                    Value::List { vals, .. } => vals
                        .into_iter()
                        .flat_map(f)
                        .into_pipeline_data(span, signals.clone()),
                    Value::Range { val, .. } => val
                        .into_range_iter(span, Signals::empty())
                        .flat_map(f)
                        .into_pipeline_data(span, signals.clone()),
                    #[expect(deprecated)]
                    Value::Custom { ref val, .. } if val.is_iterable() => {
                        // Iterate over the base value of the custom value.
                        match val.to_base_value(span)? {
                            Value::List { vals, .. } => vals
                                .into_iter()
                                .flat_map(f)
                                .into_pipeline_data(span, signals.clone()),
                            Value::Range { val, .. } => val
                                .into_range_iter(span, Signals::empty())
                                .flat_map(f)
                                .into_pipeline_data(span, signals.clone()),
                            value => f(value)
                                .into_iter()
                                .into_pipeline_data(span, signals.clone()),
                        }
                    }
                    value => f(value)
                        .into_iter()
                        .into_pipeline_data(span, signals.clone()),
                };
                Ok(pipeline.set_metadata(metadata))
            }
            PipelineData::ListStream(stream, metadata) => Ok(PipelineData::list_stream(
                stream.modify(|iter| iter.flat_map(f)),
                metadata,
            )),
            PipelineData::ByteStream(stream, metadata) => {
                let span = stream.span();
                // Prefer a string when the bytes are valid UTF-8, fall back to binary otherwise.
                let iter = match String::from_utf8(stream.into_bytes()?) {
                    Ok(mut str) => {
                        // Drop trailing line endings in place.
                        str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
                        f(Value::string(str, span))
                    }
                    Err(err) => f(Value::binary(err.into_bytes(), span)),
                };
                Ok(iter.into_iter().into_pipeline_data_with_metadata(
                    span,
                    signals.clone(),
                    metadata,
                ))
            }
        }
    }
594
    /// Keeps only the contents of the pipeline data for which `f` returns `true`.
    ///
    /// * List and range values (and iterable custom values whose base value is a list or a
    ///   range) are filtered element by element into a list stream.
    /// * Any other single value is kept when `f` accepts it and is otherwise replaced by a
    ///   `nothing` value with the same span.
    /// * A list stream is filtered lazily.
    /// * A byte stream is read completely. Valid UTF-8 becomes a string with trailing
    ///   `\r`/`\n` characters removed, otherwise a binary value; the result is then treated
    ///   like a single value.
    /// * Empty data stays empty.
    ///
    /// Metadata of the input is attached to the result.
    pub fn filter<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
    where
        Self: Sized,
        F: FnMut(&Value) -> bool + 'static + Send,
    {
        match self {
            PipelineData::Empty => Ok(PipelineData::empty()),
            PipelineData::Value(value, metadata) => {
                let span = value.span();
                let pipeline = match value {
                    Value::List { vals, .. } => vals
                        .into_iter()
                        .filter(f)
                        .into_pipeline_data(span, signals.clone()),
                    Value::Range { val, .. } => val
                        .into_range_iter(span, Signals::empty())
                        .filter(f)
                        .into_pipeline_data(span, signals.clone()),
                    #[expect(deprecated)]
                    Value::Custom { ref val, .. } if val.is_iterable() => {
                        // Iterate over the base value of the custom value.
                        match val.to_base_value(span)? {
                            Value::List { vals, .. } => vals
                                .into_iter()
                                .filter(f)
                                .into_pipeline_data(span, signals.clone()),
                            Value::Range { val, .. } => val
                                .into_range_iter(span, Signals::empty())
                                .filter(f)
                                .into_pipeline_data(span, signals.clone()),
                            value => {
                                if f(&value) {
                                    value.into_pipeline_data()
                                } else {
                                    Value::nothing(span).into_pipeline_data()
                                }
                            }
                        }
                    }
                    value => {
                        if f(&value) {
                            value.into_pipeline_data()
                        } else {
                            Value::nothing(span).into_pipeline_data()
                        }
                    }
                };
                Ok(pipeline.set_metadata(metadata))
            }
            PipelineData::ListStream(stream, metadata) => Ok(PipelineData::list_stream(
                stream.modify(|iter| iter.filter(f)),
                metadata,
            )),
            PipelineData::ByteStream(stream, metadata) => {
                let span = stream.span();
                // Prefer a string when the bytes are valid UTF-8, fall back to binary otherwise.
                let value = match String::from_utf8(stream.into_bytes()?) {
                    Ok(mut str) => {
                        // Drop trailing line endings in place.
                        str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
                        Value::string(str, span)
                    }
                    Err(err) => Value::binary(err.into_bytes(), span),
                };
                let value = if f(&value) {
                    value
                } else {
                    Value::nothing(span)
                };
                Ok(value.into_pipeline_data_with_metadata(metadata))
            }
        }
    }
666
667 pub fn try_expand_range(self) -> Result<PipelineData, ShellError> {
672 match self {
673 PipelineData::Value(v, metadata) => {
674 let span = v.span();
675 match v {
676 Value::Range { val, .. } => {
677 match *val {
678 Range::IntRange(range) => {
679 if range.is_unbounded() {
680 return Err(ShellError::Generic(
681 GenericError::new(
682 "Cannot create range",
683 "Unbounded ranges are not allowed when converting to this format",
684 span,
685 )
686 .with_help(
687 "Consider using ranges with valid start and end point.",
688 ),
689 ));
690 }
691 }
692 Range::FloatRange(range) => {
693 if range.is_unbounded() {
694 return Err(ShellError::Generic(
695 GenericError::new(
696 "Cannot create range",
697 "Unbounded ranges are not allowed when converting to this format",
698 span,
699 )
700 .with_help(
701 "Consider using ranges with valid start and end point.",
702 ),
703 ));
704 }
705 }
706 }
707 let range_values: Vec<Value> =
708 val.into_range_iter(span, Signals::empty()).collect();
709 Ok(PipelineData::value(Value::list(range_values, span), None))
710 }
711 x => Ok(PipelineData::value(x, metadata)),
712 }
713 }
714 _ => Ok(self),
715 }
716 }
717
718 pub fn print_table(
726 self,
727 engine_state: &EngineState,
728 stack: &mut Stack,
729 no_newline: bool,
730 to_stderr: bool,
731 ) -> Result<(), ShellError> {
732 match self {
733 PipelineData::ByteStream(stream, ..) if stream.type_() != ByteStreamType::Binary => {
735 stream.print(to_stderr)
736 }
737 _ => {
738 if let Some(decl_id) = engine_state.table_decl_id {
741 let command = engine_state.get_decl(decl_id);
742 if command.block_id().is_some() {
743 self.write_all_and_flush(engine_state, no_newline, to_stderr)
744 } else {
745 let call = Call::new(Span::new(0, 0));
746 let table = command.run(engine_state, stack, &(&call).into(), self)?;
747 table.write_all_and_flush(engine_state, no_newline, to_stderr)
748 }
749 } else {
750 self.write_all_and_flush(engine_state, no_newline, to_stderr)
751 }
752 }
753 }
754 }
755
756 pub fn print_raw(
764 self,
765 engine_state: &EngineState,
766 no_newline: bool,
767 to_stderr: bool,
768 ) -> Result<(), ShellError> {
769 let span = self.span();
770 if let PipelineData::Value(Value::Binary { val: bytes, .. }, _) = self {
771 if to_stderr {
772 write_all_and_flush(
773 bytes,
774 &mut std::io::stderr().lock(),
775 "stderr",
776 span,
777 engine_state.signals(),
778 )?;
779 } else {
780 write_all_and_flush(
781 bytes,
782 &mut std::io::stdout().lock(),
783 "stdout",
784 span,
785 engine_state.signals(),
786 )?;
787 }
788 Ok(())
789 } else {
790 self.write_all_and_flush(engine_state, no_newline, to_stderr)
791 }
792 }
793
794 fn write_all_and_flush(
795 self,
796 engine_state: &EngineState,
797 no_newline: bool,
798 to_stderr: bool,
799 ) -> Result<(), ShellError> {
800 let span = self.span();
801 if let PipelineData::ByteStream(stream, ..) = self {
802 stream.print(to_stderr)
804 } else {
805 let config = engine_state.get_config();
806 for item in self {
807 let mut out = if let Value::Error { error, .. } = item {
808 return Err(*error);
809 } else {
810 item.to_expanded_string("\n", config)
811 };
812
813 if !no_newline {
814 out.push('\n');
815 }
816
817 if to_stderr {
818 write_all_and_flush(
819 out,
820 &mut std::io::stderr().lock(),
821 "stderr",
822 span,
823 engine_state.signals(),
824 )?;
825 } else {
826 write_all_and_flush(
827 out,
828 &mut std::io::stdout().lock(),
829 "stdout",
830 span,
831 engine_state.signals(),
832 )?;
833 }
834 }
835
836 Ok(())
837 }
838 }
839
840 pub fn unsupported_input_error(
841 self,
842 expected_type: impl Into<String>,
843 span: Span,
844 ) -> ShellError {
845 match self {
846 PipelineData::Empty => ShellError::PipelineEmpty { dst_span: span },
847 PipelineData::Value(value, ..) => ShellError::OnlySupportsThisInputType {
848 exp_input_type: expected_type.into(),
849 wrong_type: value.get_type().get_non_specified_string(),
850 dst_span: span,
851 src_span: value.span(),
852 },
853 PipelineData::ListStream(stream, ..) => ShellError::OnlySupportsThisInputType {
854 exp_input_type: expected_type.into(),
855 wrong_type: "list (stream)".into(),
856 dst_span: span,
857 src_span: stream.span(),
858 },
859 PipelineData::ByteStream(stream, ..) => ShellError::OnlySupportsThisInputType {
860 exp_input_type: expected_type.into(),
861 wrong_type: stream.type_().describe().into(),
862 dst_span: span,
863 src_span: stream.span(),
864 },
865 }
866 }
867
868 #[cfg(feature = "os")]
871 pub fn clone_exit_status_future(&self) -> Option<ExitStatusGuard> {
872 match self {
873 PipelineData::Empty | PipelineData::Value(..) | PipelineData::ListStream(..) => None,
874 PipelineData::ByteStream(stream, ..) => match stream.source() {
875 ByteStreamSource::Read(..) | ByteStreamSource::File(..) => None,
876 ByteStreamSource::Child(c) => {
877 let exit_future = c.clone_exit_status_future();
878 let ignore_error = c.clone_ignore_error();
879 Some(ExitStatusGuard::new(exit_future, ignore_error))
880 }
881 },
882 }
883 }
884}
885
886impl CompareTypes<Type> for PipelineData {
887 fn compare_types(&self, other: &Type) -> Option<TypeRelation> {
888 self.get_type().compare_types(other)
889 }
890
891 fn is_assignable_to(&self, dst: &Type) -> bool {
906 self.get_type().is_assignable_to(dst)
907 }
908}
909
910pub fn write_all_and_flush<T>(
911 data: T,
912 destination: &mut impl Write,
913 destination_name: &str,
914 span: Option<Span>,
915 signals: &Signals,
916) -> Result<(), ShellError>
917where
918 T: AsRef<[u8]>,
919{
920 let io_error_map = |err: std::io::Error, location: &Location<'_>| {
921 let context = format!("Writing to {destination_name} failed");
922 match span {
923 None => IoError::new_internal_with_location(err, context, location),
924 Some(span) if span == Span::unknown() => {
925 IoError::new_internal_with_location(err, context, location)
926 }
927 Some(span) => IoError::new_with_additional_context(err, span, None, context),
928 }
929 };
930
931 let span = span.unwrap_or(Span::unknown());
932 const OUTPUT_CHUNK_SIZE: usize = 8192;
933 for chunk in data.as_ref().chunks(OUTPUT_CHUNK_SIZE) {
934 signals.check(&span)?;
935 destination
936 .write_all(chunk)
937 .map_err(|err| io_error_map(err, Location::caller()))?;
938 }
939 destination
940 .flush()
941 .map_err(|err| io_error_map(err, Location::caller()))?;
942 Ok(())
943}
944
/// Internal state of a [`PipelineIterator`].
enum PipelineIteratorInner {
    /// Yields nothing.
    Empty,
    /// Yields the contained value once; a `Nothing` value yields no item at all.
    Value(Value),
    /// Yields the items of a list stream.
    ListStream(crate::list_stream::IntoIter),
    /// Yields the chunks of a byte stream.
    ByteStream(crate::byte_stream::Chunks),
}
951
/// Iterator over the [`Value`]s of a [`PipelineData`].
pub struct PipelineIterator(PipelineIteratorInner);
953
954impl IntoIterator for PipelineData {
955 type Item = Value;
956
957 type IntoIter = PipelineIterator;
958
959 fn into_iter(self) -> Self::IntoIter {
960 PipelineIterator(match self {
961 PipelineData::Empty => PipelineIteratorInner::Empty,
962 PipelineData::Value(value, ..) => {
963 let span = value.span();
964 match value {
965 Value::List { vals, signals, .. } => PipelineIteratorInner::ListStream(
966 ListStream::new(
967 vals.into_iter(),
968 span,
969 signals.unwrap_or_else(Signals::empty),
970 )
971 .into_iter(),
972 ),
973 Value::Range { val, signals, .. } => PipelineIteratorInner::ListStream(
974 ListStream::new(
975 val.into_range_iter(span, signals.unwrap_or_else(Signals::empty)),
976 span,
977 Signals::empty(),
978 )
979 .into_iter(),
980 ),
981 #[expect(deprecated)]
983 Value::Custom { ref val, .. } if val.is_iterable() => {
984 match val.to_base_value(span) {
985 Ok(Value::List { vals, signals, .. }) => {
986 PipelineIteratorInner::ListStream(
987 ListStream::new(
988 vals.into_iter(),
989 span,
990 signals.unwrap_or_else(Signals::empty),
991 )
992 .into_iter(),
993 )
994 }
995 Ok(other) => PipelineIteratorInner::Value(other),
996 Err(err) => PipelineIteratorInner::Value(Value::error(err, span)),
997 }
998 }
999 x => PipelineIteratorInner::Value(x),
1000 }
1001 }
1002 PipelineData::ListStream(stream, ..) => {
1003 PipelineIteratorInner::ListStream(stream.into_iter())
1004 }
1005 PipelineData::ByteStream(stream, ..) => stream.chunks().map_or(
1006 PipelineIteratorInner::Empty,
1007 PipelineIteratorInner::ByteStream,
1008 ),
1009 })
1010 }
1011}
1012
1013impl Iterator for PipelineIterator {
1014 type Item = Value;
1015
1016 fn next(&mut self) -> Option<Self::Item> {
1017 match &mut self.0 {
1018 PipelineIteratorInner::Empty => None,
1019 PipelineIteratorInner::Value(Value::Nothing { .. }, ..) => None,
1020 PipelineIteratorInner::Value(v, ..) => Some(std::mem::take(v)),
1021 PipelineIteratorInner::ListStream(stream, ..) => stream.next(),
1022 PipelineIteratorInner::ByteStream(stream) => stream.next().map(|x| match x {
1023 Ok(x) => x,
1024 Err(err) => Value::error(
1025 err,
1026 Span::unknown(), ),
1028 }),
1029 }
1030 }
1031}
1032
/// Conversion of a single item into [`PipelineData`].
pub trait IntoPipelineData {
    /// Converts `self` into pipeline data.
    fn into_pipeline_data(self) -> PipelineData;

    /// Converts `self` into pipeline data carrying the given metadata.
    fn into_pipeline_data_with_metadata(
        self,
        metadata: impl Into<Option<PipelineMetadata>>,
    ) -> PipelineData;
}
1041
1042impl<V> IntoPipelineData for V
1043where
1044 V: Into<Value>,
1045{
1046 fn into_pipeline_data(self) -> PipelineData {
1047 PipelineData::value(self.into(), None)
1048 }
1049
1050 fn into_pipeline_data_with_metadata(
1051 self,
1052 metadata: impl Into<Option<PipelineMetadata>>,
1053 ) -> PipelineData {
1054 PipelineData::value(self.into(), metadata.into())
1055 }
1056}
1057
/// Conversion of an iterable into [`PipelineData`], given a span and [`Signals`] for the
/// resulting stream.
pub trait IntoInterruptiblePipelineData {
    /// Converts `self` into pipeline data using `span` and `signals`.
    fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData;
    /// Converts `self` into pipeline data using `span` and `signals`, attaching `metadata`.
    fn into_pipeline_data_with_metadata(
        self,
        span: Span,
        signals: Signals,
        metadata: impl Into<Option<PipelineMetadata>>,
    ) -> PipelineData;
}
1067
1068impl<I> IntoInterruptiblePipelineData for I
1069where
1070 I: IntoIterator + Send + 'static,
1071 I::IntoIter: Send + 'static,
1072 <I::IntoIter as Iterator>::Item: Into<Value>,
1073{
1074 fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData {
1075 ListStream::new(self.into_iter().map(Into::into), span, signals).into()
1076 }
1077
1078 fn into_pipeline_data_with_metadata(
1079 self,
1080 span: Span,
1081 signals: Signals,
1082 metadata: impl Into<Option<PipelineMetadata>>,
1083 ) -> PipelineData {
1084 PipelineData::list_stream(
1085 ListStream::new(self.into_iter().map(Into::into), span, signals),
1086 metadata.into(),
1087 )
1088 }
1089}
1090
1091fn value_to_bytes(value: Value) -> Result<Vec<u8>, ShellError> {
1092 let bytes = match value {
1093 Value::String { val, .. } => val.into_bytes(),
1094 Value::Binary { val, .. } => val,
1095 Value::List { vals, .. } => {
1096 let val = vals
1097 .into_iter()
1098 .map(Value::coerce_into_string)
1099 .collect::<Result<Vec<String>, ShellError>>()?
1100 .join("\n")
1101 + "\n";
1102
1103 val.into_bytes()
1104 }
1105 Value::Error { error, .. } => return Err(*error),
1107 value => value.coerce_into_string()?.into_bytes(),
1108 };
1109 Ok(bytes)
1110}
1111
/// [`PipelineData`] bundled with the exit status guards collected for it.
///
/// Dereferences to the contained [`PipelineData`].
#[derive(Debug)]
pub struct PipelineExecutionData {
    /// The pipeline data itself.
    pub body: PipelineData,
    /// Exit status guards belonging to `body`; an entry is `None` when no exit status is
    /// available for it.
    #[cfg(feature = "os")]
    pub exit: Vec<Option<ExitStatusGuard>>,
}
1121
1122impl Deref for PipelineExecutionData {
1123 type Target = PipelineData;
1124
1125 fn deref(&self) -> &Self::Target {
1126 &self.body
1127 }
1128}
1129
1130impl DerefMut for PipelineExecutionData {
1131 fn deref_mut(&mut self) -> &mut Self::Target {
1132 &mut self.body
1133 }
1134}
1135
1136impl PipelineExecutionData {
1137 pub fn empty() -> Self {
1138 Self {
1139 body: PipelineData::empty(),
1140 #[cfg(feature = "os")]
1141 exit: vec![],
1142 }
1143 }
1144}
1145
1146impl From<PipelineData> for PipelineExecutionData {
1147 #[cfg(feature = "os")]
1148 fn from(value: PipelineData) -> Self {
1149 let value_span = value.span().unwrap_or_else(Span::unknown);
1150 let exit_status_future = value
1151 .clone_exit_status_future()
1152 .map(|f| f.with_span(value_span));
1153 Self {
1154 body: value,
1155 exit: vec![exit_status_future],
1156 }
1157 }
1158
1159 #[cfg(not(feature = "os"))]
1160 fn from(value: PipelineData) -> Self {
1161 Self { body: value }
1162 }
1163}