1#[cfg(feature = "os")]
2use crate::process::ExitStatusGuard;
3use crate::{
4 ByteStream, ByteStreamSource, ByteStreamType, Config, ListStream, OutDest, PipelineMetadata,
5 Range, ShellError, Signals, Span, Type, Value,
6 ast::{Call, PathMember},
7 engine::{EngineState, Stack},
8 location,
9 shell_error::{io::IoError, location::Location},
10};
11use std::{borrow::Cow, io::Write, ops::Deref};
12
/// Characters trimmed from the end of decoded byte-stream text before it is
/// handed to the closures in `PipelineData::flat_map` and `PipelineData::filter`.
const LINE_ENDING_PATTERN: &[char] = &['\r', '\n'];
14
/// The data carried through a pipeline.
///
/// Every variant except [`PipelineData::Empty`] carries an optional
/// [`PipelineMetadata`] alongside its payload.
#[derive(Debug)]
pub enum PipelineData {
    /// No data at all.
    Empty,
    /// A single [`Value`].
    Value(Value, Option<PipelineMetadata>),
    /// A stream that yields [`Value`]s.
    ListStream(ListStream, Option<PipelineMetadata>),
    /// A stream of bytes.
    ByteStream(ByteStream, Option<PipelineMetadata>),
}
51
52impl PipelineData {
53 pub const fn empty() -> PipelineData {
54 PipelineData::Empty
55 }
56
57 pub fn value(val: Value, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
58 PipelineData::Value(val, metadata.into())
59 }
60
61 pub fn list_stream(stream: ListStream, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
62 PipelineData::ListStream(stream, metadata.into())
63 }
64
65 pub fn byte_stream(stream: ByteStream, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
66 PipelineData::ByteStream(stream, metadata.into())
67 }
68
69 pub fn metadata(&self) -> Option<PipelineMetadata> {
70 match self {
71 PipelineData::Empty => None,
72 PipelineData::Value(_, meta)
73 | PipelineData::ListStream(_, meta)
74 | PipelineData::ByteStream(_, meta) => meta.clone(),
75 }
76 }
77
78 pub fn set_metadata(mut self, metadata: Option<PipelineMetadata>) -> Self {
79 match &mut self {
80 PipelineData::Empty => {}
81 PipelineData::Value(_, meta)
82 | PipelineData::ListStream(_, meta)
83 | PipelineData::ByteStream(_, meta) => *meta = metadata,
84 }
85 self
86 }
87
88 pub fn is_nothing(&self) -> bool {
89 matches!(self, PipelineData::Value(Value::Nothing { .. }, ..))
90 || matches!(self, PipelineData::Empty)
91 }
92
93 pub fn span(&self) -> Option<Span> {
95 match self {
96 PipelineData::Empty => None,
97 PipelineData::Value(value, ..) => Some(value.span()),
98 PipelineData::ListStream(stream, ..) => Some(stream.span()),
99 PipelineData::ByteStream(stream, ..) => Some(stream.span()),
100 }
101 }
102
103 pub fn with_span(self, span: Span) -> Self {
107 match self {
108 PipelineData::Empty => PipelineData::value(Value::nothing(span), None),
109 PipelineData::Value(value, metadata) => {
110 PipelineData::value(value.with_span(span), metadata)
111 }
112 PipelineData::ListStream(stream, metadata) => {
113 PipelineData::list_stream(stream.with_span(span), metadata)
114 }
115 PipelineData::ByteStream(stream, metadata) => {
116 PipelineData::byte_stream(stream.with_span(span), metadata)
117 }
118 }
119 }
120
121 pub fn get_type(&self) -> Type {
132 match self {
133 PipelineData::Empty => Type::Nothing,
134 PipelineData::Value(value, _) => value.get_type(),
135 PipelineData::ListStream(_, _) => Type::list(Type::Any),
136 PipelineData::ByteStream(stream, _) => stream.type_().into(),
137 }
138 }
139
140 pub fn is_subtype_of(&self, other: &Type) -> bool {
155 match (self, other) {
156 (_, Type::Any) => true,
157 (data, Type::OneOf(oneof)) => oneof.iter().any(|t| data.is_subtype_of(t)),
158 (PipelineData::Empty, Type::Nothing) => true,
159 (PipelineData::Value(val, ..), ty) => val.is_subtype_of(ty),
160
161 (PipelineData::ListStream(..), Type::List(..) | Type::Table(..)) => true,
163
164 (PipelineData::ByteStream(stream, ..), Type::String)
165 if stream.type_().is_string_coercible() =>
166 {
167 true
168 }
169 (PipelineData::ByteStream(stream, ..), Type::Binary)
170 if stream.type_().is_binary_coercible() =>
171 {
172 true
173 }
174
175 (PipelineData::Empty, _) => false,
176 (PipelineData::ListStream(..), _) => false,
177 (PipelineData::ByteStream(..), _) => false,
178 }
179 }
180
181 pub fn into_value(self, span: Span) -> Result<Value, ShellError> {
182 match self {
183 PipelineData::Empty => Ok(Value::nothing(span)),
184 PipelineData::Value(value, ..) => {
185 if value.span() == Span::unknown() {
186 Ok(value.with_span(span))
187 } else {
188 Ok(value)
189 }
190 }
191 PipelineData::ListStream(stream, ..) => stream.into_value(),
192 PipelineData::ByteStream(stream, ..) => stream.into_value(),
193 }
194 }
195
196 pub fn try_into_stream(self, engine_state: &EngineState) -> Result<PipelineData, PipelineData> {
204 let span = self.span().unwrap_or(Span::unknown());
205 match self {
206 PipelineData::ListStream(..) | PipelineData::ByteStream(..) => Ok(self),
207 PipelineData::Value(Value::List { .. } | Value::Range { .. }, ref metadata) => {
208 let metadata = metadata.clone();
209 Ok(PipelineData::list_stream(
210 ListStream::new(self.into_iter(), span, engine_state.signals().clone()),
211 metadata,
212 ))
213 }
214 PipelineData::Value(Value::String { val, .. }, metadata) => {
215 Ok(PipelineData::byte_stream(
216 ByteStream::read_string(val, span, engine_state.signals().clone()),
217 metadata,
218 ))
219 }
220 PipelineData::Value(Value::Binary { val, .. }, metadata) => {
221 Ok(PipelineData::byte_stream(
222 ByteStream::read_binary(val, span, engine_state.signals().clone()),
223 metadata,
224 ))
225 }
226 PipelineData::Value(Value::Custom { val, internal_span }, metadata) => {
227 match val.to_base_value(internal_span) {
228 Ok(Value::List { vals, .. }) => Ok(PipelineData::list_stream(
229 ListStream::new(vals.into_iter(), span, engine_state.signals().clone()),
230 metadata,
231 )),
232 Ok(Value::Range { val, .. }) => Ok(PipelineData::list_stream(
233 ListStream::new(
234 val.into_range_iter(span, Signals::empty()),
235 span,
236 engine_state.signals().clone(),
237 ),
238 metadata,
239 )),
240 Ok(other) => Err(PipelineData::value(other, metadata)),
241 Err(_) => Err(PipelineData::Value(
242 Value::Custom { val, internal_span },
243 metadata,
244 )),
245 }
246 }
247 _ => Err(self),
248 }
249 }
250
251 pub fn write_to(self, mut dest: impl Write) -> Result<(), ShellError> {
255 match self {
256 PipelineData::Empty => Ok(()),
257 PipelineData::Value(value, ..) => {
258 let bytes = value_to_bytes(value)?;
259 dest.write_all(&bytes).map_err(|err| {
260 IoError::new_internal(
261 err,
262 "Could not write PipelineData to dest",
263 crate::location!(),
264 )
265 })?;
266 dest.flush().map_err(|err| {
267 IoError::new_internal(
268 err,
269 "Could not flush PipelineData to dest",
270 crate::location!(),
271 )
272 })?;
273 Ok(())
274 }
275 PipelineData::ListStream(stream, ..) => {
276 for value in stream {
277 let bytes = value_to_bytes(value)?;
278 dest.write_all(&bytes).map_err(|err| {
279 IoError::new_internal(
280 err,
281 "Could not write PipelineData to dest",
282 crate::location!(),
283 )
284 })?;
285 dest.write_all(b"\n").map_err(|err| {
286 IoError::new_internal(
287 err,
288 "Could not write linebreak after PipelineData to dest",
289 crate::location!(),
290 )
291 })?;
292 }
293 dest.flush().map_err(|err| {
294 IoError::new_internal(
295 err,
296 "Could not flush PipelineData to dest",
297 crate::location!(),
298 )
299 })?;
300 Ok(())
301 }
302 PipelineData::ByteStream(stream, ..) => stream.write_to(dest),
303 }
304 }
305
306 pub fn drain_to_out_dests(
313 self,
314 engine_state: &EngineState,
315 stack: &mut Stack,
316 ) -> Result<Self, ShellError> {
317 match stack.pipe_stdout().unwrap_or(&OutDest::Inherit) {
318 OutDest::Print => {
319 self.print_table(engine_state, stack, false, false)?;
320 Ok(Self::Empty)
321 }
322 OutDest::Pipe | OutDest::PipeSeparate => Ok(self),
323 OutDest::Value => {
324 let metadata = self.metadata();
325 let span = self.span().unwrap_or(Span::unknown());
326 self.into_value(span).map(|val| Self::Value(val, metadata))
327 }
328 OutDest::File(file) => {
329 self.write_to(file.as_ref())?;
330 Ok(Self::Empty)
331 }
332 OutDest::Null | OutDest::Inherit => {
333 self.drain()?;
334 Ok(Self::Empty)
335 }
336 }
337 }
338
339 pub fn drain(self) -> Result<(), ShellError> {
340 match self {
341 Self::Empty => Ok(()),
342 Self::Value(Value::Error { error, .. }, ..) => Err(*error),
343 Self::Value(..) => Ok(()),
344 Self::ListStream(stream, ..) => stream.drain(),
345 Self::ByteStream(stream, ..) => stream.drain(),
346 }
347 }
348
349 pub fn into_iter_strict(self, span: Span) -> Result<PipelineIterator, ShellError> {
355 Ok(PipelineIterator(match self {
356 PipelineData::Value(value, ..) => {
357 let val_span = value.span();
358 match value {
359 Value::List { vals, .. } => PipelineIteratorInner::ListStream(
360 ListStream::new(vals.into_iter(), val_span, Signals::empty()).into_iter(),
361 ),
362 Value::Binary { val, .. } => PipelineIteratorInner::ListStream(
363 ListStream::new(
364 val.into_iter().map(move |x| Value::int(x as i64, val_span)),
365 val_span,
366 Signals::empty(),
367 )
368 .into_iter(),
369 ),
370 Value::Range { val, .. } => PipelineIteratorInner::ListStream(
371 ListStream::new(
372 val.into_range_iter(val_span, Signals::empty()),
373 val_span,
374 Signals::empty(),
375 )
376 .into_iter(),
377 ),
378 Value::Custom { ref val, .. } if val.is_iterable() => {
380 match val.to_base_value(val_span) {
381 Ok(Value::List { vals, .. }) => PipelineIteratorInner::ListStream(
382 ListStream::new(vals.into_iter(), val_span, Signals::empty())
383 .into_iter(),
384 ),
385 Ok(other) => {
386 return Err(ShellError::OnlySupportsThisInputType {
387 exp_input_type: "list, binary, range, or byte stream".into(),
388 wrong_type: other.get_type().to_string(),
389 dst_span: span,
390 src_span: val_span,
391 });
392 }
393 Err(err) => return Err(err),
394 }
395 }
396 Value::Error { error, .. } => return Err(*error),
398 other => {
399 return Err(ShellError::OnlySupportsThisInputType {
400 exp_input_type: "list, binary, range, or byte stream".into(),
401 wrong_type: other.get_type().to_string(),
402 dst_span: span,
403 src_span: val_span,
404 });
405 }
406 }
407 }
408 PipelineData::ListStream(stream, ..) => {
409 PipelineIteratorInner::ListStream(stream.into_iter())
410 }
411 PipelineData::Empty => {
412 return Err(ShellError::OnlySupportsThisInputType {
413 exp_input_type: "list, binary, range, or byte stream".into(),
414 wrong_type: "null".into(),
415 dst_span: span,
416 src_span: span,
417 });
418 }
419 PipelineData::ByteStream(stream, ..) => {
420 if let Some(chunks) = stream.chunks() {
421 PipelineIteratorInner::ByteStream(chunks)
422 } else {
423 PipelineIteratorInner::Empty
424 }
425 }
426 }))
427 }
428
429 pub fn collect_string(self, separator: &str, config: &Config) -> Result<String, ShellError> {
430 match self {
431 PipelineData::Empty => Ok(String::new()),
432 PipelineData::Value(value, ..) => Ok(value.to_expanded_string(separator, config)),
433 PipelineData::ListStream(stream, ..) => Ok(stream.into_string(separator, config)),
434 PipelineData::ByteStream(stream, ..) => stream.into_string(),
435 }
436 }
437
438 pub fn collect_string_strict(
443 self,
444 span: Span,
445 ) -> Result<(String, Span, Option<PipelineMetadata>), ShellError> {
446 match self {
447 PipelineData::Empty => Ok((String::new(), span, None)),
448 PipelineData::Value(Value::String { val, .. }, metadata) => Ok((val, span, metadata)),
449 PipelineData::Value(val, ..) => Err(ShellError::TypeMismatch {
450 err_message: "string".into(),
451 span: val.span(),
452 }),
453 PipelineData::ListStream(..) => Err(ShellError::TypeMismatch {
454 err_message: "string".into(),
455 span,
456 }),
457 PipelineData::ByteStream(stream, metadata) => {
458 let span = stream.span();
459 Ok((stream.into_string()?, span, metadata))
460 }
461 }
462 }
463
464 pub fn follow_cell_path(
465 self,
466 cell_path: &[PathMember],
467 head: Span,
468 ) -> Result<Value, ShellError> {
469 match self {
470 PipelineData::ListStream(stream, ..) => Value::list(stream.into_iter().collect(), head)
472 .follow_cell_path(cell_path)
473 .map(Cow::into_owned),
474 PipelineData::Value(v, ..) => v.follow_cell_path(cell_path).map(Cow::into_owned),
475 PipelineData::Empty => Err(ShellError::IncompatiblePathAccess {
476 type_name: "empty pipeline".to_string(),
477 span: head,
478 }),
479 PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess {
480 type_name: stream.type_().describe().to_owned(),
481 span: stream.span(),
482 }),
483 }
484 }
485
486 pub fn map<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
488 where
489 Self: Sized,
490 F: FnMut(Value) -> Value + 'static + Send,
491 {
492 match self {
493 PipelineData::Value(value, metadata) => {
494 let span = value.span();
495 let pipeline = match value {
496 Value::List { vals, .. } => vals
497 .into_iter()
498 .map(f)
499 .into_pipeline_data(span, signals.clone()),
500 Value::Range { val, .. } => val
501 .into_range_iter(span, Signals::empty())
502 .map(f)
503 .into_pipeline_data(span, signals.clone()),
504 Value::Custom { ref val, .. } if val.is_iterable() => {
505 match val.to_base_value(span)? {
506 Value::List { vals, .. } => vals
507 .into_iter()
508 .map(f)
509 .into_pipeline_data(span, signals.clone()),
510 Value::Range { val, .. } => val
511 .into_range_iter(span, Signals::empty())
512 .map(f)
513 .into_pipeline_data(span, signals.clone()),
514 value => match f(value) {
515 Value::Error { error, .. } => return Err(*error),
516 v => v.into_pipeline_data(),
517 },
518 }
519 }
520 value => match f(value) {
521 Value::Error { error, .. } => return Err(*error),
522 v => v.into_pipeline_data(),
523 },
524 };
525 Ok(pipeline.set_metadata(metadata))
526 }
527 PipelineData::Empty => Ok(PipelineData::empty()),
528 PipelineData::ListStream(stream, metadata) => {
529 Ok(PipelineData::list_stream(stream.map(f), metadata))
530 }
531 PipelineData::ByteStream(stream, metadata) => {
532 Ok(f(stream.into_value()?).into_pipeline_data_with_metadata(metadata))
533 }
534 }
535 }
536
537 pub fn flat_map<U, F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
539 where
540 Self: Sized,
541 U: IntoIterator<Item = Value> + 'static,
542 <U as IntoIterator>::IntoIter: 'static + Send,
543 F: FnMut(Value) -> U + 'static + Send,
544 {
545 match self {
546 PipelineData::Empty => Ok(PipelineData::empty()),
547 PipelineData::Value(value, metadata) => {
548 let span = value.span();
549 let pipeline = match value {
550 Value::List { vals, .. } => vals
551 .into_iter()
552 .flat_map(f)
553 .into_pipeline_data(span, signals.clone()),
554 Value::Range { val, .. } => val
555 .into_range_iter(span, Signals::empty())
556 .flat_map(f)
557 .into_pipeline_data(span, signals.clone()),
558 Value::Custom { ref val, .. } if val.is_iterable() => {
559 match val.to_base_value(span)? {
560 Value::List { vals, .. } => vals
561 .into_iter()
562 .flat_map(f)
563 .into_pipeline_data(span, signals.clone()),
564 Value::Range { val, .. } => val
565 .into_range_iter(span, Signals::empty())
566 .flat_map(f)
567 .into_pipeline_data(span, signals.clone()),
568 value => f(value)
569 .into_iter()
570 .into_pipeline_data(span, signals.clone()),
571 }
572 }
573 value => f(value)
574 .into_iter()
575 .into_pipeline_data(span, signals.clone()),
576 };
577 Ok(pipeline.set_metadata(metadata))
578 }
579 PipelineData::ListStream(stream, metadata) => Ok(PipelineData::list_stream(
580 stream.modify(|iter| iter.flat_map(f)),
581 metadata,
582 )),
583 PipelineData::ByteStream(stream, metadata) => {
584 let span = stream.span();
586 let iter = match String::from_utf8(stream.into_bytes()?) {
587 Ok(mut str) => {
588 str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
589 f(Value::string(str, span))
590 }
591 Err(err) => f(Value::binary(err.into_bytes(), span)),
592 };
593 Ok(iter.into_iter().into_pipeline_data_with_metadata(
594 span,
595 signals.clone(),
596 metadata,
597 ))
598 }
599 }
600 }
601
602 pub fn filter<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
603 where
604 Self: Sized,
605 F: FnMut(&Value) -> bool + 'static + Send,
606 {
607 match self {
608 PipelineData::Empty => Ok(PipelineData::empty()),
609 PipelineData::Value(value, metadata) => {
610 let span = value.span();
611 let pipeline = match value {
612 Value::List { vals, .. } => vals
613 .into_iter()
614 .filter(f)
615 .into_pipeline_data(span, signals.clone()),
616 Value::Range { val, .. } => val
617 .into_range_iter(span, Signals::empty())
618 .filter(f)
619 .into_pipeline_data(span, signals.clone()),
620 Value::Custom { ref val, .. } if val.is_iterable() => {
621 match val.to_base_value(span)? {
622 Value::List { vals, .. } => vals
623 .into_iter()
624 .filter(f)
625 .into_pipeline_data(span, signals.clone()),
626 Value::Range { val, .. } => val
627 .into_range_iter(span, Signals::empty())
628 .filter(f)
629 .into_pipeline_data(span, signals.clone()),
630 value => {
631 if f(&value) {
632 value.into_pipeline_data()
633 } else {
634 Value::nothing(span).into_pipeline_data()
635 }
636 }
637 }
638 }
639 value => {
640 if f(&value) {
641 value.into_pipeline_data()
642 } else {
643 Value::nothing(span).into_pipeline_data()
644 }
645 }
646 };
647 Ok(pipeline.set_metadata(metadata))
648 }
649 PipelineData::ListStream(stream, metadata) => Ok(PipelineData::list_stream(
650 stream.modify(|iter| iter.filter(f)),
651 metadata,
652 )),
653 PipelineData::ByteStream(stream, metadata) => {
654 let span = stream.span();
656 let value = match String::from_utf8(stream.into_bytes()?) {
657 Ok(mut str) => {
658 str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
659 Value::string(str, span)
660 }
661 Err(err) => Value::binary(err.into_bytes(), span),
662 };
663 let value = if f(&value) {
664 value
665 } else {
666 Value::nothing(span)
667 };
668 Ok(value.into_pipeline_data_with_metadata(metadata))
669 }
670 }
671 }
672
673 pub fn try_expand_range(self) -> Result<PipelineData, ShellError> {
678 match self {
679 PipelineData::Value(v, metadata) => {
680 let span = v.span();
681 match v {
682 Value::Range { val, .. } => {
683 match *val {
684 Range::IntRange(range) => {
685 if range.is_unbounded() {
686 return Err(ShellError::GenericError {
687 error: "Cannot create range".into(),
688 msg: "Unbounded ranges are not allowed when converting to this format".into(),
689 span: Some(span),
690 help: Some("Consider using ranges with valid start and end point.".into()),
691 inner: vec![],
692 });
693 }
694 }
695 Range::FloatRange(range) => {
696 if range.is_unbounded() {
697 return Err(ShellError::GenericError {
698 error: "Cannot create range".into(),
699 msg: "Unbounded ranges are not allowed when converting to this format".into(),
700 span: Some(span),
701 help: Some("Consider using ranges with valid start and end point.".into()),
702 inner: vec![],
703 });
704 }
705 }
706 }
707 let range_values: Vec<Value> =
708 val.into_range_iter(span, Signals::empty()).collect();
709 Ok(PipelineData::value(Value::list(range_values, span), None))
710 }
711 x => Ok(PipelineData::value(x, metadata)),
712 }
713 }
714 _ => Ok(self),
715 }
716 }
717
718 pub fn print_table(
726 self,
727 engine_state: &EngineState,
728 stack: &mut Stack,
729 no_newline: bool,
730 to_stderr: bool,
731 ) -> Result<(), ShellError> {
732 match self {
733 PipelineData::ByteStream(stream, ..) if stream.type_() != ByteStreamType::Binary => {
735 stream.print(to_stderr)
736 }
737 _ => {
738 if let Some(decl_id) = engine_state.table_decl_id {
741 let command = engine_state.get_decl(decl_id);
742 if command.block_id().is_some() {
743 self.write_all_and_flush(engine_state, no_newline, to_stderr)
744 } else {
745 let call = Call::new(Span::new(0, 0));
746 let table = command.run(engine_state, stack, &(&call).into(), self)?;
747 table.write_all_and_flush(engine_state, no_newline, to_stderr)
748 }
749 } else {
750 self.write_all_and_flush(engine_state, no_newline, to_stderr)
751 }
752 }
753 }
754 }
755
756 pub fn print_raw(
764 self,
765 engine_state: &EngineState,
766 no_newline: bool,
767 to_stderr: bool,
768 ) -> Result<(), ShellError> {
769 let span = self.span();
770 if let PipelineData::Value(Value::Binary { val: bytes, .. }, _) = self {
771 if to_stderr {
772 write_all_and_flush(
773 bytes,
774 &mut std::io::stderr().lock(),
775 "stderr",
776 span,
777 engine_state.signals(),
778 )?;
779 } else {
780 write_all_and_flush(
781 bytes,
782 &mut std::io::stdout().lock(),
783 "stdout",
784 span,
785 engine_state.signals(),
786 )?;
787 }
788 Ok(())
789 } else {
790 self.write_all_and_flush(engine_state, no_newline, to_stderr)
791 }
792 }
793
794 fn write_all_and_flush(
795 self,
796 engine_state: &EngineState,
797 no_newline: bool,
798 to_stderr: bool,
799 ) -> Result<(), ShellError> {
800 let span = self.span();
801 if let PipelineData::ByteStream(stream, ..) = self {
802 stream.print(to_stderr)
804 } else {
805 let config = engine_state.get_config();
806 for item in self {
807 let mut out = if let Value::Error { error, .. } = item {
808 return Err(*error);
809 } else {
810 item.to_expanded_string("\n", config)
811 };
812
813 if !no_newline {
814 out.push('\n');
815 }
816
817 if to_stderr {
818 write_all_and_flush(
819 out,
820 &mut std::io::stderr().lock(),
821 "stderr",
822 span,
823 engine_state.signals(),
824 )?;
825 } else {
826 write_all_and_flush(
827 out,
828 &mut std::io::stdout().lock(),
829 "stdout",
830 span,
831 engine_state.signals(),
832 )?;
833 }
834 }
835
836 Ok(())
837 }
838 }
839
840 pub fn unsupported_input_error(
841 self,
842 expected_type: impl Into<String>,
843 span: Span,
844 ) -> ShellError {
845 match self {
846 PipelineData::Empty => ShellError::PipelineEmpty { dst_span: span },
847 PipelineData::Value(value, ..) => ShellError::OnlySupportsThisInputType {
848 exp_input_type: expected_type.into(),
849 wrong_type: value.get_type().get_non_specified_string(),
850 dst_span: span,
851 src_span: value.span(),
852 },
853 PipelineData::ListStream(stream, ..) => ShellError::OnlySupportsThisInputType {
854 exp_input_type: expected_type.into(),
855 wrong_type: "list (stream)".into(),
856 dst_span: span,
857 src_span: stream.span(),
858 },
859 PipelineData::ByteStream(stream, ..) => ShellError::OnlySupportsThisInputType {
860 exp_input_type: expected_type.into(),
861 wrong_type: stream.type_().describe().into(),
862 dst_span: span,
863 src_span: stream.span(),
864 },
865 }
866 }
867
868 #[cfg(feature = "os")]
871 pub fn clone_exit_status_future(&self) -> Option<ExitStatusGuard> {
872 match self {
873 PipelineData::Empty | PipelineData::Value(..) | PipelineData::ListStream(..) => None,
874 PipelineData::ByteStream(stream, ..) => match stream.source() {
875 ByteStreamSource::Read(..) | ByteStreamSource::File(..) => None,
876 ByteStreamSource::Child(c) => {
877 let exit_future = c.clone_exit_status_future();
878 let ignore_error = c.clone_ignore_error();
879 Some(ExitStatusGuard::new(exit_future, ignore_error))
880 }
881 },
882 }
883 }
884}
885
886pub fn write_all_and_flush<T>(
887 data: T,
888 destination: &mut impl Write,
889 destination_name: &str,
890 span: Option<Span>,
891 signals: &Signals,
892) -> Result<(), ShellError>
893where
894 T: AsRef<[u8]>,
895{
896 let io_error_map = |err: std::io::Error, location: Location| {
897 let context = format!("Writing to {destination_name} failed");
898 match span {
899 None => IoError::new_internal(err, context, location),
900 Some(span) if span == Span::unknown() => IoError::new_internal(err, context, location),
901 Some(span) => IoError::new_with_additional_context(err, span, None, context),
902 }
903 };
904
905 let span = span.unwrap_or(Span::unknown());
906 const OUTPUT_CHUNK_SIZE: usize = 8192;
907 for chunk in data.as_ref().chunks(OUTPUT_CHUNK_SIZE) {
908 signals.check(&span)?;
909 destination
910 .write_all(chunk)
911 .map_err(|err| io_error_map(err, location!()))?;
912 }
913 destination
914 .flush()
915 .map_err(|err| io_error_map(err, location!()))?;
916 Ok(())
917}
918
/// Internal state of a [`PipelineIterator`].
enum PipelineIteratorInner {
    /// Yields nothing.
    Empty,
    /// Yields the held value once, unless it is `Value::Nothing`.
    Value(Value),
    /// Yields the items of a list stream.
    ListStream(crate::list_stream::IntoIter),
    /// Yields the chunks of a byte stream.
    ByteStream(crate::byte_stream::Chunks),
}
925
/// Iterator over the [`Value`]s of a [`PipelineData`], created by
/// `PipelineData::into_iter` or `PipelineData::into_iter_strict`.
pub struct PipelineIterator(PipelineIteratorInner);
927
928impl IntoIterator for PipelineData {
929 type Item = Value;
930
931 type IntoIter = PipelineIterator;
932
933 fn into_iter(self) -> Self::IntoIter {
934 PipelineIterator(match self {
935 PipelineData::Empty => PipelineIteratorInner::Empty,
936 PipelineData::Value(value, ..) => {
937 let span = value.span();
938 match value {
939 Value::List { vals, signals, .. } => PipelineIteratorInner::ListStream(
940 ListStream::new(
941 vals.into_iter(),
942 span,
943 signals.unwrap_or_else(Signals::empty),
944 )
945 .into_iter(),
946 ),
947 Value::Range { val, signals, .. } => PipelineIteratorInner::ListStream(
948 ListStream::new(
949 val.into_range_iter(span, signals.unwrap_or_else(Signals::empty)),
950 span,
951 Signals::empty(),
952 )
953 .into_iter(),
954 ),
955 Value::Custom { ref val, .. } if val.is_iterable() => {
957 match val.to_base_value(span) {
958 Ok(Value::List { vals, signals, .. }) => {
959 PipelineIteratorInner::ListStream(
960 ListStream::new(
961 vals.into_iter(),
962 span,
963 signals.unwrap_or_else(Signals::empty),
964 )
965 .into_iter(),
966 )
967 }
968 Ok(other) => PipelineIteratorInner::Value(other),
969 Err(err) => PipelineIteratorInner::Value(Value::error(err, span)),
970 }
971 }
972 x => PipelineIteratorInner::Value(x),
973 }
974 }
975 PipelineData::ListStream(stream, ..) => {
976 PipelineIteratorInner::ListStream(stream.into_iter())
977 }
978 PipelineData::ByteStream(stream, ..) => stream.chunks().map_or(
979 PipelineIteratorInner::Empty,
980 PipelineIteratorInner::ByteStream,
981 ),
982 })
983 }
984}
985
986impl Iterator for PipelineIterator {
987 type Item = Value;
988
989 fn next(&mut self) -> Option<Self::Item> {
990 match &mut self.0 {
991 PipelineIteratorInner::Empty => None,
992 PipelineIteratorInner::Value(Value::Nothing { .. }, ..) => None,
993 PipelineIteratorInner::Value(v, ..) => Some(std::mem::take(v)),
994 PipelineIteratorInner::ListStream(stream, ..) => stream.next(),
995 PipelineIteratorInner::ByteStream(stream) => stream.next().map(|x| match x {
996 Ok(x) => x,
997 Err(err) => Value::error(
998 err,
999 Span::unknown(), ),
1001 }),
1002 }
1003 }
1004}
1005
/// Conversion of a single item into [`PipelineData`].
pub trait IntoPipelineData {
    /// Converts `self` into pipeline data.
    fn into_pipeline_data(self) -> PipelineData;

    /// Converts `self` into pipeline data, attaching `metadata` (anything
    /// convertible into `Option<PipelineMetadata>`).
    fn into_pipeline_data_with_metadata(
        self,
        metadata: impl Into<Option<PipelineMetadata>>,
    ) -> PipelineData;
}
1014
1015impl<V> IntoPipelineData for V
1016where
1017 V: Into<Value>,
1018{
1019 fn into_pipeline_data(self) -> PipelineData {
1020 PipelineData::value(self.into(), None)
1021 }
1022
1023 fn into_pipeline_data_with_metadata(
1024 self,
1025 metadata: impl Into<Option<PipelineMetadata>>,
1026 ) -> PipelineData {
1027 PipelineData::value(self.into(), metadata.into())
1028 }
1029}
1030
/// Conversion into [`PipelineData`] that additionally takes a [`Span`] and
/// [`Signals`] for the resulting data.
pub trait IntoInterruptiblePipelineData {
    /// Converts `self` into pipeline data using `span` and `signals`.
    fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData;
    /// Converts `self` into pipeline data using `span` and `signals`,
    /// attaching `metadata`.
    fn into_pipeline_data_with_metadata(
        self,
        span: Span,
        signals: Signals,
        metadata: impl Into<Option<PipelineMetadata>>,
    ) -> PipelineData;
}
1040
1041impl<I> IntoInterruptiblePipelineData for I
1042where
1043 I: IntoIterator + Send + 'static,
1044 I::IntoIter: Send + 'static,
1045 <I::IntoIter as Iterator>::Item: Into<Value>,
1046{
1047 fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData {
1048 ListStream::new(self.into_iter().map(Into::into), span, signals).into()
1049 }
1050
1051 fn into_pipeline_data_with_metadata(
1052 self,
1053 span: Span,
1054 signals: Signals,
1055 metadata: impl Into<Option<PipelineMetadata>>,
1056 ) -> PipelineData {
1057 PipelineData::list_stream(
1058 ListStream::new(self.into_iter().map(Into::into), span, signals),
1059 metadata.into(),
1060 )
1061 }
1062}
1063
1064fn value_to_bytes(value: Value) -> Result<Vec<u8>, ShellError> {
1065 let bytes = match value {
1066 Value::String { val, .. } => val.into_bytes(),
1067 Value::Binary { val, .. } => val,
1068 Value::List { vals, .. } => {
1069 let val = vals
1070 .into_iter()
1071 .map(Value::coerce_into_string)
1072 .collect::<Result<Vec<String>, ShellError>>()?
1073 .join("\n")
1074 + "\n";
1075
1076 val.into_bytes()
1077 }
1078 Value::Error { error, .. } => return Err(*error),
1080 value => value.coerce_into_string()?.into_bytes(),
1081 };
1082 Ok(bytes)
1083}
1084
/// Pipeline output (`body`) paired, when the `os` feature is enabled, with a
/// list of optional exit-status guards (`exit`).
pub struct PipelineExecutionData {
    /// The pipeline data itself.
    pub body: PipelineData,
    /// Optional exit-status guards; only present with the `os` feature.
    #[cfg(feature = "os")]
    pub exit: Vec<Option<ExitStatusGuard>>,
}
1093
1094impl Deref for PipelineExecutionData {
1095 type Target = PipelineData;
1096
1097 fn deref(&self) -> &Self::Target {
1098 &self.body
1099 }
1100}
1101
1102impl PipelineExecutionData {
1103 pub fn empty() -> Self {
1104 Self {
1105 body: PipelineData::empty(),
1106 #[cfg(feature = "os")]
1107 exit: vec![],
1108 }
1109 }
1110}
1111
1112impl From<PipelineData> for PipelineExecutionData {
1113 #[cfg(feature = "os")]
1114 fn from(value: PipelineData) -> Self {
1115 let value_span = value.span().unwrap_or_else(Span::unknown);
1116 let exit_status_future = value
1117 .clone_exit_status_future()
1118 .map(|f| f.with_span(value_span));
1119 Self {
1120 body: value,
1121 exit: vec![exit_status_future],
1122 }
1123 }
1124
1125 #[cfg(not(feature = "os"))]
1126 fn from(value: PipelineData) -> Self {
1127 Self { body: value }
1128 }
1129}