1use crate::{
2 ast::{Call, PathMember},
3 engine::{EngineState, Stack},
4 location,
5 shell_error::{io::IoError, location::Location},
6 ByteStream, ByteStreamType, Config, ListStream, OutDest, PipelineMetadata, Range, ShellError,
7 Signals, Span, Type, Value,
8};
9use std::io::Write;
10
/// Characters trimmed from the end of text collected out of a byte stream
/// (used by `flat_map` and `filter` before handing the string to the closure).
const LINE_ENDING_PATTERN: &[char] = &['\r', '\n'];
12
/// The payload passed along a pipeline.
///
/// Every variant except [`PipelineData::Empty`] carries an optional
/// [`PipelineMetadata`] alongside its data.
#[derive(Debug)]
pub enum PipelineData {
    /// No data.
    Empty,
    /// A single [`Value`].
    Value(Value, Option<PipelineMetadata>),
    /// A [`ListStream`]; iterating it yields [`Value`]s.
    ListStream(ListStream, Option<PipelineMetadata>),
    /// A [`ByteStream`].
    ByteStream(ByteStream, Option<PipelineMetadata>),
}
49
50impl PipelineData {
51 pub fn empty() -> PipelineData {
52 PipelineData::Empty
53 }
54
55 pub fn metadata(&self) -> Option<PipelineMetadata> {
56 match self {
57 PipelineData::Empty => None,
58 PipelineData::Value(_, meta)
59 | PipelineData::ListStream(_, meta)
60 | PipelineData::ByteStream(_, meta) => meta.clone(),
61 }
62 }
63
64 pub fn set_metadata(mut self, metadata: Option<PipelineMetadata>) -> Self {
65 match &mut self {
66 PipelineData::Empty => {}
67 PipelineData::Value(_, meta)
68 | PipelineData::ListStream(_, meta)
69 | PipelineData::ByteStream(_, meta) => *meta = metadata,
70 }
71 self
72 }
73
74 pub fn is_nothing(&self) -> bool {
75 matches!(self, PipelineData::Value(Value::Nothing { .. }, ..))
76 || matches!(self, PipelineData::Empty)
77 }
78
79 pub fn span(&self) -> Option<Span> {
81 match self {
82 PipelineData::Empty => None,
83 PipelineData::Value(value, ..) => Some(value.span()),
84 PipelineData::ListStream(stream, ..) => Some(stream.span()),
85 PipelineData::ByteStream(stream, ..) => Some(stream.span()),
86 }
87 }
88
89 pub fn with_span(self, span: Span) -> Self {
93 match self {
94 PipelineData::Empty => PipelineData::Value(Value::nothing(span), None),
95 PipelineData::Value(value, metadata) => {
96 PipelineData::Value(value.with_span(span), metadata)
97 }
98 PipelineData::ListStream(stream, metadata) => {
99 PipelineData::ListStream(stream.with_span(span), metadata)
100 }
101 PipelineData::ByteStream(stream, metadata) => {
102 PipelineData::ByteStream(stream.with_span(span), metadata)
103 }
104 }
105 }
106
107 pub fn get_type(&self) -> Type {
118 match self {
119 PipelineData::Empty => Type::Nothing,
120 PipelineData::Value(value, _) => value.get_type(),
121 PipelineData::ListStream(_, _) => Type::list(Type::Any),
122 PipelineData::ByteStream(stream, _) => stream.type_().into(),
123 }
124 }
125
126 pub fn is_subtype_of(&self, other: &Type) -> bool {
141 match (self, other) {
142 (_, Type::Any) => true,
143 (PipelineData::Empty, Type::Nothing) => true,
144 (PipelineData::Value(val, ..), ty) => val.is_subtype_of(ty),
145
146 (PipelineData::ListStream(..), Type::List(..) | Type::Table(..)) => true,
148
149 (PipelineData::ByteStream(stream, ..), Type::String)
150 if stream.type_().is_string_coercible() =>
151 {
152 true
153 }
154 (PipelineData::ByteStream(stream, ..), Type::Binary)
155 if stream.type_().is_binary_coercible() =>
156 {
157 true
158 }
159
160 (PipelineData::Empty, _) => false,
161 (PipelineData::ListStream(..), _) => false,
162 (PipelineData::ByteStream(..), _) => false,
163 }
164 }
165
166 pub fn into_value(self, span: Span) -> Result<Value, ShellError> {
167 match self {
168 PipelineData::Empty => Ok(Value::nothing(span)),
169 PipelineData::Value(value, ..) => {
170 if value.span() == Span::unknown() {
171 Ok(value.with_span(span))
172 } else {
173 Ok(value)
174 }
175 }
176 PipelineData::ListStream(stream, ..) => Ok(stream.into_value()),
177 PipelineData::ByteStream(stream, ..) => stream.into_value(),
178 }
179 }
180
181 pub fn try_into_stream(self, engine_state: &EngineState) -> Result<PipelineData, PipelineData> {
189 let span = self.span().unwrap_or(Span::unknown());
190 match self {
191 PipelineData::ListStream(..) | PipelineData::ByteStream(..) => Ok(self),
192 PipelineData::Value(Value::List { .. } | Value::Range { .. }, ref metadata) => {
193 let metadata = metadata.clone();
194 Ok(PipelineData::ListStream(
195 ListStream::new(self.into_iter(), span, engine_state.signals().clone()),
196 metadata,
197 ))
198 }
199 PipelineData::Value(Value::String { val, .. }, metadata) => {
200 Ok(PipelineData::ByteStream(
201 ByteStream::read_string(val, span, engine_state.signals().clone()),
202 metadata,
203 ))
204 }
205 PipelineData::Value(Value::Binary { val, .. }, metadata) => {
206 Ok(PipelineData::ByteStream(
207 ByteStream::read_binary(val, span, engine_state.signals().clone()),
208 metadata,
209 ))
210 }
211 _ => Err(self),
212 }
213 }
214
215 pub fn write_to(self, mut dest: impl Write) -> Result<(), ShellError> {
219 match self {
220 PipelineData::Empty => Ok(()),
221 PipelineData::Value(value, ..) => {
222 let bytes = value_to_bytes(value)?;
223 dest.write_all(&bytes).map_err(|err| {
224 IoError::new_internal(
225 err.kind(),
226 "Could not write PipelineData to dest",
227 crate::location!(),
228 )
229 })?;
230 dest.flush().map_err(|err| {
231 IoError::new_internal(
232 err.kind(),
233 "Could not flush PipelineData to dest",
234 crate::location!(),
235 )
236 })?;
237 Ok(())
238 }
239 PipelineData::ListStream(stream, ..) => {
240 for value in stream {
241 let bytes = value_to_bytes(value)?;
242 dest.write_all(&bytes).map_err(|err| {
243 IoError::new_internal(
244 err.kind(),
245 "Could not write PipelineData to dest",
246 crate::location!(),
247 )
248 })?;
249 dest.write_all(b"\n").map_err(|err| {
250 IoError::new_internal(
251 err.kind(),
252 "Could not write linebreak after PipelineData to dest",
253 crate::location!(),
254 )
255 })?;
256 }
257 dest.flush().map_err(|err| {
258 IoError::new_internal(
259 err.kind(),
260 "Could not flush PipelineData to dest",
261 crate::location!(),
262 )
263 })?;
264 Ok(())
265 }
266 PipelineData::ByteStream(stream, ..) => stream.write_to(dest),
267 }
268 }
269
270 pub fn drain_to_out_dests(
277 self,
278 engine_state: &EngineState,
279 stack: &mut Stack,
280 ) -> Result<Self, ShellError> {
281 match stack.pipe_stdout().unwrap_or(&OutDest::Inherit) {
282 OutDest::Print => {
283 self.print_table(engine_state, stack, false, false)?;
284 Ok(Self::Empty)
285 }
286 OutDest::Pipe | OutDest::PipeSeparate => Ok(self),
287 OutDest::Value => {
288 let metadata = self.metadata();
289 let span = self.span().unwrap_or(Span::unknown());
290 self.into_value(span).map(|val| Self::Value(val, metadata))
291 }
292 OutDest::File(file) => {
293 self.write_to(file.as_ref())?;
294 Ok(Self::Empty)
295 }
296 OutDest::Null | OutDest::Inherit => {
297 self.drain()?;
298 Ok(Self::Empty)
299 }
300 }
301 }
302
303 pub fn drain(self) -> Result<(), ShellError> {
304 match self {
305 Self::Empty => Ok(()),
306 Self::Value(Value::Error { error, .. }, ..) => Err(*error),
307 Self::Value(..) => Ok(()),
308 Self::ListStream(stream, ..) => stream.drain(),
309 Self::ByteStream(stream, ..) => stream.drain(),
310 }
311 }
312
313 pub fn into_iter_strict(self, span: Span) -> Result<PipelineIterator, ShellError> {
319 Ok(PipelineIterator(match self {
320 PipelineData::Value(value, ..) => {
321 let val_span = value.span();
322 match value {
323 Value::List { vals, .. } => PipelineIteratorInner::ListStream(
324 ListStream::new(vals.into_iter(), val_span, Signals::empty()).into_iter(),
325 ),
326 Value::Binary { val, .. } => PipelineIteratorInner::ListStream(
327 ListStream::new(
328 val.into_iter().map(move |x| Value::int(x as i64, val_span)),
329 val_span,
330 Signals::empty(),
331 )
332 .into_iter(),
333 ),
334 Value::Range { val, .. } => PipelineIteratorInner::ListStream(
335 ListStream::new(
336 val.into_range_iter(val_span, Signals::empty()),
337 val_span,
338 Signals::empty(),
339 )
340 .into_iter(),
341 ),
342 Value::Error { error, .. } => return Err(*error),
344 other => {
345 return Err(ShellError::OnlySupportsThisInputType {
346 exp_input_type: "list, binary, range, or byte stream".into(),
347 wrong_type: other.get_type().to_string(),
348 dst_span: span,
349 src_span: val_span,
350 })
351 }
352 }
353 }
354 PipelineData::ListStream(stream, ..) => {
355 PipelineIteratorInner::ListStream(stream.into_iter())
356 }
357 PipelineData::Empty => {
358 return Err(ShellError::OnlySupportsThisInputType {
359 exp_input_type: "list, binary, range, or byte stream".into(),
360 wrong_type: "null".into(),
361 dst_span: span,
362 src_span: span,
363 })
364 }
365 PipelineData::ByteStream(stream, ..) => {
366 if let Some(chunks) = stream.chunks() {
367 PipelineIteratorInner::ByteStream(chunks)
368 } else {
369 PipelineIteratorInner::Empty
370 }
371 }
372 }))
373 }
374
375 pub fn collect_string(self, separator: &str, config: &Config) -> Result<String, ShellError> {
376 match self {
377 PipelineData::Empty => Ok(String::new()),
378 PipelineData::Value(value, ..) => Ok(value.to_expanded_string(separator, config)),
379 PipelineData::ListStream(stream, ..) => Ok(stream.into_string(separator, config)),
380 PipelineData::ByteStream(stream, ..) => stream.into_string(),
381 }
382 }
383
384 pub fn collect_string_strict(
389 self,
390 span: Span,
391 ) -> Result<(String, Span, Option<PipelineMetadata>), ShellError> {
392 match self {
393 PipelineData::Empty => Ok((String::new(), span, None)),
394 PipelineData::Value(Value::String { val, .. }, metadata) => Ok((val, span, metadata)),
395 PipelineData::Value(val, ..) => Err(ShellError::TypeMismatch {
396 err_message: "string".into(),
397 span: val.span(),
398 }),
399 PipelineData::ListStream(..) => Err(ShellError::TypeMismatch {
400 err_message: "string".into(),
401 span,
402 }),
403 PipelineData::ByteStream(stream, metadata) => {
404 let span = stream.span();
405 Ok((stream.into_string()?, span, metadata))
406 }
407 }
408 }
409
410 pub fn follow_cell_path(
411 self,
412 cell_path: &[PathMember],
413 head: Span,
414 insensitive: bool,
415 ) -> Result<Value, ShellError> {
416 match self {
417 PipelineData::ListStream(stream, ..) => Value::list(stream.into_iter().collect(), head)
419 .follow_cell_path(cell_path, insensitive),
420 PipelineData::Value(v, ..) => v.follow_cell_path(cell_path, insensitive),
421 PipelineData::Empty => Err(ShellError::IncompatiblePathAccess {
422 type_name: "empty pipeline".to_string(),
423 span: head,
424 }),
425 PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess {
426 type_name: stream.type_().describe().to_owned(),
427 span: stream.span(),
428 }),
429 }
430 }
431
432 pub fn map<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
434 where
435 Self: Sized,
436 F: FnMut(Value) -> Value + 'static + Send,
437 {
438 match self {
439 PipelineData::Value(value, metadata) => {
440 let span = value.span();
441 let pipeline = match value {
442 Value::List { vals, .. } => vals
443 .into_iter()
444 .map(f)
445 .into_pipeline_data(span, signals.clone()),
446 Value::Range { val, .. } => val
447 .into_range_iter(span, Signals::empty())
448 .map(f)
449 .into_pipeline_data(span, signals.clone()),
450 value => match f(value) {
451 Value::Error { error, .. } => return Err(*error),
452 v => v.into_pipeline_data(),
453 },
454 };
455 Ok(pipeline.set_metadata(metadata))
456 }
457 PipelineData::Empty => Ok(PipelineData::Empty),
458 PipelineData::ListStream(stream, metadata) => {
459 Ok(PipelineData::ListStream(stream.map(f), metadata))
460 }
461 PipelineData::ByteStream(stream, metadata) => {
462 Ok(f(stream.into_value()?).into_pipeline_data_with_metadata(metadata))
463 }
464 }
465 }
466
467 pub fn flat_map<U, F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
469 where
470 Self: Sized,
471 U: IntoIterator<Item = Value> + 'static,
472 <U as IntoIterator>::IntoIter: 'static + Send,
473 F: FnMut(Value) -> U + 'static + Send,
474 {
475 match self {
476 PipelineData::Empty => Ok(PipelineData::Empty),
477 PipelineData::Value(value, metadata) => {
478 let span = value.span();
479 let pipeline = match value {
480 Value::List { vals, .. } => vals
481 .into_iter()
482 .flat_map(f)
483 .into_pipeline_data(span, signals.clone()),
484 Value::Range { val, .. } => val
485 .into_range_iter(span, Signals::empty())
486 .flat_map(f)
487 .into_pipeline_data(span, signals.clone()),
488 value => f(value)
489 .into_iter()
490 .into_pipeline_data(span, signals.clone()),
491 };
492 Ok(pipeline.set_metadata(metadata))
493 }
494 PipelineData::ListStream(stream, metadata) => Ok(PipelineData::ListStream(
495 stream.modify(|iter| iter.flat_map(f)),
496 metadata,
497 )),
498 PipelineData::ByteStream(stream, metadata) => {
499 let span = stream.span();
501 let iter = match String::from_utf8(stream.into_bytes()?) {
502 Ok(mut str) => {
503 str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
504 f(Value::string(str, span))
505 }
506 Err(err) => f(Value::binary(err.into_bytes(), span)),
507 };
508 Ok(iter.into_iter().into_pipeline_data_with_metadata(
509 span,
510 signals.clone(),
511 metadata,
512 ))
513 }
514 }
515 }
516
517 pub fn filter<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
518 where
519 Self: Sized,
520 F: FnMut(&Value) -> bool + 'static + Send,
521 {
522 match self {
523 PipelineData::Empty => Ok(PipelineData::Empty),
524 PipelineData::Value(value, metadata) => {
525 let span = value.span();
526 let pipeline = match value {
527 Value::List { vals, .. } => vals
528 .into_iter()
529 .filter(f)
530 .into_pipeline_data(span, signals.clone()),
531 Value::Range { val, .. } => val
532 .into_range_iter(span, Signals::empty())
533 .filter(f)
534 .into_pipeline_data(span, signals.clone()),
535 value => {
536 if f(&value) {
537 value.into_pipeline_data()
538 } else {
539 Value::nothing(span).into_pipeline_data()
540 }
541 }
542 };
543 Ok(pipeline.set_metadata(metadata))
544 }
545 PipelineData::ListStream(stream, metadata) => Ok(PipelineData::ListStream(
546 stream.modify(|iter| iter.filter(f)),
547 metadata,
548 )),
549 PipelineData::ByteStream(stream, metadata) => {
550 let span = stream.span();
552 let value = match String::from_utf8(stream.into_bytes()?) {
553 Ok(mut str) => {
554 str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
555 Value::string(str, span)
556 }
557 Err(err) => Value::binary(err.into_bytes(), span),
558 };
559 let value = if f(&value) {
560 value
561 } else {
562 Value::nothing(span)
563 };
564 Ok(value.into_pipeline_data_with_metadata(metadata))
565 }
566 }
567 }
568
569 pub fn try_expand_range(self) -> Result<PipelineData, ShellError> {
574 match self {
575 PipelineData::Value(v, metadata) => {
576 let span = v.span();
577 match v {
578 Value::Range { val, .. } => {
579 match *val {
580 Range::IntRange(range) => {
581 if range.is_unbounded() {
582 return Err(ShellError::GenericError {
583 error: "Cannot create range".into(),
584 msg: "Unbounded ranges are not allowed when converting to this format".into(),
585 span: Some(span),
586 help: Some("Consider using ranges with valid start and end point.".into()),
587 inner: vec![],
588 });
589 }
590 }
591 Range::FloatRange(range) => {
592 if range.is_unbounded() {
593 return Err(ShellError::GenericError {
594 error: "Cannot create range".into(),
595 msg: "Unbounded ranges are not allowed when converting to this format".into(),
596 span: Some(span),
597 help: Some("Consider using ranges with valid start and end point.".into()),
598 inner: vec![],
599 });
600 }
601 }
602 }
603 let range_values: Vec<Value> =
604 val.into_range_iter(span, Signals::empty()).collect();
605 Ok(PipelineData::Value(Value::list(range_values, span), None))
606 }
607 x => Ok(PipelineData::Value(x, metadata)),
608 }
609 }
610 _ => Ok(self),
611 }
612 }
613
614 pub fn print_table(
622 self,
623 engine_state: &EngineState,
624 stack: &mut Stack,
625 no_newline: bool,
626 to_stderr: bool,
627 ) -> Result<(), ShellError> {
628 match self {
629 PipelineData::ByteStream(stream, ..) if stream.type_() != ByteStreamType::Binary => {
631 stream.print(to_stderr)
632 }
633 _ => {
634 if let Some(decl_id) = engine_state.table_decl_id {
637 let command = engine_state.get_decl(decl_id);
638 if command.block_id().is_some() {
639 self.write_all_and_flush(engine_state, no_newline, to_stderr)
640 } else {
641 let call = Call::new(Span::new(0, 0));
642 let table = command.run(engine_state, stack, &(&call).into(), self)?;
643 table.write_all_and_flush(engine_state, no_newline, to_stderr)
644 }
645 } else {
646 self.write_all_and_flush(engine_state, no_newline, to_stderr)
647 }
648 }
649 }
650 }
651
652 pub fn print_raw(
660 self,
661 engine_state: &EngineState,
662 no_newline: bool,
663 to_stderr: bool,
664 ) -> Result<(), ShellError> {
665 let span = self.span();
666 if let PipelineData::Value(Value::Binary { val: bytes, .. }, _) = self {
667 if to_stderr {
668 write_all_and_flush(
669 bytes,
670 &mut std::io::stderr().lock(),
671 "stderr",
672 span,
673 engine_state.signals(),
674 )?;
675 } else {
676 write_all_and_flush(
677 bytes,
678 &mut std::io::stdout().lock(),
679 "stdout",
680 span,
681 engine_state.signals(),
682 )?;
683 }
684 Ok(())
685 } else {
686 self.write_all_and_flush(engine_state, no_newline, to_stderr)
687 }
688 }
689
690 fn write_all_and_flush(
691 self,
692 engine_state: &EngineState,
693 no_newline: bool,
694 to_stderr: bool,
695 ) -> Result<(), ShellError> {
696 let span = self.span();
697 if let PipelineData::ByteStream(stream, ..) = self {
698 stream.print(to_stderr)
700 } else {
701 let config = engine_state.get_config();
702 for item in self {
703 let mut out = if let Value::Error { error, .. } = item {
704 return Err(*error);
705 } else {
706 item.to_expanded_string("\n", config)
707 };
708
709 if !no_newline {
710 out.push('\n');
711 }
712
713 if to_stderr {
714 write_all_and_flush(
715 out,
716 &mut std::io::stderr().lock(),
717 "stderr",
718 span,
719 engine_state.signals(),
720 )?;
721 } else {
722 write_all_and_flush(
723 out,
724 &mut std::io::stdout().lock(),
725 "stdout",
726 span,
727 engine_state.signals(),
728 )?;
729 }
730 }
731
732 Ok(())
733 }
734 }
735
736 pub fn unsupported_input_error(
737 self,
738 expected_type: impl Into<String>,
739 span: Span,
740 ) -> ShellError {
741 match self {
742 PipelineData::Empty => ShellError::PipelineEmpty { dst_span: span },
743 PipelineData::Value(value, ..) => ShellError::OnlySupportsThisInputType {
744 exp_input_type: expected_type.into(),
745 wrong_type: value.get_type().get_non_specified_string(),
746 dst_span: span,
747 src_span: value.span(),
748 },
749 PipelineData::ListStream(stream, ..) => ShellError::OnlySupportsThisInputType {
750 exp_input_type: expected_type.into(),
751 wrong_type: "list (stream)".into(),
752 dst_span: span,
753 src_span: stream.span(),
754 },
755 PipelineData::ByteStream(stream, ..) => ShellError::OnlySupportsThisInputType {
756 exp_input_type: expected_type.into(),
757 wrong_type: stream.type_().describe().into(),
758 dst_span: span,
759 src_span: stream.span(),
760 },
761 }
762 }
763}
764
765pub fn write_all_and_flush<T>(
766 data: T,
767 destination: &mut impl Write,
768 destination_name: &str,
769 span: Option<Span>,
770 signals: &Signals,
771) -> Result<(), ShellError>
772where
773 T: AsRef<[u8]>,
774{
775 let io_error_map = |err: std::io::Error, location: Location| {
776 let context = format!("Writing to {} failed", destination_name);
777 match span {
778 None => IoError::new_internal(err.kind(), context, location),
779 Some(span) if span == Span::unknown() => {
780 IoError::new_internal(err.kind(), context, location)
781 }
782 Some(span) => IoError::new_with_additional_context(err.kind(), span, None, context),
783 }
784 };
785
786 let span = span.unwrap_or(Span::unknown());
787 const OUTPUT_CHUNK_SIZE: usize = 8192;
788 for chunk in data.as_ref().chunks(OUTPUT_CHUNK_SIZE) {
789 signals.check(span)?;
790 destination
791 .write_all(chunk)
792 .map_err(|err| io_error_map(err, location!()))?;
793 }
794 destination
795 .flush()
796 .map_err(|err| io_error_map(err, location!()))?;
797 Ok(())
798}
799
/// Internal state of a [`PipelineIterator`].
enum PipelineIteratorInner {
    /// Yields nothing.
    Empty,
    /// Yields the held value once (a `Value::Nothing` yields nothing).
    Value(Value),
    /// Yields the items of a list stream.
    ListStream(crate::list_stream::IntoIter),
    /// Yields the chunks of a byte stream.
    ByteStream(crate::byte_stream::Chunks),
}
806
/// Iterator over the [`Value`]s of a [`PipelineData`], created by
/// `PipelineData::into_iter` or `PipelineData::into_iter_strict`.
pub struct PipelineIterator(PipelineIteratorInner);
808
809impl IntoIterator for PipelineData {
810 type Item = Value;
811
812 type IntoIter = PipelineIterator;
813
814 fn into_iter(self) -> Self::IntoIter {
815 PipelineIterator(match self {
816 PipelineData::Empty => PipelineIteratorInner::Empty,
817 PipelineData::Value(value, ..) => {
818 let span = value.span();
819 match value {
820 Value::List { vals, .. } => PipelineIteratorInner::ListStream(
821 ListStream::new(vals.into_iter(), span, Signals::empty()).into_iter(),
822 ),
823 Value::Range { val, .. } => PipelineIteratorInner::ListStream(
824 ListStream::new(
825 val.into_range_iter(span, Signals::empty()),
826 span,
827 Signals::empty(),
828 )
829 .into_iter(),
830 ),
831 x => PipelineIteratorInner::Value(x),
832 }
833 }
834 PipelineData::ListStream(stream, ..) => {
835 PipelineIteratorInner::ListStream(stream.into_iter())
836 }
837 PipelineData::ByteStream(stream, ..) => stream.chunks().map_or(
838 PipelineIteratorInner::Empty,
839 PipelineIteratorInner::ByteStream,
840 ),
841 })
842 }
843}
844
845impl Iterator for PipelineIterator {
846 type Item = Value;
847
848 fn next(&mut self) -> Option<Self::Item> {
849 match &mut self.0 {
850 PipelineIteratorInner::Empty => None,
851 PipelineIteratorInner::Value(Value::Nothing { .. }, ..) => None,
852 PipelineIteratorInner::Value(v, ..) => Some(std::mem::take(v)),
853 PipelineIteratorInner::ListStream(stream, ..) => stream.next(),
854 PipelineIteratorInner::ByteStream(stream) => stream.next().map(|x| match x {
855 Ok(x) => x,
856 Err(err) => Value::error(
857 err,
858 Span::unknown(), ),
860 }),
861 }
862 }
863}
864
/// Conversion into [`PipelineData`].
///
/// Implemented in this file for every type that converts into a [`Value`].
pub trait IntoPipelineData {
    /// Converts `self` into pipeline data.
    fn into_pipeline_data(self) -> PipelineData;

    /// Converts `self` into pipeline data carrying `metadata`.
    fn into_pipeline_data_with_metadata(
        self,
        metadata: impl Into<Option<PipelineMetadata>>,
    ) -> PipelineData;
}
873
874impl<V> IntoPipelineData for V
875where
876 V: Into<Value>,
877{
878 fn into_pipeline_data(self) -> PipelineData {
879 PipelineData::Value(self.into(), None)
880 }
881
882 fn into_pipeline_data_with_metadata(
883 self,
884 metadata: impl Into<Option<PipelineMetadata>>,
885 ) -> PipelineData {
886 PipelineData::Value(self.into(), metadata.into())
887 }
888}
889
/// Conversion into [`PipelineData`] that takes a span and [`Signals`].
///
/// Implemented in this file for iterables whose items convert into [`Value`];
/// that implementation produces a list stream.
pub trait IntoInterruptiblePipelineData {
    /// Converts `self` into pipeline data using `span` and `signals`.
    fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData;
    /// Converts `self` into pipeline data using `span` and `signals`, carrying `metadata`.
    fn into_pipeline_data_with_metadata(
        self,
        span: Span,
        signals: Signals,
        metadata: impl Into<Option<PipelineMetadata>>,
    ) -> PipelineData;
}
899
900impl<I> IntoInterruptiblePipelineData for I
901where
902 I: IntoIterator + Send + 'static,
903 I::IntoIter: Send + 'static,
904 <I::IntoIter as Iterator>::Item: Into<Value>,
905{
906 fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData {
907 ListStream::new(self.into_iter().map(Into::into), span, signals).into()
908 }
909
910 fn into_pipeline_data_with_metadata(
911 self,
912 span: Span,
913 signals: Signals,
914 metadata: impl Into<Option<PipelineMetadata>>,
915 ) -> PipelineData {
916 PipelineData::ListStream(
917 ListStream::new(self.into_iter().map(Into::into), span, signals),
918 metadata.into(),
919 )
920 }
921}
922
923fn value_to_bytes(value: Value) -> Result<Vec<u8>, ShellError> {
924 let bytes = match value {
925 Value::String { val, .. } => val.into_bytes(),
926 Value::Binary { val, .. } => val,
927 Value::List { vals, .. } => {
928 let val = vals
929 .into_iter()
930 .map(Value::coerce_into_string)
931 .collect::<Result<Vec<String>, ShellError>>()?
932 .join("\n")
933 + "\n";
934
935 val.into_bytes()
936 }
937 Value::Error { error, .. } => return Err(*error),
939 value => value.coerce_into_string()?.into_bytes(),
940 };
941 Ok(bytes)
942}