1use crate::{
2 ByteStream, ByteStreamType, Config, ListStream, OutDest, PipelineMetadata, Range, ShellError,
3 Signals, Span, Type, Value,
4 ast::{Call, PathMember},
5 engine::{EngineState, Stack},
6 location,
7 shell_error::{io::IoError, location::Location},
8};
9use std::{borrow::Cow, io::Write};
10
11const LINE_ENDING_PATTERN: &[char] = &['\r', '\n'];
12
13#[derive(Debug)]
43pub enum PipelineData {
44 Empty,
45 Value(Value, Option<PipelineMetadata>),
46 ListStream(ListStream, Option<PipelineMetadata>),
47 ByteStream(ByteStream, Option<PipelineMetadata>),
48}
49
50impl PipelineData {
51 pub fn empty() -> PipelineData {
52 PipelineData::Empty
53 }
54
55 pub fn metadata(&self) -> Option<PipelineMetadata> {
56 match self {
57 PipelineData::Empty => None,
58 PipelineData::Value(_, meta)
59 | PipelineData::ListStream(_, meta)
60 | PipelineData::ByteStream(_, meta) => meta.clone(),
61 }
62 }
63
64 pub fn set_metadata(mut self, metadata: Option<PipelineMetadata>) -> Self {
65 match &mut self {
66 PipelineData::Empty => {}
67 PipelineData::Value(_, meta)
68 | PipelineData::ListStream(_, meta)
69 | PipelineData::ByteStream(_, meta) => *meta = metadata,
70 }
71 self
72 }
73
74 pub fn is_nothing(&self) -> bool {
75 matches!(self, PipelineData::Value(Value::Nothing { .. }, ..))
76 || matches!(self, PipelineData::Empty)
77 }
78
79 pub fn span(&self) -> Option<Span> {
81 match self {
82 PipelineData::Empty => None,
83 PipelineData::Value(value, ..) => Some(value.span()),
84 PipelineData::ListStream(stream, ..) => Some(stream.span()),
85 PipelineData::ByteStream(stream, ..) => Some(stream.span()),
86 }
87 }
88
89 pub fn with_span(self, span: Span) -> Self {
93 match self {
94 PipelineData::Empty => PipelineData::Value(Value::nothing(span), None),
95 PipelineData::Value(value, metadata) => {
96 PipelineData::Value(value.with_span(span), metadata)
97 }
98 PipelineData::ListStream(stream, metadata) => {
99 PipelineData::ListStream(stream.with_span(span), metadata)
100 }
101 PipelineData::ByteStream(stream, metadata) => {
102 PipelineData::ByteStream(stream.with_span(span), metadata)
103 }
104 }
105 }
106
107 pub fn get_type(&self) -> Type {
118 match self {
119 PipelineData::Empty => Type::Nothing,
120 PipelineData::Value(value, _) => value.get_type(),
121 PipelineData::ListStream(_, _) => Type::list(Type::Any),
122 PipelineData::ByteStream(stream, _) => stream.type_().into(),
123 }
124 }
125
126 pub fn is_subtype_of(&self, other: &Type) -> bool {
141 match (self, other) {
142 (_, Type::Any) => true,
143 (PipelineData::Empty, Type::Nothing) => true,
144 (PipelineData::Value(val, ..), ty) => val.is_subtype_of(ty),
145
146 (PipelineData::ListStream(..), Type::List(..) | Type::Table(..)) => true,
148
149 (PipelineData::ByteStream(stream, ..), Type::String)
150 if stream.type_().is_string_coercible() =>
151 {
152 true
153 }
154 (PipelineData::ByteStream(stream, ..), Type::Binary)
155 if stream.type_().is_binary_coercible() =>
156 {
157 true
158 }
159
160 (PipelineData::Empty, _) => false,
161 (PipelineData::ListStream(..), _) => false,
162 (PipelineData::ByteStream(..), _) => false,
163 }
164 }
165
166 pub fn into_value(self, span: Span) -> Result<Value, ShellError> {
167 match self {
168 PipelineData::Empty => Ok(Value::nothing(span)),
169 PipelineData::Value(value, ..) => {
170 if value.span() == Span::unknown() {
171 Ok(value.with_span(span))
172 } else {
173 Ok(value)
174 }
175 }
176 PipelineData::ListStream(stream, ..) => Ok(stream.into_value()),
177 PipelineData::ByteStream(stream, ..) => stream.into_value(),
178 }
179 }
180
181 pub fn try_into_stream(self, engine_state: &EngineState) -> Result<PipelineData, PipelineData> {
189 let span = self.span().unwrap_or(Span::unknown());
190 match self {
191 PipelineData::ListStream(..) | PipelineData::ByteStream(..) => Ok(self),
192 PipelineData::Value(Value::List { .. } | Value::Range { .. }, ref metadata) => {
193 let metadata = metadata.clone();
194 Ok(PipelineData::ListStream(
195 ListStream::new(self.into_iter(), span, engine_state.signals().clone()),
196 metadata,
197 ))
198 }
199 PipelineData::Value(Value::String { val, .. }, metadata) => {
200 Ok(PipelineData::ByteStream(
201 ByteStream::read_string(val, span, engine_state.signals().clone()),
202 metadata,
203 ))
204 }
205 PipelineData::Value(Value::Binary { val, .. }, metadata) => {
206 Ok(PipelineData::ByteStream(
207 ByteStream::read_binary(val, span, engine_state.signals().clone()),
208 metadata,
209 ))
210 }
211 _ => Err(self),
212 }
213 }
214
215 pub fn write_to(self, mut dest: impl Write) -> Result<(), ShellError> {
219 match self {
220 PipelineData::Empty => Ok(()),
221 PipelineData::Value(value, ..) => {
222 let bytes = value_to_bytes(value)?;
223 dest.write_all(&bytes).map_err(|err| {
224 IoError::new_internal(
225 err,
226 "Could not write PipelineData to dest",
227 crate::location!(),
228 )
229 })?;
230 dest.flush().map_err(|err| {
231 IoError::new_internal(
232 err,
233 "Could not flush PipelineData to dest",
234 crate::location!(),
235 )
236 })?;
237 Ok(())
238 }
239 PipelineData::ListStream(stream, ..) => {
240 for value in stream {
241 let bytes = value_to_bytes(value)?;
242 dest.write_all(&bytes).map_err(|err| {
243 IoError::new_internal(
244 err,
245 "Could not write PipelineData to dest",
246 crate::location!(),
247 )
248 })?;
249 dest.write_all(b"\n").map_err(|err| {
250 IoError::new_internal(
251 err,
252 "Could not write linebreak after PipelineData to dest",
253 crate::location!(),
254 )
255 })?;
256 }
257 dest.flush().map_err(|err| {
258 IoError::new_internal(
259 err,
260 "Could not flush PipelineData to dest",
261 crate::location!(),
262 )
263 })?;
264 Ok(())
265 }
266 PipelineData::ByteStream(stream, ..) => stream.write_to(dest),
267 }
268 }
269
270 pub fn drain_to_out_dests(
277 self,
278 engine_state: &EngineState,
279 stack: &mut Stack,
280 ) -> Result<Self, ShellError> {
281 match stack.pipe_stdout().unwrap_or(&OutDest::Inherit) {
282 OutDest::Print => {
283 self.print_table(engine_state, stack, false, false)?;
284 Ok(Self::Empty)
285 }
286 OutDest::Pipe | OutDest::PipeSeparate => Ok(self),
287 OutDest::Value => {
288 let metadata = self.metadata();
289 let span = self.span().unwrap_or(Span::unknown());
290 self.into_value(span).map(|val| Self::Value(val, metadata))
291 }
292 OutDest::File(file) => {
293 self.write_to(file.as_ref())?;
294 Ok(Self::Empty)
295 }
296 OutDest::Null | OutDest::Inherit => {
297 self.drain()?;
298 Ok(Self::Empty)
299 }
300 }
301 }
302
303 pub fn drain(self) -> Result<(), ShellError> {
304 match self {
305 Self::Empty => Ok(()),
306 Self::Value(Value::Error { error, .. }, ..) => Err(*error),
307 Self::Value(..) => Ok(()),
308 Self::ListStream(stream, ..) => stream.drain(),
309 Self::ByteStream(stream, ..) => stream.drain(),
310 }
311 }
312
313 pub fn into_iter_strict(self, span: Span) -> Result<PipelineIterator, ShellError> {
319 Ok(PipelineIterator(match self {
320 PipelineData::Value(value, ..) => {
321 let val_span = value.span();
322 match value {
323 Value::List { vals, .. } => PipelineIteratorInner::ListStream(
324 ListStream::new(vals.into_iter(), val_span, Signals::empty()).into_iter(),
325 ),
326 Value::Binary { val, .. } => PipelineIteratorInner::ListStream(
327 ListStream::new(
328 val.into_iter().map(move |x| Value::int(x as i64, val_span)),
329 val_span,
330 Signals::empty(),
331 )
332 .into_iter(),
333 ),
334 Value::Range { val, .. } => PipelineIteratorInner::ListStream(
335 ListStream::new(
336 val.into_range_iter(val_span, Signals::empty()),
337 val_span,
338 Signals::empty(),
339 )
340 .into_iter(),
341 ),
342 Value::Error { error, .. } => return Err(*error),
344 other => {
345 return Err(ShellError::OnlySupportsThisInputType {
346 exp_input_type: "list, binary, range, or byte stream".into(),
347 wrong_type: other.get_type().to_string(),
348 dst_span: span,
349 src_span: val_span,
350 });
351 }
352 }
353 }
354 PipelineData::ListStream(stream, ..) => {
355 PipelineIteratorInner::ListStream(stream.into_iter())
356 }
357 PipelineData::Empty => {
358 return Err(ShellError::OnlySupportsThisInputType {
359 exp_input_type: "list, binary, range, or byte stream".into(),
360 wrong_type: "null".into(),
361 dst_span: span,
362 src_span: span,
363 });
364 }
365 PipelineData::ByteStream(stream, ..) => {
366 if let Some(chunks) = stream.chunks() {
367 PipelineIteratorInner::ByteStream(chunks)
368 } else {
369 PipelineIteratorInner::Empty
370 }
371 }
372 }))
373 }
374
375 pub fn collect_string(self, separator: &str, config: &Config) -> Result<String, ShellError> {
376 match self {
377 PipelineData::Empty => Ok(String::new()),
378 PipelineData::Value(value, ..) => Ok(value.to_expanded_string(separator, config)),
379 PipelineData::ListStream(stream, ..) => Ok(stream.into_string(separator, config)),
380 PipelineData::ByteStream(stream, ..) => stream.into_string(),
381 }
382 }
383
384 pub fn collect_string_strict(
389 self,
390 span: Span,
391 ) -> Result<(String, Span, Option<PipelineMetadata>), ShellError> {
392 match self {
393 PipelineData::Empty => Ok((String::new(), span, None)),
394 PipelineData::Value(Value::String { val, .. }, metadata) => Ok((val, span, metadata)),
395 PipelineData::Value(val, ..) => Err(ShellError::TypeMismatch {
396 err_message: "string".into(),
397 span: val.span(),
398 }),
399 PipelineData::ListStream(..) => Err(ShellError::TypeMismatch {
400 err_message: "string".into(),
401 span,
402 }),
403 PipelineData::ByteStream(stream, metadata) => {
404 let span = stream.span();
405 Ok((stream.into_string()?, span, metadata))
406 }
407 }
408 }
409
410 pub fn follow_cell_path(
411 self,
412 cell_path: &[PathMember],
413 head: Span,
414 ) -> Result<Value, ShellError> {
415 match self {
416 PipelineData::ListStream(stream, ..) => Value::list(stream.into_iter().collect(), head)
418 .follow_cell_path(cell_path)
419 .map(Cow::into_owned),
420 PipelineData::Value(v, ..) => v.follow_cell_path(cell_path).map(Cow::into_owned),
421 PipelineData::Empty => Err(ShellError::IncompatiblePathAccess {
422 type_name: "empty pipeline".to_string(),
423 span: head,
424 }),
425 PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess {
426 type_name: stream.type_().describe().to_owned(),
427 span: stream.span(),
428 }),
429 }
430 }
431
432 pub fn map<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
434 where
435 Self: Sized,
436 F: FnMut(Value) -> Value + 'static + Send,
437 {
438 match self {
439 PipelineData::Value(value, metadata) => {
440 let span = value.span();
441 let pipeline = match value {
442 Value::List { vals, .. } => vals
443 .into_iter()
444 .map(f)
445 .into_pipeline_data(span, signals.clone()),
446 Value::Range { val, .. } => val
447 .into_range_iter(span, Signals::empty())
448 .map(f)
449 .into_pipeline_data(span, signals.clone()),
450 value => match f(value) {
451 Value::Error { error, .. } => return Err(*error),
452 v => v.into_pipeline_data(),
453 },
454 };
455 Ok(pipeline.set_metadata(metadata))
456 }
457 PipelineData::Empty => Ok(PipelineData::Empty),
458 PipelineData::ListStream(stream, metadata) => {
459 Ok(PipelineData::ListStream(stream.map(f), metadata))
460 }
461 PipelineData::ByteStream(stream, metadata) => {
462 Ok(f(stream.into_value()?).into_pipeline_data_with_metadata(metadata))
463 }
464 }
465 }
466
467 pub fn flat_map<U, F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
469 where
470 Self: Sized,
471 U: IntoIterator<Item = Value> + 'static,
472 <U as IntoIterator>::IntoIter: 'static + Send,
473 F: FnMut(Value) -> U + 'static + Send,
474 {
475 match self {
476 PipelineData::Empty => Ok(PipelineData::Empty),
477 PipelineData::Value(value, metadata) => {
478 let span = value.span();
479 let pipeline = match value {
480 Value::List { vals, .. } => vals
481 .into_iter()
482 .flat_map(f)
483 .into_pipeline_data(span, signals.clone()),
484 Value::Range { val, .. } => val
485 .into_range_iter(span, Signals::empty())
486 .flat_map(f)
487 .into_pipeline_data(span, signals.clone()),
488 value => f(value)
489 .into_iter()
490 .into_pipeline_data(span, signals.clone()),
491 };
492 Ok(pipeline.set_metadata(metadata))
493 }
494 PipelineData::ListStream(stream, metadata) => Ok(PipelineData::ListStream(
495 stream.modify(|iter| iter.flat_map(f)),
496 metadata,
497 )),
498 PipelineData::ByteStream(stream, metadata) => {
499 let span = stream.span();
501 let iter = match String::from_utf8(stream.into_bytes()?) {
502 Ok(mut str) => {
503 str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
504 f(Value::string(str, span))
505 }
506 Err(err) => f(Value::binary(err.into_bytes(), span)),
507 };
508 Ok(iter.into_iter().into_pipeline_data_with_metadata(
509 span,
510 signals.clone(),
511 metadata,
512 ))
513 }
514 }
515 }
516
517 pub fn filter<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
518 where
519 Self: Sized,
520 F: FnMut(&Value) -> bool + 'static + Send,
521 {
522 match self {
523 PipelineData::Empty => Ok(PipelineData::Empty),
524 PipelineData::Value(value, metadata) => {
525 let span = value.span();
526 let pipeline = match value {
527 Value::List { vals, .. } => vals
528 .into_iter()
529 .filter(f)
530 .into_pipeline_data(span, signals.clone()),
531 Value::Range { val, .. } => val
532 .into_range_iter(span, Signals::empty())
533 .filter(f)
534 .into_pipeline_data(span, signals.clone()),
535 value => {
536 if f(&value) {
537 value.into_pipeline_data()
538 } else {
539 Value::nothing(span).into_pipeline_data()
540 }
541 }
542 };
543 Ok(pipeline.set_metadata(metadata))
544 }
545 PipelineData::ListStream(stream, metadata) => Ok(PipelineData::ListStream(
546 stream.modify(|iter| iter.filter(f)),
547 metadata,
548 )),
549 PipelineData::ByteStream(stream, metadata) => {
550 let span = stream.span();
552 let value = match String::from_utf8(stream.into_bytes()?) {
553 Ok(mut str) => {
554 str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
555 Value::string(str, span)
556 }
557 Err(err) => Value::binary(err.into_bytes(), span),
558 };
559 let value = if f(&value) {
560 value
561 } else {
562 Value::nothing(span)
563 };
564 Ok(value.into_pipeline_data_with_metadata(metadata))
565 }
566 }
567 }
568
569 pub fn try_expand_range(self) -> Result<PipelineData, ShellError> {
574 match self {
575 PipelineData::Value(v, metadata) => {
576 let span = v.span();
577 match v {
578 Value::Range { val, .. } => {
579 match *val {
580 Range::IntRange(range) => {
581 if range.is_unbounded() {
582 return Err(ShellError::GenericError {
583 error: "Cannot create range".into(),
584 msg: "Unbounded ranges are not allowed when converting to this format".into(),
585 span: Some(span),
586 help: Some("Consider using ranges with valid start and end point.".into()),
587 inner: vec![],
588 });
589 }
590 }
591 Range::FloatRange(range) => {
592 if range.is_unbounded() {
593 return Err(ShellError::GenericError {
594 error: "Cannot create range".into(),
595 msg: "Unbounded ranges are not allowed when converting to this format".into(),
596 span: Some(span),
597 help: Some("Consider using ranges with valid start and end point.".into()),
598 inner: vec![],
599 });
600 }
601 }
602 }
603 let range_values: Vec<Value> =
604 val.into_range_iter(span, Signals::empty()).collect();
605 Ok(PipelineData::Value(Value::list(range_values, span), None))
606 }
607 x => Ok(PipelineData::Value(x, metadata)),
608 }
609 }
610 _ => Ok(self),
611 }
612 }
613
614 pub fn print_table(
622 self,
623 engine_state: &EngineState,
624 stack: &mut Stack,
625 no_newline: bool,
626 to_stderr: bool,
627 ) -> Result<(), ShellError> {
628 match self {
629 PipelineData::ByteStream(stream, ..) if stream.type_() != ByteStreamType::Binary => {
631 stream.print(to_stderr)
632 }
633 _ => {
634 if let Some(decl_id) = engine_state.table_decl_id {
637 let command = engine_state.get_decl(decl_id);
638 if command.block_id().is_some() {
639 self.write_all_and_flush(engine_state, no_newline, to_stderr)
640 } else {
641 let call = Call::new(Span::new(0, 0));
642 let table = command.run(engine_state, stack, &(&call).into(), self)?;
643 table.write_all_and_flush(engine_state, no_newline, to_stderr)
644 }
645 } else {
646 self.write_all_and_flush(engine_state, no_newline, to_stderr)
647 }
648 }
649 }
650 }
651
652 pub fn print_raw(
660 self,
661 engine_state: &EngineState,
662 no_newline: bool,
663 to_stderr: bool,
664 ) -> Result<(), ShellError> {
665 let span = self.span();
666 if let PipelineData::Value(Value::Binary { val: bytes, .. }, _) = self {
667 if to_stderr {
668 write_all_and_flush(
669 bytes,
670 &mut std::io::stderr().lock(),
671 "stderr",
672 span,
673 engine_state.signals(),
674 )?;
675 } else {
676 write_all_and_flush(
677 bytes,
678 &mut std::io::stdout().lock(),
679 "stdout",
680 span,
681 engine_state.signals(),
682 )?;
683 }
684 Ok(())
685 } else {
686 self.write_all_and_flush(engine_state, no_newline, to_stderr)
687 }
688 }
689
690 fn write_all_and_flush(
691 self,
692 engine_state: &EngineState,
693 no_newline: bool,
694 to_stderr: bool,
695 ) -> Result<(), ShellError> {
696 let span = self.span();
697 if let PipelineData::ByteStream(stream, ..) = self {
698 stream.print(to_stderr)
700 } else {
701 let config = engine_state.get_config();
702 for item in self {
703 let mut out = if let Value::Error { error, .. } = item {
704 return Err(*error);
705 } else {
706 item.to_expanded_string("\n", config)
707 };
708
709 if !no_newline {
710 out.push('\n');
711 }
712
713 if to_stderr {
714 write_all_and_flush(
715 out,
716 &mut std::io::stderr().lock(),
717 "stderr",
718 span,
719 engine_state.signals(),
720 )?;
721 } else {
722 write_all_and_flush(
723 out,
724 &mut std::io::stdout().lock(),
725 "stdout",
726 span,
727 engine_state.signals(),
728 )?;
729 }
730 }
731
732 Ok(())
733 }
734 }
735
736 pub fn unsupported_input_error(
737 self,
738 expected_type: impl Into<String>,
739 span: Span,
740 ) -> ShellError {
741 match self {
742 PipelineData::Empty => ShellError::PipelineEmpty { dst_span: span },
743 PipelineData::Value(value, ..) => ShellError::OnlySupportsThisInputType {
744 exp_input_type: expected_type.into(),
745 wrong_type: value.get_type().get_non_specified_string(),
746 dst_span: span,
747 src_span: value.span(),
748 },
749 PipelineData::ListStream(stream, ..) => ShellError::OnlySupportsThisInputType {
750 exp_input_type: expected_type.into(),
751 wrong_type: "list (stream)".into(),
752 dst_span: span,
753 src_span: stream.span(),
754 },
755 PipelineData::ByteStream(stream, ..) => ShellError::OnlySupportsThisInputType {
756 exp_input_type: expected_type.into(),
757 wrong_type: stream.type_().describe().into(),
758 dst_span: span,
759 src_span: stream.span(),
760 },
761 }
762 }
763}
764
765pub fn write_all_and_flush<T>(
766 data: T,
767 destination: &mut impl Write,
768 destination_name: &str,
769 span: Option<Span>,
770 signals: &Signals,
771) -> Result<(), ShellError>
772where
773 T: AsRef<[u8]>,
774{
775 let io_error_map = |err: std::io::Error, location: Location| {
776 let context = format!("Writing to {} failed", destination_name);
777 match span {
778 None => IoError::new_internal(err, context, location),
779 Some(span) if span == Span::unknown() => IoError::new_internal(err, context, location),
780 Some(span) => IoError::new_with_additional_context(err, span, None, context),
781 }
782 };
783
784 let span = span.unwrap_or(Span::unknown());
785 const OUTPUT_CHUNK_SIZE: usize = 8192;
786 for chunk in data.as_ref().chunks(OUTPUT_CHUNK_SIZE) {
787 signals.check(span)?;
788 destination
789 .write_all(chunk)
790 .map_err(|err| io_error_map(err, location!()))?;
791 }
792 destination
793 .flush()
794 .map_err(|err| io_error_map(err, location!()))?;
795 Ok(())
796}
797
798enum PipelineIteratorInner {
799 Empty,
800 Value(Value),
801 ListStream(crate::list_stream::IntoIter),
802 ByteStream(crate::byte_stream::Chunks),
803}
804
805pub struct PipelineIterator(PipelineIteratorInner);
806
807impl IntoIterator for PipelineData {
808 type Item = Value;
809
810 type IntoIter = PipelineIterator;
811
812 fn into_iter(self) -> Self::IntoIter {
813 PipelineIterator(match self {
814 PipelineData::Empty => PipelineIteratorInner::Empty,
815 PipelineData::Value(value, ..) => {
816 let span = value.span();
817 match value {
818 Value::List { vals, .. } => PipelineIteratorInner::ListStream(
819 ListStream::new(vals.into_iter(), span, Signals::empty()).into_iter(),
820 ),
821 Value::Range { val, .. } => PipelineIteratorInner::ListStream(
822 ListStream::new(
823 val.into_range_iter(span, Signals::empty()),
824 span,
825 Signals::empty(),
826 )
827 .into_iter(),
828 ),
829 x => PipelineIteratorInner::Value(x),
830 }
831 }
832 PipelineData::ListStream(stream, ..) => {
833 PipelineIteratorInner::ListStream(stream.into_iter())
834 }
835 PipelineData::ByteStream(stream, ..) => stream.chunks().map_or(
836 PipelineIteratorInner::Empty,
837 PipelineIteratorInner::ByteStream,
838 ),
839 })
840 }
841}
842
843impl Iterator for PipelineIterator {
844 type Item = Value;
845
846 fn next(&mut self) -> Option<Self::Item> {
847 match &mut self.0 {
848 PipelineIteratorInner::Empty => None,
849 PipelineIteratorInner::Value(Value::Nothing { .. }, ..) => None,
850 PipelineIteratorInner::Value(v, ..) => Some(std::mem::take(v)),
851 PipelineIteratorInner::ListStream(stream, ..) => stream.next(),
852 PipelineIteratorInner::ByteStream(stream) => stream.next().map(|x| match x {
853 Ok(x) => x,
854 Err(err) => Value::error(
855 err,
856 Span::unknown(), ),
858 }),
859 }
860 }
861}
862
863pub trait IntoPipelineData {
864 fn into_pipeline_data(self) -> PipelineData;
865
866 fn into_pipeline_data_with_metadata(
867 self,
868 metadata: impl Into<Option<PipelineMetadata>>,
869 ) -> PipelineData;
870}
871
872impl<V> IntoPipelineData for V
873where
874 V: Into<Value>,
875{
876 fn into_pipeline_data(self) -> PipelineData {
877 PipelineData::Value(self.into(), None)
878 }
879
880 fn into_pipeline_data_with_metadata(
881 self,
882 metadata: impl Into<Option<PipelineMetadata>>,
883 ) -> PipelineData {
884 PipelineData::Value(self.into(), metadata.into())
885 }
886}
887
888pub trait IntoInterruptiblePipelineData {
889 fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData;
890 fn into_pipeline_data_with_metadata(
891 self,
892 span: Span,
893 signals: Signals,
894 metadata: impl Into<Option<PipelineMetadata>>,
895 ) -> PipelineData;
896}
897
898impl<I> IntoInterruptiblePipelineData for I
899where
900 I: IntoIterator + Send + 'static,
901 I::IntoIter: Send + 'static,
902 <I::IntoIter as Iterator>::Item: Into<Value>,
903{
904 fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData {
905 ListStream::new(self.into_iter().map(Into::into), span, signals).into()
906 }
907
908 fn into_pipeline_data_with_metadata(
909 self,
910 span: Span,
911 signals: Signals,
912 metadata: impl Into<Option<PipelineMetadata>>,
913 ) -> PipelineData {
914 PipelineData::ListStream(
915 ListStream::new(self.into_iter().map(Into::into), span, signals),
916 metadata.into(),
917 )
918 }
919}
920
921fn value_to_bytes(value: Value) -> Result<Vec<u8>, ShellError> {
922 let bytes = match value {
923 Value::String { val, .. } => val.into_bytes(),
924 Value::Binary { val, .. } => val,
925 Value::List { vals, .. } => {
926 let val = vals
927 .into_iter()
928 .map(Value::coerce_into_string)
929 .collect::<Result<Vec<String>, ShellError>>()?
930 .join("\n")
931 + "\n";
932
933 val.into_bytes()
934 }
935 Value::Error { error, .. } => return Err(*error),
937 value => value.coerce_into_string()?.into_bytes(),
938 };
939 Ok(bytes)
940}