1#[cfg(feature = "os")]
2use crate::process::ExitStatusFuture;
3use crate::{
4 ByteStream, ByteStreamSource, ByteStreamType, Config, ListStream, OutDest, PipelineMetadata,
5 Range, ShellError, Signals, Span, Type, Value,
6 ast::{Call, PathMember},
7 engine::{EngineState, Stack},
8 location,
9 shell_error::{io::IoError, location::Location},
10};
11use std::{
12 borrow::Cow,
13 io::Write,
14 ops::Deref,
15 sync::{Arc, Mutex},
16};
17
/// Carriage-return and line-feed characters.
///
/// Passed to `trim_end_matches` to strip trailing line endings from text decoded out of a
/// byte stream.
const LINE_ENDING_PATTERN: &[char] = &['\r', '\n'];
19
/// The data that flows through a pipeline.
///
/// Every non-empty variant pairs its payload with optional [`PipelineMetadata`].
#[derive(Debug)]
pub enum PipelineData {
    /// No data at all.
    Empty,
    /// A single [`Value`].
    Value(Value, Option<PipelineMetadata>),
    /// A stream of [`Value`]s.
    ListStream(ListStream, Option<PipelineMetadata>),
    /// A stream of bytes.
    ByteStream(ByteStream, Option<PipelineMetadata>),
}
56
57impl PipelineData {
58 pub const fn empty() -> PipelineData {
59 PipelineData::Empty
60 }
61
62 pub fn value(val: Value, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
63 PipelineData::Value(val, metadata.into())
64 }
65
66 pub fn list_stream(stream: ListStream, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
67 PipelineData::ListStream(stream, metadata.into())
68 }
69
70 pub fn byte_stream(stream: ByteStream, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
71 PipelineData::ByteStream(stream, metadata.into())
72 }
73
74 pub fn metadata(&self) -> Option<PipelineMetadata> {
75 match self {
76 PipelineData::Empty => None,
77 PipelineData::Value(_, meta)
78 | PipelineData::ListStream(_, meta)
79 | PipelineData::ByteStream(_, meta) => meta.clone(),
80 }
81 }
82
83 pub fn set_metadata(mut self, metadata: Option<PipelineMetadata>) -> Self {
84 match &mut self {
85 PipelineData::Empty => {}
86 PipelineData::Value(_, meta)
87 | PipelineData::ListStream(_, meta)
88 | PipelineData::ByteStream(_, meta) => *meta = metadata,
89 }
90 self
91 }
92
93 pub fn is_nothing(&self) -> bool {
94 matches!(self, PipelineData::Value(Value::Nothing { .. }, ..))
95 || matches!(self, PipelineData::Empty)
96 }
97
98 pub fn span(&self) -> Option<Span> {
100 match self {
101 PipelineData::Empty => None,
102 PipelineData::Value(value, ..) => Some(value.span()),
103 PipelineData::ListStream(stream, ..) => Some(stream.span()),
104 PipelineData::ByteStream(stream, ..) => Some(stream.span()),
105 }
106 }
107
108 pub fn with_span(self, span: Span) -> Self {
112 match self {
113 PipelineData::Empty => PipelineData::value(Value::nothing(span), None),
114 PipelineData::Value(value, metadata) => {
115 PipelineData::value(value.with_span(span), metadata)
116 }
117 PipelineData::ListStream(stream, metadata) => {
118 PipelineData::list_stream(stream.with_span(span), metadata)
119 }
120 PipelineData::ByteStream(stream, metadata) => {
121 PipelineData::byte_stream(stream.with_span(span), metadata)
122 }
123 }
124 }
125
126 pub fn get_type(&self) -> Type {
137 match self {
138 PipelineData::Empty => Type::Nothing,
139 PipelineData::Value(value, _) => value.get_type(),
140 PipelineData::ListStream(_, _) => Type::list(Type::Any),
141 PipelineData::ByteStream(stream, _) => stream.type_().into(),
142 }
143 }
144
    /// Reports whether this pipeline data is acceptable where a value of type `other` is
    /// expected.
    ///
    /// The arms are evaluated top to bottom, so their order is part of the logic: the `Any`
    /// and `OneOf` checks apply to every variant before the variant-specific rules.
    pub fn is_subtype_of(&self, other: &Type) -> bool {
        match (self, other) {
            // Anything satisfies `Any`.
            (_, Type::Any) => true,
            // A `OneOf` is satisfied when at least one of its alternatives is.
            (data, Type::OneOf(oneof)) => oneof.iter().any(|t| data.is_subtype_of(t)),
            (PipelineData::Empty, Type::Nothing) => true,
            // Plain values defer to `Value::is_subtype_of`.
            (PipelineData::Value(val, ..), ty) => val.is_subtype_of(ty),

            // A list stream is accepted for any list or table type; the element types of
            // the stream are not inspected here.
            (PipelineData::ListStream(..), Type::List(..) | Type::Table(..)) => true,

            // Byte streams match `string` / `binary` only when their stream type reports
            // itself as coercible to that type.
            (PipelineData::ByteStream(stream, ..), Type::String)
                if stream.type_().is_string_coercible() =>
            {
                true
            }
            (PipelineData::ByteStream(stream, ..), Type::Binary)
                if stream.type_().is_binary_coercible() =>
            {
                true
            }

            // Per-variant fallbacks: every remaining combination is not a subtype.
            (PipelineData::Empty, _) => false,
            (PipelineData::ListStream(..), _) => false,
            (PipelineData::ByteStream(..), _) => false,
        }
    }
185
186 pub fn into_value(self, span: Span) -> Result<Value, ShellError> {
187 match self {
188 PipelineData::Empty => Ok(Value::nothing(span)),
189 PipelineData::Value(value, ..) => {
190 if value.span() == Span::unknown() {
191 Ok(value.with_span(span))
192 } else {
193 Ok(value)
194 }
195 }
196 PipelineData::ListStream(stream, ..) => stream.into_value(),
197 PipelineData::ByteStream(stream, ..) => stream.into_value(),
198 }
199 }
200
    /// Converts the pipeline data into a stream variant where possible.
    ///
    /// * Existing list and byte streams are returned unchanged.
    /// * A list or range value becomes a [`PipelineData::ListStream`].
    /// * A string or binary value becomes a [`PipelineData::ByteStream`].
    ///
    /// Metadata is carried over, and new streams use the signals from `engine_state`.
    ///
    /// # Errors
    ///
    /// Any other input (including [`PipelineData::Empty`]) is handed back unchanged in `Err`.
    pub fn try_into_stream(self, engine_state: &EngineState) -> Result<PipelineData, PipelineData> {
        // Captured up front because the arms below consume `self`.
        let span = self.span().unwrap_or(Span::unknown());
        match self {
            PipelineData::ListStream(..) | PipelineData::ByteStream(..) => Ok(self),
            // `ref metadata` only borrows, so `self` is still whole and can be turned into
            // an iterator after the metadata has been cloned.
            PipelineData::Value(Value::List { .. } | Value::Range { .. }, ref metadata) => {
                let metadata = metadata.clone();
                Ok(PipelineData::list_stream(
                    ListStream::new(self.into_iter(), span, engine_state.signals().clone()),
                    metadata,
                ))
            }
            PipelineData::Value(Value::String { val, .. }, metadata) => {
                Ok(PipelineData::byte_stream(
                    ByteStream::read_string(val, span, engine_state.signals().clone()),
                    metadata,
                ))
            }
            PipelineData::Value(Value::Binary { val, .. }, metadata) => {
                Ok(PipelineData::byte_stream(
                    ByteStream::read_binary(val, span, engine_state.signals().clone()),
                    metadata,
                ))
            }
            _ => Err(self),
        }
    }
234
235 pub fn write_to(self, mut dest: impl Write) -> Result<(), ShellError> {
239 match self {
240 PipelineData::Empty => Ok(()),
241 PipelineData::Value(value, ..) => {
242 let bytes = value_to_bytes(value)?;
243 dest.write_all(&bytes).map_err(|err| {
244 IoError::new_internal(
245 err,
246 "Could not write PipelineData to dest",
247 crate::location!(),
248 )
249 })?;
250 dest.flush().map_err(|err| {
251 IoError::new_internal(
252 err,
253 "Could not flush PipelineData to dest",
254 crate::location!(),
255 )
256 })?;
257 Ok(())
258 }
259 PipelineData::ListStream(stream, ..) => {
260 for value in stream {
261 let bytes = value_to_bytes(value)?;
262 dest.write_all(&bytes).map_err(|err| {
263 IoError::new_internal(
264 err,
265 "Could not write PipelineData to dest",
266 crate::location!(),
267 )
268 })?;
269 dest.write_all(b"\n").map_err(|err| {
270 IoError::new_internal(
271 err,
272 "Could not write linebreak after PipelineData to dest",
273 crate::location!(),
274 )
275 })?;
276 }
277 dest.flush().map_err(|err| {
278 IoError::new_internal(
279 err,
280 "Could not flush PipelineData to dest",
281 crate::location!(),
282 )
283 })?;
284 Ok(())
285 }
286 PipelineData::ByteStream(stream, ..) => stream.write_to(dest),
287 }
288 }
289
    /// Sends the pipeline data to the stdout destination configured on `stack`.
    ///
    /// The destination is `stack.pipe_stdout()`, falling back to [`OutDest::Inherit`] when
    /// none is set:
    ///
    /// * `Print`: printed via [`print_table`](Self::print_table); returns `Empty`.
    /// * `Pipe` / `PipeSeparate`: returned untouched for the next pipeline element.
    /// * `Value`: collected into a single value, keeping the metadata.
    /// * `File`: written to the file with [`write_to`](Self::write_to); returns `Empty`.
    /// * `Null` / `Inherit`: drained with [`drain`](Self::drain); returns `Empty`.
    ///
    /// # Errors
    ///
    /// Propagates any error from printing, collecting, writing or draining.
    pub fn drain_to_out_dests(
        self,
        engine_state: &EngineState,
        stack: &mut Stack,
    ) -> Result<Self, ShellError> {
        match stack.pipe_stdout().unwrap_or(&OutDest::Inherit) {
            OutDest::Print => {
                self.print_table(engine_state, stack, false, false)?;
                Ok(Self::Empty)
            }
            OutDest::Pipe | OutDest::PipeSeparate => Ok(self),
            OutDest::Value => {
                // Metadata and span are read first because `into_value` consumes `self`.
                let metadata = self.metadata();
                let span = self.span().unwrap_or(Span::unknown());
                self.into_value(span).map(|val| Self::Value(val, metadata))
            }
            OutDest::File(file) => {
                self.write_to(file.as_ref())?;
                Ok(Self::Empty)
            }
            OutDest::Null | OutDest::Inherit => {
                self.drain()?;
                Ok(Self::Empty)
            }
        }
    }
322
323 pub fn drain(self) -> Result<(), ShellError> {
324 match self {
325 Self::Empty => Ok(()),
326 Self::Value(Value::Error { error, .. }, ..) => Err(*error),
327 Self::Value(..) => Ok(()),
328 Self::ListStream(stream, ..) => stream.drain(),
329 Self::ByteStream(stream, ..) => stream.drain(),
330 }
331 }
332
333 pub fn into_iter_strict(self, span: Span) -> Result<PipelineIterator, ShellError> {
339 Ok(PipelineIterator(match self {
340 PipelineData::Value(value, ..) => {
341 let val_span = value.span();
342 match value {
343 Value::List { vals, .. } => PipelineIteratorInner::ListStream(
344 ListStream::new(vals.into_iter(), val_span, Signals::empty()).into_iter(),
345 ),
346 Value::Binary { val, .. } => PipelineIteratorInner::ListStream(
347 ListStream::new(
348 val.into_iter().map(move |x| Value::int(x as i64, val_span)),
349 val_span,
350 Signals::empty(),
351 )
352 .into_iter(),
353 ),
354 Value::Range { val, .. } => PipelineIteratorInner::ListStream(
355 ListStream::new(
356 val.into_range_iter(val_span, Signals::empty()),
357 val_span,
358 Signals::empty(),
359 )
360 .into_iter(),
361 ),
362 Value::Error { error, .. } => return Err(*error),
364 other => {
365 return Err(ShellError::OnlySupportsThisInputType {
366 exp_input_type: "list, binary, range, or byte stream".into(),
367 wrong_type: other.get_type().to_string(),
368 dst_span: span,
369 src_span: val_span,
370 });
371 }
372 }
373 }
374 PipelineData::ListStream(stream, ..) => {
375 PipelineIteratorInner::ListStream(stream.into_iter())
376 }
377 PipelineData::Empty => {
378 return Err(ShellError::OnlySupportsThisInputType {
379 exp_input_type: "list, binary, range, or byte stream".into(),
380 wrong_type: "null".into(),
381 dst_span: span,
382 src_span: span,
383 });
384 }
385 PipelineData::ByteStream(stream, ..) => {
386 if let Some(chunks) = stream.chunks() {
387 PipelineIteratorInner::ByteStream(chunks)
388 } else {
389 PipelineIteratorInner::Empty
390 }
391 }
392 }))
393 }
394
395 pub fn collect_string(self, separator: &str, config: &Config) -> Result<String, ShellError> {
396 match self {
397 PipelineData::Empty => Ok(String::new()),
398 PipelineData::Value(value, ..) => Ok(value.to_expanded_string(separator, config)),
399 PipelineData::ListStream(stream, ..) => Ok(stream.into_string(separator, config)),
400 PipelineData::ByteStream(stream, ..) => stream.into_string(),
401 }
402 }
403
404 pub fn collect_string_strict(
409 self,
410 span: Span,
411 ) -> Result<(String, Span, Option<PipelineMetadata>), ShellError> {
412 match self {
413 PipelineData::Empty => Ok((String::new(), span, None)),
414 PipelineData::Value(Value::String { val, .. }, metadata) => Ok((val, span, metadata)),
415 PipelineData::Value(val, ..) => Err(ShellError::TypeMismatch {
416 err_message: "string".into(),
417 span: val.span(),
418 }),
419 PipelineData::ListStream(..) => Err(ShellError::TypeMismatch {
420 err_message: "string".into(),
421 span,
422 }),
423 PipelineData::ByteStream(stream, metadata) => {
424 let span = stream.span();
425 Ok((stream.into_string()?, span, metadata))
426 }
427 }
428 }
429
430 pub fn follow_cell_path(
431 self,
432 cell_path: &[PathMember],
433 head: Span,
434 ) -> Result<Value, ShellError> {
435 match self {
436 PipelineData::ListStream(stream, ..) => Value::list(stream.into_iter().collect(), head)
438 .follow_cell_path(cell_path)
439 .map(Cow::into_owned),
440 PipelineData::Value(v, ..) => v.follow_cell_path(cell_path).map(Cow::into_owned),
441 PipelineData::Empty => Err(ShellError::IncompatiblePathAccess {
442 type_name: "empty pipeline".to_string(),
443 span: head,
444 }),
445 PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess {
446 type_name: stream.type_().describe().to_owned(),
447 span: stream.span(),
448 }),
449 }
450 }
451
452 pub fn map<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
454 where
455 Self: Sized,
456 F: FnMut(Value) -> Value + 'static + Send,
457 {
458 match self {
459 PipelineData::Value(value, metadata) => {
460 let span = value.span();
461 let pipeline = match value {
462 Value::List { vals, .. } => vals
463 .into_iter()
464 .map(f)
465 .into_pipeline_data(span, signals.clone()),
466 Value::Range { val, .. } => val
467 .into_range_iter(span, Signals::empty())
468 .map(f)
469 .into_pipeline_data(span, signals.clone()),
470 value => match f(value) {
471 Value::Error { error, .. } => return Err(*error),
472 v => v.into_pipeline_data(),
473 },
474 };
475 Ok(pipeline.set_metadata(metadata))
476 }
477 PipelineData::Empty => Ok(PipelineData::empty()),
478 PipelineData::ListStream(stream, metadata) => {
479 Ok(PipelineData::list_stream(stream.map(f), metadata))
480 }
481 PipelineData::ByteStream(stream, metadata) => {
482 Ok(f(stream.into_value()?).into_pipeline_data_with_metadata(metadata))
483 }
484 }
485 }
486
487 pub fn flat_map<U, F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
489 where
490 Self: Sized,
491 U: IntoIterator<Item = Value> + 'static,
492 <U as IntoIterator>::IntoIter: 'static + Send,
493 F: FnMut(Value) -> U + 'static + Send,
494 {
495 match self {
496 PipelineData::Empty => Ok(PipelineData::empty()),
497 PipelineData::Value(value, metadata) => {
498 let span = value.span();
499 let pipeline = match value {
500 Value::List { vals, .. } => vals
501 .into_iter()
502 .flat_map(f)
503 .into_pipeline_data(span, signals.clone()),
504 Value::Range { val, .. } => val
505 .into_range_iter(span, Signals::empty())
506 .flat_map(f)
507 .into_pipeline_data(span, signals.clone()),
508 value => f(value)
509 .into_iter()
510 .into_pipeline_data(span, signals.clone()),
511 };
512 Ok(pipeline.set_metadata(metadata))
513 }
514 PipelineData::ListStream(stream, metadata) => Ok(PipelineData::list_stream(
515 stream.modify(|iter| iter.flat_map(f)),
516 metadata,
517 )),
518 PipelineData::ByteStream(stream, metadata) => {
519 let span = stream.span();
521 let iter = match String::from_utf8(stream.into_bytes()?) {
522 Ok(mut str) => {
523 str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
524 f(Value::string(str, span))
525 }
526 Err(err) => f(Value::binary(err.into_bytes(), span)),
527 };
528 Ok(iter.into_iter().into_pipeline_data_with_metadata(
529 span,
530 signals.clone(),
531 metadata,
532 ))
533 }
534 }
535 }
536
537 pub fn filter<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
538 where
539 Self: Sized,
540 F: FnMut(&Value) -> bool + 'static + Send,
541 {
542 match self {
543 PipelineData::Empty => Ok(PipelineData::empty()),
544 PipelineData::Value(value, metadata) => {
545 let span = value.span();
546 let pipeline = match value {
547 Value::List { vals, .. } => vals
548 .into_iter()
549 .filter(f)
550 .into_pipeline_data(span, signals.clone()),
551 Value::Range { val, .. } => val
552 .into_range_iter(span, Signals::empty())
553 .filter(f)
554 .into_pipeline_data(span, signals.clone()),
555 value => {
556 if f(&value) {
557 value.into_pipeline_data()
558 } else {
559 Value::nothing(span).into_pipeline_data()
560 }
561 }
562 };
563 Ok(pipeline.set_metadata(metadata))
564 }
565 PipelineData::ListStream(stream, metadata) => Ok(PipelineData::list_stream(
566 stream.modify(|iter| iter.filter(f)),
567 metadata,
568 )),
569 PipelineData::ByteStream(stream, metadata) => {
570 let span = stream.span();
572 let value = match String::from_utf8(stream.into_bytes()?) {
573 Ok(mut str) => {
574 str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
575 Value::string(str, span)
576 }
577 Err(err) => Value::binary(err.into_bytes(), span),
578 };
579 let value = if f(&value) {
580 value
581 } else {
582 Value::nothing(span)
583 };
584 Ok(value.into_pipeline_data_with_metadata(metadata))
585 }
586 }
587 }
588
589 pub fn try_expand_range(self) -> Result<PipelineData, ShellError> {
594 match self {
595 PipelineData::Value(v, metadata) => {
596 let span = v.span();
597 match v {
598 Value::Range { val, .. } => {
599 match *val {
600 Range::IntRange(range) => {
601 if range.is_unbounded() {
602 return Err(ShellError::GenericError {
603 error: "Cannot create range".into(),
604 msg: "Unbounded ranges are not allowed when converting to this format".into(),
605 span: Some(span),
606 help: Some("Consider using ranges with valid start and end point.".into()),
607 inner: vec![],
608 });
609 }
610 }
611 Range::FloatRange(range) => {
612 if range.is_unbounded() {
613 return Err(ShellError::GenericError {
614 error: "Cannot create range".into(),
615 msg: "Unbounded ranges are not allowed when converting to this format".into(),
616 span: Some(span),
617 help: Some("Consider using ranges with valid start and end point.".into()),
618 inner: vec![],
619 });
620 }
621 }
622 }
623 let range_values: Vec<Value> =
624 val.into_range_iter(span, Signals::empty()).collect();
625 Ok(PipelineData::value(Value::list(range_values, span), None))
626 }
627 x => Ok(PipelineData::value(x, metadata)),
628 }
629 }
630 _ => Ok(self),
631 }
632 }
633
634 pub fn print_table(
642 self,
643 engine_state: &EngineState,
644 stack: &mut Stack,
645 no_newline: bool,
646 to_stderr: bool,
647 ) -> Result<(), ShellError> {
648 match self {
649 PipelineData::ByteStream(stream, ..) if stream.type_() != ByteStreamType::Binary => {
651 stream.print(to_stderr)
652 }
653 _ => {
654 if let Some(decl_id) = engine_state.table_decl_id {
657 let command = engine_state.get_decl(decl_id);
658 if command.block_id().is_some() {
659 self.write_all_and_flush(engine_state, no_newline, to_stderr)
660 } else {
661 let call = Call::new(Span::new(0, 0));
662 let table = command.run(engine_state, stack, &(&call).into(), self)?;
663 table.write_all_and_flush(engine_state, no_newline, to_stderr)
664 }
665 } else {
666 self.write_all_and_flush(engine_state, no_newline, to_stderr)
667 }
668 }
669 }
670 }
671
672 pub fn print_raw(
680 self,
681 engine_state: &EngineState,
682 no_newline: bool,
683 to_stderr: bool,
684 ) -> Result<(), ShellError> {
685 let span = self.span();
686 if let PipelineData::Value(Value::Binary { val: bytes, .. }, _) = self {
687 if to_stderr {
688 write_all_and_flush(
689 bytes,
690 &mut std::io::stderr().lock(),
691 "stderr",
692 span,
693 engine_state.signals(),
694 )?;
695 } else {
696 write_all_and_flush(
697 bytes,
698 &mut std::io::stdout().lock(),
699 "stdout",
700 span,
701 engine_state.signals(),
702 )?;
703 }
704 Ok(())
705 } else {
706 self.write_all_and_flush(engine_state, no_newline, to_stderr)
707 }
708 }
709
710 fn write_all_and_flush(
711 self,
712 engine_state: &EngineState,
713 no_newline: bool,
714 to_stderr: bool,
715 ) -> Result<(), ShellError> {
716 let span = self.span();
717 if let PipelineData::ByteStream(stream, ..) = self {
718 stream.print(to_stderr)
720 } else {
721 let config = engine_state.get_config();
722 for item in self {
723 let mut out = if let Value::Error { error, .. } = item {
724 return Err(*error);
725 } else {
726 item.to_expanded_string("\n", config)
727 };
728
729 if !no_newline {
730 out.push('\n');
731 }
732
733 if to_stderr {
734 write_all_and_flush(
735 out,
736 &mut std::io::stderr().lock(),
737 "stderr",
738 span,
739 engine_state.signals(),
740 )?;
741 } else {
742 write_all_and_flush(
743 out,
744 &mut std::io::stdout().lock(),
745 "stdout",
746 span,
747 engine_state.signals(),
748 )?;
749 }
750 }
751
752 Ok(())
753 }
754 }
755
756 pub fn unsupported_input_error(
757 self,
758 expected_type: impl Into<String>,
759 span: Span,
760 ) -> ShellError {
761 match self {
762 PipelineData::Empty => ShellError::PipelineEmpty { dst_span: span },
763 PipelineData::Value(value, ..) => ShellError::OnlySupportsThisInputType {
764 exp_input_type: expected_type.into(),
765 wrong_type: value.get_type().get_non_specified_string(),
766 dst_span: span,
767 src_span: value.span(),
768 },
769 PipelineData::ListStream(stream, ..) => ShellError::OnlySupportsThisInputType {
770 exp_input_type: expected_type.into(),
771 wrong_type: "list (stream)".into(),
772 dst_span: span,
773 src_span: stream.span(),
774 },
775 PipelineData::ByteStream(stream, ..) => ShellError::OnlySupportsThisInputType {
776 exp_input_type: expected_type.into(),
777 wrong_type: stream.type_().describe().into(),
778 dst_span: span,
779 src_span: stream.span(),
780 },
781 }
782 }
783
784 #[cfg(feature = "os")]
787 pub fn clone_exit_status_future(&self) -> Option<Arc<Mutex<ExitStatusFuture>>> {
788 match self {
789 PipelineData::Empty | PipelineData::Value(..) | PipelineData::ListStream(..) => None,
790 PipelineData::ByteStream(stream, ..) => match stream.source() {
791 ByteStreamSource::Read(..) | ByteStreamSource::File(..) => None,
792 ByteStreamSource::Child(c) => Some(c.clone_exit_status_future()),
793 },
794 }
795 }
796}
797
798pub fn write_all_and_flush<T>(
799 data: T,
800 destination: &mut impl Write,
801 destination_name: &str,
802 span: Option<Span>,
803 signals: &Signals,
804) -> Result<(), ShellError>
805where
806 T: AsRef<[u8]>,
807{
808 let io_error_map = |err: std::io::Error, location: Location| {
809 let context = format!("Writing to {destination_name} failed");
810 match span {
811 None => IoError::new_internal(err, context, location),
812 Some(span) if span == Span::unknown() => IoError::new_internal(err, context, location),
813 Some(span) => IoError::new_with_additional_context(err, span, None, context),
814 }
815 };
816
817 let span = span.unwrap_or(Span::unknown());
818 const OUTPUT_CHUNK_SIZE: usize = 8192;
819 for chunk in data.as_ref().chunks(OUTPUT_CHUNK_SIZE) {
820 signals.check(&span)?;
821 destination
822 .write_all(chunk)
823 .map_err(|err| io_error_map(err, location!()))?;
824 }
825 destination
826 .flush()
827 .map_err(|err| io_error_map(err, location!()))?;
828 Ok(())
829}
830
/// Internal state of a [`PipelineIterator`]: one variant per source being iterated.
enum PipelineIteratorInner {
    /// Yields nothing.
    Empty,
    /// A single value, yielded once (a `Nothing` value yields no item at all).
    Value(Value),
    /// Items pulled from a list stream's iterator.
    ListStream(crate::list_stream::IntoIter),
    /// Chunks pulled from a byte stream.
    ByteStream(crate::byte_stream::Chunks),
}
837
/// Iterator over the [`Value`]s of a [`PipelineData`].
///
/// Created by `PipelineData::into_iter` or `PipelineData::into_iter_strict`.
pub struct PipelineIterator(PipelineIteratorInner);
839
840impl IntoIterator for PipelineData {
841 type Item = Value;
842
843 type IntoIter = PipelineIterator;
844
845 fn into_iter(self) -> Self::IntoIter {
846 PipelineIterator(match self {
847 PipelineData::Empty => PipelineIteratorInner::Empty,
848 PipelineData::Value(value, ..) => {
849 let span = value.span();
850 match value {
851 Value::List { vals, .. } => PipelineIteratorInner::ListStream(
852 ListStream::new(vals.into_iter(), span, Signals::empty()).into_iter(),
853 ),
854 Value::Range { val, .. } => PipelineIteratorInner::ListStream(
855 ListStream::new(
856 val.into_range_iter(span, Signals::empty()),
857 span,
858 Signals::empty(),
859 )
860 .into_iter(),
861 ),
862 x => PipelineIteratorInner::Value(x),
863 }
864 }
865 PipelineData::ListStream(stream, ..) => {
866 PipelineIteratorInner::ListStream(stream.into_iter())
867 }
868 PipelineData::ByteStream(stream, ..) => stream.chunks().map_or(
869 PipelineIteratorInner::Empty,
870 PipelineIteratorInner::ByteStream,
871 ),
872 })
873 }
874}
875
876impl Iterator for PipelineIterator {
877 type Item = Value;
878
879 fn next(&mut self) -> Option<Self::Item> {
880 match &mut self.0 {
881 PipelineIteratorInner::Empty => None,
882 PipelineIteratorInner::Value(Value::Nothing { .. }, ..) => None,
883 PipelineIteratorInner::Value(v, ..) => Some(std::mem::take(v)),
884 PipelineIteratorInner::ListStream(stream, ..) => stream.next(),
885 PipelineIteratorInner::ByteStream(stream) => stream.next().map(|x| match x {
886 Ok(x) => x,
887 Err(err) => Value::error(
888 err,
889 Span::unknown(), ),
891 }),
892 }
893 }
894}
895
/// Conversion of a single value-like item into [`PipelineData`].
pub trait IntoPipelineData {
    /// Converts `self` into pipeline data with no metadata.
    fn into_pipeline_data(self) -> PipelineData;

    /// Converts `self` into pipeline data carrying `metadata`.
    fn into_pipeline_data_with_metadata(
        self,
        metadata: impl Into<Option<PipelineMetadata>>,
    ) -> PipelineData;
}
904
905impl<V> IntoPipelineData for V
906where
907 V: Into<Value>,
908{
909 fn into_pipeline_data(self) -> PipelineData {
910 PipelineData::value(self.into(), None)
911 }
912
913 fn into_pipeline_data_with_metadata(
914 self,
915 metadata: impl Into<Option<PipelineMetadata>>,
916 ) -> PipelineData {
917 PipelineData::value(self.into(), metadata.into())
918 }
919}
920
/// Conversion of an iterable into stream-backed [`PipelineData`].
///
/// `span` and `signals` are handed to the [`ListStream`] that wraps the items.
pub trait IntoInterruptiblePipelineData {
    /// Converts `self` into pipeline data backed by a list stream.
    fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData;
    /// Converts `self` into a list stream carrying `metadata`.
    fn into_pipeline_data_with_metadata(
        self,
        span: Span,
        signals: Signals,
        metadata: impl Into<Option<PipelineMetadata>>,
    ) -> PipelineData;
}
930
931impl<I> IntoInterruptiblePipelineData for I
932where
933 I: IntoIterator + Send + 'static,
934 I::IntoIter: Send + 'static,
935 <I::IntoIter as Iterator>::Item: Into<Value>,
936{
937 fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData {
938 ListStream::new(self.into_iter().map(Into::into), span, signals).into()
939 }
940
941 fn into_pipeline_data_with_metadata(
942 self,
943 span: Span,
944 signals: Signals,
945 metadata: impl Into<Option<PipelineMetadata>>,
946 ) -> PipelineData {
947 PipelineData::list_stream(
948 ListStream::new(self.into_iter().map(Into::into), span, signals),
949 metadata.into(),
950 )
951 }
952}
953
954fn value_to_bytes(value: Value) -> Result<Vec<u8>, ShellError> {
955 let bytes = match value {
956 Value::String { val, .. } => val.into_bytes(),
957 Value::Binary { val, .. } => val,
958 Value::List { vals, .. } => {
959 let val = vals
960 .into_iter()
961 .map(Value::coerce_into_string)
962 .collect::<Result<Vec<String>, ShellError>>()?
963 .join("\n")
964 + "\n";
965
966 val.into_bytes()
967 }
968 Value::Error { error, .. } => return Err(*error),
970 value => value.coerce_into_string()?.into_bytes(),
971 };
972 Ok(bytes)
973}
974
/// A [`PipelineData`] body bundled with exit-status information.
pub struct PipelineExecutionData {
    /// The pipeline output itself.
    pub body: PipelineData,
    /// Exit status futures paired with a span; an entry is `None` when no future is
    /// available. Only present with the `os` feature.
    #[cfg(feature = "os")]
    pub exit: Vec<Option<(Arc<Mutex<ExitStatusFuture>>, Span)>>,
}
983
984impl Deref for PipelineExecutionData {
985 type Target = PipelineData;
986
987 fn deref(&self) -> &Self::Target {
988 &self.body
989 }
990}
991
992impl PipelineExecutionData {
993 pub fn empty() -> Self {
994 Self {
995 body: PipelineData::empty(),
996 #[cfg(feature = "os")]
997 exit: vec![],
998 }
999 }
1000}
1001
1002impl From<PipelineData> for PipelineExecutionData {
1003 #[cfg(feature = "os")]
1004 fn from(value: PipelineData) -> Self {
1005 let value_span = value.span().unwrap_or_else(Span::unknown);
1006 let exit_status_future = value.clone_exit_status_future().map(|f| (f, value_span));
1007 Self {
1008 body: value,
1009 exit: vec![exit_status_future],
1010 }
1011 }
1012
1013 #[cfg(not(feature = "os"))]
1014 fn from(value: PipelineData) -> Self {
1015 Self { body: value }
1016 }
1017}