1use crate::{
2 ByteStream, ByteStreamType, Config, ListStream, OutDest, PipelineMetadata, Range, ShellError,
3 Signals, Span, Type, Value,
4 ast::{Call, PathMember},
5 engine::{EngineState, Stack},
6 location,
7 shell_error::{io::IoError, location::Location},
8};
9use std::{borrow::Cow, io::Write};
10
/// Characters stripped from the end of byte-stream text before it is handed to a
/// `flat_map`/`filter` closure as a string value.
const LINE_ENDING_PATTERN: &[char] = &['\r', '\n'];
12
/// The data that flows between pipeline elements.
///
/// Every non-empty variant pairs its payload with optional [`PipelineMetadata`].
#[derive(Debug)]
pub enum PipelineData {
    /// No data at all.
    Empty,
    /// A single [`Value`].
    Value(Value, Option<PipelineMetadata>),
    /// A [`ListStream`] of values.
    ListStream(ListStream, Option<PipelineMetadata>),
    /// A [`ByteStream`].
    ByteStream(ByteStream, Option<PipelineMetadata>),
}
49
50impl PipelineData {
51 pub const fn empty() -> PipelineData {
52 PipelineData::Empty
53 }
54
55 pub fn value(val: Value, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
56 PipelineData::Value(val, metadata.into())
57 }
58
59 pub fn list_stream(stream: ListStream, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
60 PipelineData::ListStream(stream, metadata.into())
61 }
62
63 pub fn byte_stream(stream: ByteStream, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
64 PipelineData::ByteStream(stream, metadata.into())
65 }
66
67 pub fn metadata(&self) -> Option<PipelineMetadata> {
68 match self {
69 PipelineData::Empty => None,
70 PipelineData::Value(_, meta)
71 | PipelineData::ListStream(_, meta)
72 | PipelineData::ByteStream(_, meta) => meta.clone(),
73 }
74 }
75
76 pub fn set_metadata(mut self, metadata: Option<PipelineMetadata>) -> Self {
77 match &mut self {
78 PipelineData::Empty => {}
79 PipelineData::Value(_, meta)
80 | PipelineData::ListStream(_, meta)
81 | PipelineData::ByteStream(_, meta) => *meta = metadata,
82 }
83 self
84 }
85
86 pub fn is_nothing(&self) -> bool {
87 matches!(self, PipelineData::Value(Value::Nothing { .. }, ..))
88 || matches!(self, PipelineData::Empty)
89 }
90
91 pub fn span(&self) -> Option<Span> {
93 match self {
94 PipelineData::Empty => None,
95 PipelineData::Value(value, ..) => Some(value.span()),
96 PipelineData::ListStream(stream, ..) => Some(stream.span()),
97 PipelineData::ByteStream(stream, ..) => Some(stream.span()),
98 }
99 }
100
101 pub fn with_span(self, span: Span) -> Self {
105 match self {
106 PipelineData::Empty => PipelineData::value(Value::nothing(span), None),
107 PipelineData::Value(value, metadata) => {
108 PipelineData::value(value.with_span(span), metadata)
109 }
110 PipelineData::ListStream(stream, metadata) => {
111 PipelineData::list_stream(stream.with_span(span), metadata)
112 }
113 PipelineData::ByteStream(stream, metadata) => {
114 PipelineData::byte_stream(stream.with_span(span), metadata)
115 }
116 }
117 }
118
119 pub fn get_type(&self) -> Type {
130 match self {
131 PipelineData::Empty => Type::Nothing,
132 PipelineData::Value(value, _) => value.get_type(),
133 PipelineData::ListStream(_, _) => Type::list(Type::Any),
134 PipelineData::ByteStream(stream, _) => stream.type_().into(),
135 }
136 }
137
138 pub fn is_subtype_of(&self, other: &Type) -> bool {
153 match (self, other) {
154 (_, Type::Any) => true,
155 (PipelineData::Empty, Type::Nothing) => true,
156 (PipelineData::Value(val, ..), ty) => val.is_subtype_of(ty),
157
158 (PipelineData::ListStream(..), Type::List(..) | Type::Table(..)) => true,
160
161 (PipelineData::ByteStream(stream, ..), Type::String)
162 if stream.type_().is_string_coercible() =>
163 {
164 true
165 }
166 (PipelineData::ByteStream(stream, ..), Type::Binary)
167 if stream.type_().is_binary_coercible() =>
168 {
169 true
170 }
171
172 (PipelineData::Empty, _) => false,
173 (PipelineData::ListStream(..), _) => false,
174 (PipelineData::ByteStream(..), _) => false,
175 }
176 }
177
178 pub fn into_value(self, span: Span) -> Result<Value, ShellError> {
179 match self {
180 PipelineData::Empty => Ok(Value::nothing(span)),
181 PipelineData::Value(value, ..) => {
182 if value.span() == Span::unknown() {
183 Ok(value.with_span(span))
184 } else {
185 Ok(value)
186 }
187 }
188 PipelineData::ListStream(stream, ..) => Ok(stream.into_value()),
189 PipelineData::ByteStream(stream, ..) => stream.into_value(),
190 }
191 }
192
193 pub fn try_into_stream(self, engine_state: &EngineState) -> Result<PipelineData, PipelineData> {
201 let span = self.span().unwrap_or(Span::unknown());
202 match self {
203 PipelineData::ListStream(..) | PipelineData::ByteStream(..) => Ok(self),
204 PipelineData::Value(Value::List { .. } | Value::Range { .. }, ref metadata) => {
205 let metadata = metadata.clone();
206 Ok(PipelineData::list_stream(
207 ListStream::new(self.into_iter(), span, engine_state.signals().clone()),
208 metadata,
209 ))
210 }
211 PipelineData::Value(Value::String { val, .. }, metadata) => {
212 Ok(PipelineData::byte_stream(
213 ByteStream::read_string(val, span, engine_state.signals().clone()),
214 metadata,
215 ))
216 }
217 PipelineData::Value(Value::Binary { val, .. }, metadata) => {
218 Ok(PipelineData::byte_stream(
219 ByteStream::read_binary(val, span, engine_state.signals().clone()),
220 metadata,
221 ))
222 }
223 _ => Err(self),
224 }
225 }
226
227 pub fn write_to(self, mut dest: impl Write) -> Result<(), ShellError> {
231 match self {
232 PipelineData::Empty => Ok(()),
233 PipelineData::Value(value, ..) => {
234 let bytes = value_to_bytes(value)?;
235 dest.write_all(&bytes).map_err(|err| {
236 IoError::new_internal(
237 err,
238 "Could not write PipelineData to dest",
239 crate::location!(),
240 )
241 })?;
242 dest.flush().map_err(|err| {
243 IoError::new_internal(
244 err,
245 "Could not flush PipelineData to dest",
246 crate::location!(),
247 )
248 })?;
249 Ok(())
250 }
251 PipelineData::ListStream(stream, ..) => {
252 for value in stream {
253 let bytes = value_to_bytes(value)?;
254 dest.write_all(&bytes).map_err(|err| {
255 IoError::new_internal(
256 err,
257 "Could not write PipelineData to dest",
258 crate::location!(),
259 )
260 })?;
261 dest.write_all(b"\n").map_err(|err| {
262 IoError::new_internal(
263 err,
264 "Could not write linebreak after PipelineData to dest",
265 crate::location!(),
266 )
267 })?;
268 }
269 dest.flush().map_err(|err| {
270 IoError::new_internal(
271 err,
272 "Could not flush PipelineData to dest",
273 crate::location!(),
274 )
275 })?;
276 Ok(())
277 }
278 PipelineData::ByteStream(stream, ..) => stream.write_to(dest),
279 }
280 }
281
282 pub fn drain_to_out_dests(
289 self,
290 engine_state: &EngineState,
291 stack: &mut Stack,
292 ) -> Result<Self, ShellError> {
293 match stack.pipe_stdout().unwrap_or(&OutDest::Inherit) {
294 OutDest::Print => {
295 self.print_table(engine_state, stack, false, false)?;
296 Ok(Self::Empty)
297 }
298 OutDest::Pipe | OutDest::PipeSeparate => Ok(self),
299 OutDest::Value => {
300 let metadata = self.metadata();
301 let span = self.span().unwrap_or(Span::unknown());
302 self.into_value(span).map(|val| Self::Value(val, metadata))
303 }
304 OutDest::File(file) => {
305 self.write_to(file.as_ref())?;
306 Ok(Self::Empty)
307 }
308 OutDest::Null | OutDest::Inherit => {
309 self.drain()?;
310 Ok(Self::Empty)
311 }
312 }
313 }
314
315 pub fn drain(self) -> Result<(), ShellError> {
316 match self {
317 Self::Empty => Ok(()),
318 Self::Value(Value::Error { error, .. }, ..) => Err(*error),
319 Self::Value(..) => Ok(()),
320 Self::ListStream(stream, ..) => stream.drain(),
321 Self::ByteStream(stream, ..) => stream.drain(),
322 }
323 }
324
325 pub fn into_iter_strict(self, span: Span) -> Result<PipelineIterator, ShellError> {
331 Ok(PipelineIterator(match self {
332 PipelineData::Value(value, ..) => {
333 let val_span = value.span();
334 match value {
335 Value::List { vals, .. } => PipelineIteratorInner::ListStream(
336 ListStream::new(vals.into_iter(), val_span, Signals::empty()).into_iter(),
337 ),
338 Value::Binary { val, .. } => PipelineIteratorInner::ListStream(
339 ListStream::new(
340 val.into_iter().map(move |x| Value::int(x as i64, val_span)),
341 val_span,
342 Signals::empty(),
343 )
344 .into_iter(),
345 ),
346 Value::Range { val, .. } => PipelineIteratorInner::ListStream(
347 ListStream::new(
348 val.into_range_iter(val_span, Signals::empty()),
349 val_span,
350 Signals::empty(),
351 )
352 .into_iter(),
353 ),
354 Value::Error { error, .. } => return Err(*error),
356 other => {
357 return Err(ShellError::OnlySupportsThisInputType {
358 exp_input_type: "list, binary, range, or byte stream".into(),
359 wrong_type: other.get_type().to_string(),
360 dst_span: span,
361 src_span: val_span,
362 });
363 }
364 }
365 }
366 PipelineData::ListStream(stream, ..) => {
367 PipelineIteratorInner::ListStream(stream.into_iter())
368 }
369 PipelineData::Empty => {
370 return Err(ShellError::OnlySupportsThisInputType {
371 exp_input_type: "list, binary, range, or byte stream".into(),
372 wrong_type: "null".into(),
373 dst_span: span,
374 src_span: span,
375 });
376 }
377 PipelineData::ByteStream(stream, ..) => {
378 if let Some(chunks) = stream.chunks() {
379 PipelineIteratorInner::ByteStream(chunks)
380 } else {
381 PipelineIteratorInner::Empty
382 }
383 }
384 }))
385 }
386
387 pub fn collect_string(self, separator: &str, config: &Config) -> Result<String, ShellError> {
388 match self {
389 PipelineData::Empty => Ok(String::new()),
390 PipelineData::Value(value, ..) => Ok(value.to_expanded_string(separator, config)),
391 PipelineData::ListStream(stream, ..) => Ok(stream.into_string(separator, config)),
392 PipelineData::ByteStream(stream, ..) => stream.into_string(),
393 }
394 }
395
396 pub fn collect_string_strict(
401 self,
402 span: Span,
403 ) -> Result<(String, Span, Option<PipelineMetadata>), ShellError> {
404 match self {
405 PipelineData::Empty => Ok((String::new(), span, None)),
406 PipelineData::Value(Value::String { val, .. }, metadata) => Ok((val, span, metadata)),
407 PipelineData::Value(val, ..) => Err(ShellError::TypeMismatch {
408 err_message: "string".into(),
409 span: val.span(),
410 }),
411 PipelineData::ListStream(..) => Err(ShellError::TypeMismatch {
412 err_message: "string".into(),
413 span,
414 }),
415 PipelineData::ByteStream(stream, metadata) => {
416 let span = stream.span();
417 Ok((stream.into_string()?, span, metadata))
418 }
419 }
420 }
421
422 pub fn follow_cell_path(
423 self,
424 cell_path: &[PathMember],
425 head: Span,
426 ) -> Result<Value, ShellError> {
427 match self {
428 PipelineData::ListStream(stream, ..) => Value::list(stream.into_iter().collect(), head)
430 .follow_cell_path(cell_path)
431 .map(Cow::into_owned),
432 PipelineData::Value(v, ..) => v.follow_cell_path(cell_path).map(Cow::into_owned),
433 PipelineData::Empty => Err(ShellError::IncompatiblePathAccess {
434 type_name: "empty pipeline".to_string(),
435 span: head,
436 }),
437 PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess {
438 type_name: stream.type_().describe().to_owned(),
439 span: stream.span(),
440 }),
441 }
442 }
443
444 pub fn map<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
446 where
447 Self: Sized,
448 F: FnMut(Value) -> Value + 'static + Send,
449 {
450 match self {
451 PipelineData::Value(value, metadata) => {
452 let span = value.span();
453 let pipeline = match value {
454 Value::List { vals, .. } => vals
455 .into_iter()
456 .map(f)
457 .into_pipeline_data(span, signals.clone()),
458 Value::Range { val, .. } => val
459 .into_range_iter(span, Signals::empty())
460 .map(f)
461 .into_pipeline_data(span, signals.clone()),
462 value => match f(value) {
463 Value::Error { error, .. } => return Err(*error),
464 v => v.into_pipeline_data(),
465 },
466 };
467 Ok(pipeline.set_metadata(metadata))
468 }
469 PipelineData::Empty => Ok(PipelineData::empty()),
470 PipelineData::ListStream(stream, metadata) => {
471 Ok(PipelineData::list_stream(stream.map(f), metadata))
472 }
473 PipelineData::ByteStream(stream, metadata) => {
474 Ok(f(stream.into_value()?).into_pipeline_data_with_metadata(metadata))
475 }
476 }
477 }
478
479 pub fn flat_map<U, F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
481 where
482 Self: Sized,
483 U: IntoIterator<Item = Value> + 'static,
484 <U as IntoIterator>::IntoIter: 'static + Send,
485 F: FnMut(Value) -> U + 'static + Send,
486 {
487 match self {
488 PipelineData::Empty => Ok(PipelineData::empty()),
489 PipelineData::Value(value, metadata) => {
490 let span = value.span();
491 let pipeline = match value {
492 Value::List { vals, .. } => vals
493 .into_iter()
494 .flat_map(f)
495 .into_pipeline_data(span, signals.clone()),
496 Value::Range { val, .. } => val
497 .into_range_iter(span, Signals::empty())
498 .flat_map(f)
499 .into_pipeline_data(span, signals.clone()),
500 value => f(value)
501 .into_iter()
502 .into_pipeline_data(span, signals.clone()),
503 };
504 Ok(pipeline.set_metadata(metadata))
505 }
506 PipelineData::ListStream(stream, metadata) => Ok(PipelineData::list_stream(
507 stream.modify(|iter| iter.flat_map(f)),
508 metadata,
509 )),
510 PipelineData::ByteStream(stream, metadata) => {
511 let span = stream.span();
513 let iter = match String::from_utf8(stream.into_bytes()?) {
514 Ok(mut str) => {
515 str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
516 f(Value::string(str, span))
517 }
518 Err(err) => f(Value::binary(err.into_bytes(), span)),
519 };
520 Ok(iter.into_iter().into_pipeline_data_with_metadata(
521 span,
522 signals.clone(),
523 metadata,
524 ))
525 }
526 }
527 }
528
529 pub fn filter<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
530 where
531 Self: Sized,
532 F: FnMut(&Value) -> bool + 'static + Send,
533 {
534 match self {
535 PipelineData::Empty => Ok(PipelineData::empty()),
536 PipelineData::Value(value, metadata) => {
537 let span = value.span();
538 let pipeline = match value {
539 Value::List { vals, .. } => vals
540 .into_iter()
541 .filter(f)
542 .into_pipeline_data(span, signals.clone()),
543 Value::Range { val, .. } => val
544 .into_range_iter(span, Signals::empty())
545 .filter(f)
546 .into_pipeline_data(span, signals.clone()),
547 value => {
548 if f(&value) {
549 value.into_pipeline_data()
550 } else {
551 Value::nothing(span).into_pipeline_data()
552 }
553 }
554 };
555 Ok(pipeline.set_metadata(metadata))
556 }
557 PipelineData::ListStream(stream, metadata) => Ok(PipelineData::list_stream(
558 stream.modify(|iter| iter.filter(f)),
559 metadata,
560 )),
561 PipelineData::ByteStream(stream, metadata) => {
562 let span = stream.span();
564 let value = match String::from_utf8(stream.into_bytes()?) {
565 Ok(mut str) => {
566 str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
567 Value::string(str, span)
568 }
569 Err(err) => Value::binary(err.into_bytes(), span),
570 };
571 let value = if f(&value) {
572 value
573 } else {
574 Value::nothing(span)
575 };
576 Ok(value.into_pipeline_data_with_metadata(metadata))
577 }
578 }
579 }
580
581 pub fn try_expand_range(self) -> Result<PipelineData, ShellError> {
586 match self {
587 PipelineData::Value(v, metadata) => {
588 let span = v.span();
589 match v {
590 Value::Range { val, .. } => {
591 match *val {
592 Range::IntRange(range) => {
593 if range.is_unbounded() {
594 return Err(ShellError::GenericError {
595 error: "Cannot create range".into(),
596 msg: "Unbounded ranges are not allowed when converting to this format".into(),
597 span: Some(span),
598 help: Some("Consider using ranges with valid start and end point.".into()),
599 inner: vec![],
600 });
601 }
602 }
603 Range::FloatRange(range) => {
604 if range.is_unbounded() {
605 return Err(ShellError::GenericError {
606 error: "Cannot create range".into(),
607 msg: "Unbounded ranges are not allowed when converting to this format".into(),
608 span: Some(span),
609 help: Some("Consider using ranges with valid start and end point.".into()),
610 inner: vec![],
611 });
612 }
613 }
614 }
615 let range_values: Vec<Value> =
616 val.into_range_iter(span, Signals::empty()).collect();
617 Ok(PipelineData::value(Value::list(range_values, span), None))
618 }
619 x => Ok(PipelineData::value(x, metadata)),
620 }
621 }
622 _ => Ok(self),
623 }
624 }
625
626 pub fn print_table(
634 self,
635 engine_state: &EngineState,
636 stack: &mut Stack,
637 no_newline: bool,
638 to_stderr: bool,
639 ) -> Result<(), ShellError> {
640 match self {
641 PipelineData::ByteStream(stream, ..) if stream.type_() != ByteStreamType::Binary => {
643 stream.print(to_stderr)
644 }
645 _ => {
646 if let Some(decl_id) = engine_state.table_decl_id {
649 let command = engine_state.get_decl(decl_id);
650 if command.block_id().is_some() {
651 self.write_all_and_flush(engine_state, no_newline, to_stderr)
652 } else {
653 let call = Call::new(Span::new(0, 0));
654 let table = command.run(engine_state, stack, &(&call).into(), self)?;
655 table.write_all_and_flush(engine_state, no_newline, to_stderr)
656 }
657 } else {
658 self.write_all_and_flush(engine_state, no_newline, to_stderr)
659 }
660 }
661 }
662 }
663
664 pub fn print_raw(
672 self,
673 engine_state: &EngineState,
674 no_newline: bool,
675 to_stderr: bool,
676 ) -> Result<(), ShellError> {
677 let span = self.span();
678 if let PipelineData::Value(Value::Binary { val: bytes, .. }, _) = self {
679 if to_stderr {
680 write_all_and_flush(
681 bytes,
682 &mut std::io::stderr().lock(),
683 "stderr",
684 span,
685 engine_state.signals(),
686 )?;
687 } else {
688 write_all_and_flush(
689 bytes,
690 &mut std::io::stdout().lock(),
691 "stdout",
692 span,
693 engine_state.signals(),
694 )?;
695 }
696 Ok(())
697 } else {
698 self.write_all_and_flush(engine_state, no_newline, to_stderr)
699 }
700 }
701
702 fn write_all_and_flush(
703 self,
704 engine_state: &EngineState,
705 no_newline: bool,
706 to_stderr: bool,
707 ) -> Result<(), ShellError> {
708 let span = self.span();
709 if let PipelineData::ByteStream(stream, ..) = self {
710 stream.print(to_stderr)
712 } else {
713 let config = engine_state.get_config();
714 for item in self {
715 let mut out = if let Value::Error { error, .. } = item {
716 return Err(*error);
717 } else {
718 item.to_expanded_string("\n", config)
719 };
720
721 if !no_newline {
722 out.push('\n');
723 }
724
725 if to_stderr {
726 write_all_and_flush(
727 out,
728 &mut std::io::stderr().lock(),
729 "stderr",
730 span,
731 engine_state.signals(),
732 )?;
733 } else {
734 write_all_and_flush(
735 out,
736 &mut std::io::stdout().lock(),
737 "stdout",
738 span,
739 engine_state.signals(),
740 )?;
741 }
742 }
743
744 Ok(())
745 }
746 }
747
748 pub fn unsupported_input_error(
749 self,
750 expected_type: impl Into<String>,
751 span: Span,
752 ) -> ShellError {
753 match self {
754 PipelineData::Empty => ShellError::PipelineEmpty { dst_span: span },
755 PipelineData::Value(value, ..) => ShellError::OnlySupportsThisInputType {
756 exp_input_type: expected_type.into(),
757 wrong_type: value.get_type().get_non_specified_string(),
758 dst_span: span,
759 src_span: value.span(),
760 },
761 PipelineData::ListStream(stream, ..) => ShellError::OnlySupportsThisInputType {
762 exp_input_type: expected_type.into(),
763 wrong_type: "list (stream)".into(),
764 dst_span: span,
765 src_span: stream.span(),
766 },
767 PipelineData::ByteStream(stream, ..) => ShellError::OnlySupportsThisInputType {
768 exp_input_type: expected_type.into(),
769 wrong_type: stream.type_().describe().into(),
770 dst_span: span,
771 src_span: stream.span(),
772 },
773 }
774 }
775}
776
777pub fn write_all_and_flush<T>(
778 data: T,
779 destination: &mut impl Write,
780 destination_name: &str,
781 span: Option<Span>,
782 signals: &Signals,
783) -> Result<(), ShellError>
784where
785 T: AsRef<[u8]>,
786{
787 let io_error_map = |err: std::io::Error, location: Location| {
788 let context = format!("Writing to {destination_name} failed");
789 match span {
790 None => IoError::new_internal(err, context, location),
791 Some(span) if span == Span::unknown() => IoError::new_internal(err, context, location),
792 Some(span) => IoError::new_with_additional_context(err, span, None, context),
793 }
794 };
795
796 let span = span.unwrap_or(Span::unknown());
797 const OUTPUT_CHUNK_SIZE: usize = 8192;
798 for chunk in data.as_ref().chunks(OUTPUT_CHUNK_SIZE) {
799 signals.check(&span)?;
800 destination
801 .write_all(chunk)
802 .map_err(|err| io_error_map(err, location!()))?;
803 }
804 destination
805 .flush()
806 .map_err(|err| io_error_map(err, location!()))?;
807 Ok(())
808}
809
/// The iteration state behind [`PipelineIterator`].
enum PipelineIteratorInner {
    /// Nothing to yield.
    Empty,
    /// A single value that is yielded once (a `Value::Nothing` here yields no item).
    Value(Value),
    /// Items are pulled from a list stream iterator.
    ListStream(crate::list_stream::IntoIter),
    /// Items are pulled from a byte stream's chunks.
    ByteStream(crate::byte_stream::Chunks),
}
816
/// Iterator over the [`Value`]s of a [`PipelineData`].
pub struct PipelineIterator(PipelineIteratorInner);
818
819impl IntoIterator for PipelineData {
820 type Item = Value;
821
822 type IntoIter = PipelineIterator;
823
824 fn into_iter(self) -> Self::IntoIter {
825 PipelineIterator(match self {
826 PipelineData::Empty => PipelineIteratorInner::Empty,
827 PipelineData::Value(value, ..) => {
828 let span = value.span();
829 match value {
830 Value::List { vals, .. } => PipelineIteratorInner::ListStream(
831 ListStream::new(vals.into_iter(), span, Signals::empty()).into_iter(),
832 ),
833 Value::Range { val, .. } => PipelineIteratorInner::ListStream(
834 ListStream::new(
835 val.into_range_iter(span, Signals::empty()),
836 span,
837 Signals::empty(),
838 )
839 .into_iter(),
840 ),
841 x => PipelineIteratorInner::Value(x),
842 }
843 }
844 PipelineData::ListStream(stream, ..) => {
845 PipelineIteratorInner::ListStream(stream.into_iter())
846 }
847 PipelineData::ByteStream(stream, ..) => stream.chunks().map_or(
848 PipelineIteratorInner::Empty,
849 PipelineIteratorInner::ByteStream,
850 ),
851 })
852 }
853}
854
855impl Iterator for PipelineIterator {
856 type Item = Value;
857
858 fn next(&mut self) -> Option<Self::Item> {
859 match &mut self.0 {
860 PipelineIteratorInner::Empty => None,
861 PipelineIteratorInner::Value(Value::Nothing { .. }, ..) => None,
862 PipelineIteratorInner::Value(v, ..) => Some(std::mem::take(v)),
863 PipelineIteratorInner::ListStream(stream, ..) => stream.next(),
864 PipelineIteratorInner::ByteStream(stream) => stream.next().map(|x| match x {
865 Ok(x) => x,
866 Err(err) => Value::error(
867 err,
868 Span::unknown(), ),
870 }),
871 }
872 }
873}
874
/// Conversion of a single item into [`PipelineData`].
pub trait IntoPipelineData {
    /// Converts `self` into pipeline data.
    fn into_pipeline_data(self) -> PipelineData;

    /// Converts `self` into pipeline data with the given metadata.
    fn into_pipeline_data_with_metadata(
        self,
        metadata: impl Into<Option<PipelineMetadata>>,
    ) -> PipelineData;
}
883
884impl<V> IntoPipelineData for V
885where
886 V: Into<Value>,
887{
888 fn into_pipeline_data(self) -> PipelineData {
889 PipelineData::value(self.into(), None)
890 }
891
892 fn into_pipeline_data_with_metadata(
893 self,
894 metadata: impl Into<Option<PipelineMetadata>>,
895 ) -> PipelineData {
896 PipelineData::value(self.into(), metadata.into())
897 }
898}
899
/// Conversion of an item into [`PipelineData`] given a span and [`Signals`].
pub trait IntoInterruptiblePipelineData {
    /// Converts `self` into pipeline data using `span` and `signals`.
    fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData;
    /// Converts `self` into pipeline data using `span`, `signals` and the given metadata.
    fn into_pipeline_data_with_metadata(
        self,
        span: Span,
        signals: Signals,
        metadata: impl Into<Option<PipelineMetadata>>,
    ) -> PipelineData;
}
909
910impl<I> IntoInterruptiblePipelineData for I
911where
912 I: IntoIterator + Send + 'static,
913 I::IntoIter: Send + 'static,
914 <I::IntoIter as Iterator>::Item: Into<Value>,
915{
916 fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData {
917 ListStream::new(self.into_iter().map(Into::into), span, signals).into()
918 }
919
920 fn into_pipeline_data_with_metadata(
921 self,
922 span: Span,
923 signals: Signals,
924 metadata: impl Into<Option<PipelineMetadata>>,
925 ) -> PipelineData {
926 PipelineData::list_stream(
927 ListStream::new(self.into_iter().map(Into::into), span, signals),
928 metadata.into(),
929 )
930 }
931}
932
933fn value_to_bytes(value: Value) -> Result<Vec<u8>, ShellError> {
934 let bytes = match value {
935 Value::String { val, .. } => val.into_bytes(),
936 Value::Binary { val, .. } => val,
937 Value::List { vals, .. } => {
938 let val = vals
939 .into_iter()
940 .map(Value::coerce_into_string)
941 .collect::<Result<Vec<String>, ShellError>>()?
942 .join("\n")
943 + "\n";
944
945 val.into_bytes()
946 }
947 Value::Error { error, .. } => return Err(*error),
949 value => value.coerce_into_string()?.into_bytes(),
950 };
951 Ok(bytes)
952}