1#[cfg(feature = "os")]
2use crate::process::ExitStatusFuture;
3use crate::{
4 ByteStream, ByteStreamSource, ByteStreamType, Config, ListStream, OutDest, PipelineMetadata,
5 Range, ShellError, Signals, Span, Type, Value,
6 ast::{Call, PathMember},
7 engine::{EngineState, Stack},
8 location,
9 shell_error::{io::IoError, location::Location},
10};
11use std::{
12 borrow::Cow,
13 io::Write,
14 ops::Deref,
15 sync::{Arc, Mutex},
16};
17
/// Characters treated as trailing line endings (`\r` and `\n`); used as a
/// `trim_end_matches` pattern when byte-stream text is collected into a string value.
const LINE_ENDING_PATTERN: &[char] = &['\r', '\n'];
19
/// The data flowing between two stages of a pipeline.
///
/// Every non-empty variant pairs its payload with optional [`PipelineMetadata`].
#[derive(Debug)]
pub enum PipelineData {
    /// No data at all.
    Empty,
    /// A single, fully materialized [`Value`].
    Value(Value, Option<PipelineMetadata>),
    /// A stream of [`Value`]s.
    ListStream(ListStream, Option<PipelineMetadata>),
    /// A stream of bytes.
    ByteStream(ByteStream, Option<PipelineMetadata>),
}
56
57impl PipelineData {
58 pub const fn empty() -> PipelineData {
59 PipelineData::Empty
60 }
61
62 pub fn value(val: Value, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
63 PipelineData::Value(val, metadata.into())
64 }
65
66 pub fn list_stream(stream: ListStream, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
67 PipelineData::ListStream(stream, metadata.into())
68 }
69
70 pub fn byte_stream(stream: ByteStream, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
71 PipelineData::ByteStream(stream, metadata.into())
72 }
73
74 pub fn metadata(&self) -> Option<PipelineMetadata> {
75 match self {
76 PipelineData::Empty => None,
77 PipelineData::Value(_, meta)
78 | PipelineData::ListStream(_, meta)
79 | PipelineData::ByteStream(_, meta) => meta.clone(),
80 }
81 }
82
83 pub fn set_metadata(mut self, metadata: Option<PipelineMetadata>) -> Self {
84 match &mut self {
85 PipelineData::Empty => {}
86 PipelineData::Value(_, meta)
87 | PipelineData::ListStream(_, meta)
88 | PipelineData::ByteStream(_, meta) => *meta = metadata,
89 }
90 self
91 }
92
93 pub fn is_nothing(&self) -> bool {
94 matches!(self, PipelineData::Value(Value::Nothing { .. }, ..))
95 || matches!(self, PipelineData::Empty)
96 }
97
98 pub fn span(&self) -> Option<Span> {
100 match self {
101 PipelineData::Empty => None,
102 PipelineData::Value(value, ..) => Some(value.span()),
103 PipelineData::ListStream(stream, ..) => Some(stream.span()),
104 PipelineData::ByteStream(stream, ..) => Some(stream.span()),
105 }
106 }
107
108 pub fn with_span(self, span: Span) -> Self {
112 match self {
113 PipelineData::Empty => PipelineData::value(Value::nothing(span), None),
114 PipelineData::Value(value, metadata) => {
115 PipelineData::value(value.with_span(span), metadata)
116 }
117 PipelineData::ListStream(stream, metadata) => {
118 PipelineData::list_stream(stream.with_span(span), metadata)
119 }
120 PipelineData::ByteStream(stream, metadata) => {
121 PipelineData::byte_stream(stream.with_span(span), metadata)
122 }
123 }
124 }
125
126 pub fn get_type(&self) -> Type {
137 match self {
138 PipelineData::Empty => Type::Nothing,
139 PipelineData::Value(value, _) => value.get_type(),
140 PipelineData::ListStream(_, _) => Type::list(Type::Any),
141 PipelineData::ByteStream(stream, _) => stream.type_().into(),
142 }
143 }
144
145 pub fn is_subtype_of(&self, other: &Type) -> bool {
160 match (self, other) {
161 (_, Type::Any) => true,
162 (PipelineData::Empty, Type::Nothing) => true,
163 (PipelineData::Value(val, ..), ty) => val.is_subtype_of(ty),
164
165 (PipelineData::ListStream(..), Type::List(..) | Type::Table(..)) => true,
167
168 (PipelineData::ByteStream(stream, ..), Type::String)
169 if stream.type_().is_string_coercible() =>
170 {
171 true
172 }
173 (PipelineData::ByteStream(stream, ..), Type::Binary)
174 if stream.type_().is_binary_coercible() =>
175 {
176 true
177 }
178
179 (PipelineData::Empty, _) => false,
180 (PipelineData::ListStream(..), _) => false,
181 (PipelineData::ByteStream(..), _) => false,
182 }
183 }
184
185 pub fn into_value(self, span: Span) -> Result<Value, ShellError> {
186 match self {
187 PipelineData::Empty => Ok(Value::nothing(span)),
188 PipelineData::Value(value, ..) => {
189 if value.span() == Span::unknown() {
190 Ok(value.with_span(span))
191 } else {
192 Ok(value)
193 }
194 }
195 PipelineData::ListStream(stream, ..) => stream.into_value(),
196 PipelineData::ByteStream(stream, ..) => stream.into_value(),
197 }
198 }
199
200 pub fn try_into_stream(self, engine_state: &EngineState) -> Result<PipelineData, PipelineData> {
208 let span = self.span().unwrap_or(Span::unknown());
209 match self {
210 PipelineData::ListStream(..) | PipelineData::ByteStream(..) => Ok(self),
211 PipelineData::Value(Value::List { .. } | Value::Range { .. }, ref metadata) => {
212 let metadata = metadata.clone();
213 Ok(PipelineData::list_stream(
214 ListStream::new(self.into_iter(), span, engine_state.signals().clone()),
215 metadata,
216 ))
217 }
218 PipelineData::Value(Value::String { val, .. }, metadata) => {
219 Ok(PipelineData::byte_stream(
220 ByteStream::read_string(val, span, engine_state.signals().clone()),
221 metadata,
222 ))
223 }
224 PipelineData::Value(Value::Binary { val, .. }, metadata) => {
225 Ok(PipelineData::byte_stream(
226 ByteStream::read_binary(val, span, engine_state.signals().clone()),
227 metadata,
228 ))
229 }
230 _ => Err(self),
231 }
232 }
233
234 pub fn write_to(self, mut dest: impl Write) -> Result<(), ShellError> {
238 match self {
239 PipelineData::Empty => Ok(()),
240 PipelineData::Value(value, ..) => {
241 let bytes = value_to_bytes(value)?;
242 dest.write_all(&bytes).map_err(|err| {
243 IoError::new_internal(
244 err,
245 "Could not write PipelineData to dest",
246 crate::location!(),
247 )
248 })?;
249 dest.flush().map_err(|err| {
250 IoError::new_internal(
251 err,
252 "Could not flush PipelineData to dest",
253 crate::location!(),
254 )
255 })?;
256 Ok(())
257 }
258 PipelineData::ListStream(stream, ..) => {
259 for value in stream {
260 let bytes = value_to_bytes(value)?;
261 dest.write_all(&bytes).map_err(|err| {
262 IoError::new_internal(
263 err,
264 "Could not write PipelineData to dest",
265 crate::location!(),
266 )
267 })?;
268 dest.write_all(b"\n").map_err(|err| {
269 IoError::new_internal(
270 err,
271 "Could not write linebreak after PipelineData to dest",
272 crate::location!(),
273 )
274 })?;
275 }
276 dest.flush().map_err(|err| {
277 IoError::new_internal(
278 err,
279 "Could not flush PipelineData to dest",
280 crate::location!(),
281 )
282 })?;
283 Ok(())
284 }
285 PipelineData::ByteStream(stream, ..) => stream.write_to(dest),
286 }
287 }
288
289 pub fn drain_to_out_dests(
296 self,
297 engine_state: &EngineState,
298 stack: &mut Stack,
299 ) -> Result<Self, ShellError> {
300 match stack.pipe_stdout().unwrap_or(&OutDest::Inherit) {
301 OutDest::Print => {
302 self.print_table(engine_state, stack, false, false)?;
303 Ok(Self::Empty)
304 }
305 OutDest::Pipe | OutDest::PipeSeparate => Ok(self),
306 OutDest::Value => {
307 let metadata = self.metadata();
308 let span = self.span().unwrap_or(Span::unknown());
309 self.into_value(span).map(|val| Self::Value(val, metadata))
310 }
311 OutDest::File(file) => {
312 self.write_to(file.as_ref())?;
313 Ok(Self::Empty)
314 }
315 OutDest::Null | OutDest::Inherit => {
316 self.drain()?;
317 Ok(Self::Empty)
318 }
319 }
320 }
321
322 pub fn drain(self) -> Result<(), ShellError> {
323 match self {
324 Self::Empty => Ok(()),
325 Self::Value(Value::Error { error, .. }, ..) => Err(*error),
326 Self::Value(..) => Ok(()),
327 Self::ListStream(stream, ..) => stream.drain(),
328 Self::ByteStream(stream, ..) => stream.drain(),
329 }
330 }
331
332 pub fn into_iter_strict(self, span: Span) -> Result<PipelineIterator, ShellError> {
338 Ok(PipelineIterator(match self {
339 PipelineData::Value(value, ..) => {
340 let val_span = value.span();
341 match value {
342 Value::List { vals, .. } => PipelineIteratorInner::ListStream(
343 ListStream::new(vals.into_iter(), val_span, Signals::empty()).into_iter(),
344 ),
345 Value::Binary { val, .. } => PipelineIteratorInner::ListStream(
346 ListStream::new(
347 val.into_iter().map(move |x| Value::int(x as i64, val_span)),
348 val_span,
349 Signals::empty(),
350 )
351 .into_iter(),
352 ),
353 Value::Range { val, .. } => PipelineIteratorInner::ListStream(
354 ListStream::new(
355 val.into_range_iter(val_span, Signals::empty()),
356 val_span,
357 Signals::empty(),
358 )
359 .into_iter(),
360 ),
361 Value::Error { error, .. } => return Err(*error),
363 other => {
364 return Err(ShellError::OnlySupportsThisInputType {
365 exp_input_type: "list, binary, range, or byte stream".into(),
366 wrong_type: other.get_type().to_string(),
367 dst_span: span,
368 src_span: val_span,
369 });
370 }
371 }
372 }
373 PipelineData::ListStream(stream, ..) => {
374 PipelineIteratorInner::ListStream(stream.into_iter())
375 }
376 PipelineData::Empty => {
377 return Err(ShellError::OnlySupportsThisInputType {
378 exp_input_type: "list, binary, range, or byte stream".into(),
379 wrong_type: "null".into(),
380 dst_span: span,
381 src_span: span,
382 });
383 }
384 PipelineData::ByteStream(stream, ..) => {
385 if let Some(chunks) = stream.chunks() {
386 PipelineIteratorInner::ByteStream(chunks)
387 } else {
388 PipelineIteratorInner::Empty
389 }
390 }
391 }))
392 }
393
394 pub fn collect_string(self, separator: &str, config: &Config) -> Result<String, ShellError> {
395 match self {
396 PipelineData::Empty => Ok(String::new()),
397 PipelineData::Value(value, ..) => Ok(value.to_expanded_string(separator, config)),
398 PipelineData::ListStream(stream, ..) => Ok(stream.into_string(separator, config)),
399 PipelineData::ByteStream(stream, ..) => stream.into_string(),
400 }
401 }
402
403 pub fn collect_string_strict(
408 self,
409 span: Span,
410 ) -> Result<(String, Span, Option<PipelineMetadata>), ShellError> {
411 match self {
412 PipelineData::Empty => Ok((String::new(), span, None)),
413 PipelineData::Value(Value::String { val, .. }, metadata) => Ok((val, span, metadata)),
414 PipelineData::Value(val, ..) => Err(ShellError::TypeMismatch {
415 err_message: "string".into(),
416 span: val.span(),
417 }),
418 PipelineData::ListStream(..) => Err(ShellError::TypeMismatch {
419 err_message: "string".into(),
420 span,
421 }),
422 PipelineData::ByteStream(stream, metadata) => {
423 let span = stream.span();
424 Ok((stream.into_string()?, span, metadata))
425 }
426 }
427 }
428
429 pub fn follow_cell_path(
430 self,
431 cell_path: &[PathMember],
432 head: Span,
433 ) -> Result<Value, ShellError> {
434 match self {
435 PipelineData::ListStream(stream, ..) => Value::list(stream.into_iter().collect(), head)
437 .follow_cell_path(cell_path)
438 .map(Cow::into_owned),
439 PipelineData::Value(v, ..) => v.follow_cell_path(cell_path).map(Cow::into_owned),
440 PipelineData::Empty => Err(ShellError::IncompatiblePathAccess {
441 type_name: "empty pipeline".to_string(),
442 span: head,
443 }),
444 PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess {
445 type_name: stream.type_().describe().to_owned(),
446 span: stream.span(),
447 }),
448 }
449 }
450
451 pub fn map<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
453 where
454 Self: Sized,
455 F: FnMut(Value) -> Value + 'static + Send,
456 {
457 match self {
458 PipelineData::Value(value, metadata) => {
459 let span = value.span();
460 let pipeline = match value {
461 Value::List { vals, .. } => vals
462 .into_iter()
463 .map(f)
464 .into_pipeline_data(span, signals.clone()),
465 Value::Range { val, .. } => val
466 .into_range_iter(span, Signals::empty())
467 .map(f)
468 .into_pipeline_data(span, signals.clone()),
469 value => match f(value) {
470 Value::Error { error, .. } => return Err(*error),
471 v => v.into_pipeline_data(),
472 },
473 };
474 Ok(pipeline.set_metadata(metadata))
475 }
476 PipelineData::Empty => Ok(PipelineData::empty()),
477 PipelineData::ListStream(stream, metadata) => {
478 Ok(PipelineData::list_stream(stream.map(f), metadata))
479 }
480 PipelineData::ByteStream(stream, metadata) => {
481 Ok(f(stream.into_value()?).into_pipeline_data_with_metadata(metadata))
482 }
483 }
484 }
485
486 pub fn flat_map<U, F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
488 where
489 Self: Sized,
490 U: IntoIterator<Item = Value> + 'static,
491 <U as IntoIterator>::IntoIter: 'static + Send,
492 F: FnMut(Value) -> U + 'static + Send,
493 {
494 match self {
495 PipelineData::Empty => Ok(PipelineData::empty()),
496 PipelineData::Value(value, metadata) => {
497 let span = value.span();
498 let pipeline = match value {
499 Value::List { vals, .. } => vals
500 .into_iter()
501 .flat_map(f)
502 .into_pipeline_data(span, signals.clone()),
503 Value::Range { val, .. } => val
504 .into_range_iter(span, Signals::empty())
505 .flat_map(f)
506 .into_pipeline_data(span, signals.clone()),
507 value => f(value)
508 .into_iter()
509 .into_pipeline_data(span, signals.clone()),
510 };
511 Ok(pipeline.set_metadata(metadata))
512 }
513 PipelineData::ListStream(stream, metadata) => Ok(PipelineData::list_stream(
514 stream.modify(|iter| iter.flat_map(f)),
515 metadata,
516 )),
517 PipelineData::ByteStream(stream, metadata) => {
518 let span = stream.span();
520 let iter = match String::from_utf8(stream.into_bytes()?) {
521 Ok(mut str) => {
522 str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
523 f(Value::string(str, span))
524 }
525 Err(err) => f(Value::binary(err.into_bytes(), span)),
526 };
527 Ok(iter.into_iter().into_pipeline_data_with_metadata(
528 span,
529 signals.clone(),
530 metadata,
531 ))
532 }
533 }
534 }
535
536 pub fn filter<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
537 where
538 Self: Sized,
539 F: FnMut(&Value) -> bool + 'static + Send,
540 {
541 match self {
542 PipelineData::Empty => Ok(PipelineData::empty()),
543 PipelineData::Value(value, metadata) => {
544 let span = value.span();
545 let pipeline = match value {
546 Value::List { vals, .. } => vals
547 .into_iter()
548 .filter(f)
549 .into_pipeline_data(span, signals.clone()),
550 Value::Range { val, .. } => val
551 .into_range_iter(span, Signals::empty())
552 .filter(f)
553 .into_pipeline_data(span, signals.clone()),
554 value => {
555 if f(&value) {
556 value.into_pipeline_data()
557 } else {
558 Value::nothing(span).into_pipeline_data()
559 }
560 }
561 };
562 Ok(pipeline.set_metadata(metadata))
563 }
564 PipelineData::ListStream(stream, metadata) => Ok(PipelineData::list_stream(
565 stream.modify(|iter| iter.filter(f)),
566 metadata,
567 )),
568 PipelineData::ByteStream(stream, metadata) => {
569 let span = stream.span();
571 let value = match String::from_utf8(stream.into_bytes()?) {
572 Ok(mut str) => {
573 str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
574 Value::string(str, span)
575 }
576 Err(err) => Value::binary(err.into_bytes(), span),
577 };
578 let value = if f(&value) {
579 value
580 } else {
581 Value::nothing(span)
582 };
583 Ok(value.into_pipeline_data_with_metadata(metadata))
584 }
585 }
586 }
587
588 pub fn try_expand_range(self) -> Result<PipelineData, ShellError> {
593 match self {
594 PipelineData::Value(v, metadata) => {
595 let span = v.span();
596 match v {
597 Value::Range { val, .. } => {
598 match *val {
599 Range::IntRange(range) => {
600 if range.is_unbounded() {
601 return Err(ShellError::GenericError {
602 error: "Cannot create range".into(),
603 msg: "Unbounded ranges are not allowed when converting to this format".into(),
604 span: Some(span),
605 help: Some("Consider using ranges with valid start and end point.".into()),
606 inner: vec![],
607 });
608 }
609 }
610 Range::FloatRange(range) => {
611 if range.is_unbounded() {
612 return Err(ShellError::GenericError {
613 error: "Cannot create range".into(),
614 msg: "Unbounded ranges are not allowed when converting to this format".into(),
615 span: Some(span),
616 help: Some("Consider using ranges with valid start and end point.".into()),
617 inner: vec![],
618 });
619 }
620 }
621 }
622 let range_values: Vec<Value> =
623 val.into_range_iter(span, Signals::empty()).collect();
624 Ok(PipelineData::value(Value::list(range_values, span), None))
625 }
626 x => Ok(PipelineData::value(x, metadata)),
627 }
628 }
629 _ => Ok(self),
630 }
631 }
632
633 pub fn print_table(
641 self,
642 engine_state: &EngineState,
643 stack: &mut Stack,
644 no_newline: bool,
645 to_stderr: bool,
646 ) -> Result<(), ShellError> {
647 match self {
648 PipelineData::ByteStream(stream, ..) if stream.type_() != ByteStreamType::Binary => {
650 stream.print(to_stderr)
651 }
652 _ => {
653 if let Some(decl_id) = engine_state.table_decl_id {
656 let command = engine_state.get_decl(decl_id);
657 if command.block_id().is_some() {
658 self.write_all_and_flush(engine_state, no_newline, to_stderr)
659 } else {
660 let call = Call::new(Span::new(0, 0));
661 let table = command.run(engine_state, stack, &(&call).into(), self)?;
662 table.write_all_and_flush(engine_state, no_newline, to_stderr)
663 }
664 } else {
665 self.write_all_and_flush(engine_state, no_newline, to_stderr)
666 }
667 }
668 }
669 }
670
671 pub fn print_raw(
679 self,
680 engine_state: &EngineState,
681 no_newline: bool,
682 to_stderr: bool,
683 ) -> Result<(), ShellError> {
684 let span = self.span();
685 if let PipelineData::Value(Value::Binary { val: bytes, .. }, _) = self {
686 if to_stderr {
687 write_all_and_flush(
688 bytes,
689 &mut std::io::stderr().lock(),
690 "stderr",
691 span,
692 engine_state.signals(),
693 )?;
694 } else {
695 write_all_and_flush(
696 bytes,
697 &mut std::io::stdout().lock(),
698 "stdout",
699 span,
700 engine_state.signals(),
701 )?;
702 }
703 Ok(())
704 } else {
705 self.write_all_and_flush(engine_state, no_newline, to_stderr)
706 }
707 }
708
709 fn write_all_and_flush(
710 self,
711 engine_state: &EngineState,
712 no_newline: bool,
713 to_stderr: bool,
714 ) -> Result<(), ShellError> {
715 let span = self.span();
716 if let PipelineData::ByteStream(stream, ..) = self {
717 stream.print(to_stderr)
719 } else {
720 let config = engine_state.get_config();
721 for item in self {
722 let mut out = if let Value::Error { error, .. } = item {
723 return Err(*error);
724 } else {
725 item.to_expanded_string("\n", config)
726 };
727
728 if !no_newline {
729 out.push('\n');
730 }
731
732 if to_stderr {
733 write_all_and_flush(
734 out,
735 &mut std::io::stderr().lock(),
736 "stderr",
737 span,
738 engine_state.signals(),
739 )?;
740 } else {
741 write_all_and_flush(
742 out,
743 &mut std::io::stdout().lock(),
744 "stdout",
745 span,
746 engine_state.signals(),
747 )?;
748 }
749 }
750
751 Ok(())
752 }
753 }
754
755 pub fn unsupported_input_error(
756 self,
757 expected_type: impl Into<String>,
758 span: Span,
759 ) -> ShellError {
760 match self {
761 PipelineData::Empty => ShellError::PipelineEmpty { dst_span: span },
762 PipelineData::Value(value, ..) => ShellError::OnlySupportsThisInputType {
763 exp_input_type: expected_type.into(),
764 wrong_type: value.get_type().get_non_specified_string(),
765 dst_span: span,
766 src_span: value.span(),
767 },
768 PipelineData::ListStream(stream, ..) => ShellError::OnlySupportsThisInputType {
769 exp_input_type: expected_type.into(),
770 wrong_type: "list (stream)".into(),
771 dst_span: span,
772 src_span: stream.span(),
773 },
774 PipelineData::ByteStream(stream, ..) => ShellError::OnlySupportsThisInputType {
775 exp_input_type: expected_type.into(),
776 wrong_type: stream.type_().describe().into(),
777 dst_span: span,
778 src_span: stream.span(),
779 },
780 }
781 }
782
/// Returns a handle to the exit status future when the data is a byte stream
/// whose source is a child process; `None` for every other kind of data.
#[cfg(feature = "os")]
pub fn clone_exit_status_future(&self) -> Option<Arc<Mutex<ExitStatusFuture>>> {
    let PipelineData::ByteStream(stream, ..) = self else {
        return None;
    };

    match stream.source() {
        ByteStreamSource::Child(child) => Some(child.clone_exit_status_future()),
        ByteStreamSource::Read(..) | ByteStreamSource::File(..) => None,
    }
}
795}
796
797pub fn write_all_and_flush<T>(
798 data: T,
799 destination: &mut impl Write,
800 destination_name: &str,
801 span: Option<Span>,
802 signals: &Signals,
803) -> Result<(), ShellError>
804where
805 T: AsRef<[u8]>,
806{
807 let io_error_map = |err: std::io::Error, location: Location| {
808 let context = format!("Writing to {destination_name} failed");
809 match span {
810 None => IoError::new_internal(err, context, location),
811 Some(span) if span == Span::unknown() => IoError::new_internal(err, context, location),
812 Some(span) => IoError::new_with_additional_context(err, span, None, context),
813 }
814 };
815
816 let span = span.unwrap_or(Span::unknown());
817 const OUTPUT_CHUNK_SIZE: usize = 8192;
818 for chunk in data.as_ref().chunks(OUTPUT_CHUNK_SIZE) {
819 signals.check(&span)?;
820 destination
821 .write_all(chunk)
822 .map_err(|err| io_error_map(err, location!()))?;
823 }
824 destination
825 .flush()
826 .map_err(|err| io_error_map(err, location!()))?;
827 Ok(())
828}
829
/// The state backing a [`PipelineIterator`].
enum PipelineIteratorInner {
    /// Yields nothing.
    Empty,
    /// A single value; it is yielded unless it is `Value::Nothing`.
    Value(Value),
    /// Items are pulled from a list stream iterator.
    ListStream(crate::list_stream::IntoIter),
    /// Items are pulled from the chunks of a byte stream.
    ByteStream(crate::byte_stream::Chunks),
}
836
/// Iterator over the [`Value`]s of a [`PipelineData`].
pub struct PipelineIterator(PipelineIteratorInner);
838
839impl IntoIterator for PipelineData {
840 type Item = Value;
841
842 type IntoIter = PipelineIterator;
843
844 fn into_iter(self) -> Self::IntoIter {
845 PipelineIterator(match self {
846 PipelineData::Empty => PipelineIteratorInner::Empty,
847 PipelineData::Value(value, ..) => {
848 let span = value.span();
849 match value {
850 Value::List { vals, .. } => PipelineIteratorInner::ListStream(
851 ListStream::new(vals.into_iter(), span, Signals::empty()).into_iter(),
852 ),
853 Value::Range { val, .. } => PipelineIteratorInner::ListStream(
854 ListStream::new(
855 val.into_range_iter(span, Signals::empty()),
856 span,
857 Signals::empty(),
858 )
859 .into_iter(),
860 ),
861 x => PipelineIteratorInner::Value(x),
862 }
863 }
864 PipelineData::ListStream(stream, ..) => {
865 PipelineIteratorInner::ListStream(stream.into_iter())
866 }
867 PipelineData::ByteStream(stream, ..) => stream.chunks().map_or(
868 PipelineIteratorInner::Empty,
869 PipelineIteratorInner::ByteStream,
870 ),
871 })
872 }
873}
874
875impl Iterator for PipelineIterator {
876 type Item = Value;
877
878 fn next(&mut self) -> Option<Self::Item> {
879 match &mut self.0 {
880 PipelineIteratorInner::Empty => None,
881 PipelineIteratorInner::Value(Value::Nothing { .. }, ..) => None,
882 PipelineIteratorInner::Value(v, ..) => Some(std::mem::take(v)),
883 PipelineIteratorInner::ListStream(stream, ..) => stream.next(),
884 PipelineIteratorInner::ByteStream(stream) => stream.next().map(|x| match x {
885 Ok(x) => x,
886 Err(err) => Value::error(
887 err,
888 Span::unknown(), ),
890 }),
891 }
892 }
893}
894
/// Conversion of a single item into [`PipelineData`].
pub trait IntoPipelineData {
    /// Converts `self` into pipeline data.
    fn into_pipeline_data(self) -> PipelineData;

    /// Converts `self` into pipeline data carrying `metadata`.
    fn into_pipeline_data_with_metadata(
        self,
        metadata: impl Into<Option<PipelineMetadata>>,
    ) -> PipelineData;
}
903
904impl<V> IntoPipelineData for V
905where
906 V: Into<Value>,
907{
908 fn into_pipeline_data(self) -> PipelineData {
909 PipelineData::value(self.into(), None)
910 }
911
912 fn into_pipeline_data_with_metadata(
913 self,
914 metadata: impl Into<Option<PipelineMetadata>>,
915 ) -> PipelineData {
916 PipelineData::value(self.into(), metadata.into())
917 }
918}
919
/// Conversion of an iterable into stream-backed [`PipelineData`] that is
/// constructed with a span and [`Signals`].
pub trait IntoInterruptiblePipelineData {
    /// Converts `self` into pipeline data using the given span and signals.
    fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData;
    /// Same as `into_pipeline_data`, additionally attaching `metadata`.
    fn into_pipeline_data_with_metadata(
        self,
        span: Span,
        signals: Signals,
        metadata: impl Into<Option<PipelineMetadata>>,
    ) -> PipelineData;
}
929
930impl<I> IntoInterruptiblePipelineData for I
931where
932 I: IntoIterator + Send + 'static,
933 I::IntoIter: Send + 'static,
934 <I::IntoIter as Iterator>::Item: Into<Value>,
935{
936 fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData {
937 ListStream::new(self.into_iter().map(Into::into), span, signals).into()
938 }
939
940 fn into_pipeline_data_with_metadata(
941 self,
942 span: Span,
943 signals: Signals,
944 metadata: impl Into<Option<PipelineMetadata>>,
945 ) -> PipelineData {
946 PipelineData::list_stream(
947 ListStream::new(self.into_iter().map(Into::into), span, signals),
948 metadata.into(),
949 )
950 }
951}
952
953fn value_to_bytes(value: Value) -> Result<Vec<u8>, ShellError> {
954 let bytes = match value {
955 Value::String { val, .. } => val.into_bytes(),
956 Value::Binary { val, .. } => val,
957 Value::List { vals, .. } => {
958 let val = vals
959 .into_iter()
960 .map(Value::coerce_into_string)
961 .collect::<Result<Vec<String>, ShellError>>()?
962 .join("\n")
963 + "\n";
964
965 val.into_bytes()
966 }
967 Value::Error { error, .. } => return Err(*error),
969 value => value.coerce_into_string()?.into_bytes(),
970 };
971 Ok(bytes)
972}
973
/// Pipeline data bundled with the exit status handles collected for it.
pub struct PipelineExecutionData {
    /// The actual pipeline data.
    pub body: PipelineData,
    /// Exit status futures paired with a span; an entry is `None` when no future
    /// was available. Only present with the `os` feature.
    #[cfg(feature = "os")]
    pub exit: Vec<Option<(Arc<Mutex<ExitStatusFuture>>, Span)>>,
}
982
983impl Deref for PipelineExecutionData {
984 type Target = PipelineData;
985
986 fn deref(&self) -> &Self::Target {
987 &self.body
988 }
989}
990
991impl PipelineExecutionData {
992 pub fn empty() -> Self {
993 Self {
994 body: PipelineData::empty(),
995 #[cfg(feature = "os")]
996 exit: vec![],
997 }
998 }
999}
1000
1001impl From<PipelineData> for PipelineExecutionData {
1002 #[cfg(feature = "os")]
1003 fn from(value: PipelineData) -> Self {
1004 let value_span = value.span().unwrap_or_else(Span::unknown);
1005 let exit_status_future = value.clone_exit_status_future().map(|f| (f, value_span));
1006 Self {
1007 body: value,
1008 exit: vec![exit_status_future],
1009 }
1010 }
1011
1012 #[cfg(not(feature = "os"))]
1013 fn from(value: PipelineData) -> Self {
1014 Self { body: value }
1015 }
1016}