1#[cfg(feature = "os")]
6use crate::process::{ChildPipe, ChildProcess};
7use crate::{
8 IntRange, PipelineData, ShellError, Signals, Span, Type, Value,
9 shell_error::{bridge::ShellErrorBridge, io::IoError},
10};
11use nu_utils::SplitRead as SplitReadInner;
12use serde::{Deserialize, Serialize};
13use std::ops::Bound;
14#[cfg(unix)]
15use std::os::fd::OwnedFd;
16#[cfg(windows)]
17use std::os::windows::io::OwnedHandle;
18use std::{
19 fmt::Debug,
20 fs::File,
21 io::{self, BufRead, BufReader, Cursor, ErrorKind, Read, Write},
22 process::Stdio,
23};
24
25pub enum ByteStreamSource {
32 Read(Box<dyn Read + Send + 'static>),
33 File(File),
34 #[cfg(feature = "os")]
35 Child(Box<ChildProcess>),
36}
37
38impl ByteStreamSource {
39 fn reader(self) -> Option<SourceReader> {
40 match self {
41 ByteStreamSource::Read(read) => Some(SourceReader::Read(read)),
42 ByteStreamSource::File(file) => Some(SourceReader::File(file)),
43 #[cfg(feature = "os")]
44 ByteStreamSource::Child(mut child) => child.stdout.take().map(|stdout| match stdout {
45 ChildPipe::Pipe(pipe) => SourceReader::File(convert_file(pipe)),
46 ChildPipe::Tee(tee) => SourceReader::Read(tee),
47 }),
48 }
49 }
50
51 #[cfg(feature = "os")]
53 pub fn is_external(&self) -> bool {
54 matches!(self, ByteStreamSource::Child(..))
55 }
56
57 #[cfg(not(feature = "os"))]
58 pub fn is_external(&self) -> bool {
59 false
61 }
62}
63
64impl Debug for ByteStreamSource {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 match self {
67 ByteStreamSource::Read(_) => f.debug_tuple("Read").field(&"..").finish(),
68 ByteStreamSource::File(file) => f.debug_tuple("File").field(file).finish(),
69 #[cfg(feature = "os")]
70 ByteStreamSource::Child(child) => f.debug_tuple("Child").field(child).finish(),
71 }
72 }
73}
74
/// The concrete reader obtained from a `ByteStreamSource`.
enum SourceReader {
    /// An arbitrary boxed reader.
    Read(Box<dyn Read + Send + 'static>),
    /// An open file.
    File(File),
}

impl Read for SourceReader {
    /// Forwards the read to whichever reader this value wraps.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let inner: &mut dyn Read = match self {
            Self::Read(boxed) => boxed,
            Self::File(file) => file,
        };
        inner.read(buf)
    }
}

impl Debug for SourceReader {
    /// Formats the reader as a tuple; the boxed reader is shown as `".."` because it is opaque.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Read(_) => {
                let mut out = f.debug_tuple("Read");
                out.field(&"..");
                out.finish()
            }
            Self::File(file) => {
                let mut out = f.debug_tuple("File");
                out.field(file);
                out.finish()
            }
        }
    }
}
97
98#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
100pub enum ByteStreamType {
101 Binary,
104 String,
110 #[default]
113 Unknown,
114}
115
116impl ByteStreamType {
117 pub fn describe(self) -> &'static str {
120 match self {
121 ByteStreamType::Binary => "binary (stream)",
122 ByteStreamType::String => "string (stream)",
123 ByteStreamType::Unknown => "byte stream",
124 }
125 }
126
127 pub fn is_binary_coercible(self) -> bool {
129 matches!(self, ByteStreamType::Binary | ByteStreamType::Unknown)
130 }
131
132 pub fn is_string_coercible(self) -> bool {
134 matches!(self, ByteStreamType::String | ByteStreamType::Unknown)
135 }
136}
137
138impl From<ByteStreamType> for Type {
139 fn from(value: ByteStreamType) -> Self {
140 match value {
141 ByteStreamType::Binary => Type::Binary,
142 ByteStreamType::String => Type::String,
143 ByteStreamType::Unknown => Type::Any,
144 }
145 }
146}
147
148#[derive(Debug)]
191pub struct ByteStream {
192 stream: ByteStreamSource,
193 span: Span,
194 signals: Signals,
195 type_: ByteStreamType,
196 known_size: Option<u64>,
197 caller_spans: Vec<Span>,
198}
199
200impl ByteStream {
201 pub fn new(
203 stream: ByteStreamSource,
204 span: Span,
205 signals: Signals,
206 type_: ByteStreamType,
207 ) -> Self {
208 Self {
209 stream,
210 span,
211 signals,
212 type_,
213 known_size: None,
214 caller_spans: vec![],
215 }
216 }
217
218 pub fn push_caller_span(&mut self, span: Span) {
220 if span != self.span {
221 self.caller_spans.push(span)
222 }
223 }
224
225 pub fn get_caller_spans(&self) -> &Vec<Span> {
227 &self.caller_spans
228 }
229
230 pub fn read(
232 reader: impl Read + Send + 'static,
233 span: Span,
234 signals: Signals,
235 type_: ByteStreamType,
236 ) -> Self {
237 Self::new(
238 ByteStreamSource::Read(Box::new(reader)),
239 span,
240 signals,
241 type_,
242 )
243 }
244
245 pub fn skip(self, span: Span, n: u64) -> Result<Self, ShellError> {
246 let known_size = self.known_size.map(|len| len.saturating_sub(n));
247 if let Some(mut reader) = self.reader() {
248 io::copy(&mut (&mut reader).take(n), &mut io::sink())
250 .map_err(|err| IoError::new(err, span, None))?;
251 Ok(
252 ByteStream::read(reader, span, Signals::empty(), ByteStreamType::Binary)
253 .with_known_size(known_size),
254 )
255 } else {
256 Err(ShellError::TypeMismatch {
257 err_message: "expected readable stream".into(),
258 span,
259 })
260 }
261 }
262
263 pub fn take(self, span: Span, n: u64) -> Result<Self, ShellError> {
264 let known_size = self.known_size.map(|s| s.min(n));
265 if let Some(reader) = self.reader() {
266 Ok(ByteStream::read(
267 reader.take(n),
268 span,
269 Signals::empty(),
270 ByteStreamType::Binary,
271 )
272 .with_known_size(known_size))
273 } else {
274 Err(ShellError::TypeMismatch {
275 err_message: "expected readable stream".into(),
276 span,
277 })
278 }
279 }
280
281 pub fn slice(
282 self,
283 val_span: Span,
284 call_span: Span,
285 range: IntRange,
286 ) -> Result<Self, ShellError> {
287 if let Some(len) = self.known_size {
288 let start = range.absolute_start(len);
289 let stream = self.skip(val_span, start);
290
291 match range.absolute_end(len) {
292 Bound::Unbounded => stream,
293 Bound::Included(end) | Bound::Excluded(end) if end < start => {
294 stream.and_then(|s| s.take(val_span, 0))
295 }
296 Bound::Included(end) => {
297 let distance = end - start + 1;
298 stream.and_then(|s| s.take(val_span, distance.min(len)))
299 }
300 Bound::Excluded(end) => {
301 let distance = end - start;
302 stream.and_then(|s| s.take(val_span, distance.min(len)))
303 }
304 }
305 } else if range.is_relative() {
306 Err(ShellError::RelativeRangeOnInfiniteStream { span: call_span })
307 } else {
308 let start = range.start() as u64;
309 let stream = self.skip(val_span, start);
310
311 match range.distance() {
312 Bound::Unbounded => stream,
313 Bound::Included(distance) => stream.and_then(|s| s.take(val_span, distance + 1)),
314 Bound::Excluded(distance) => stream.and_then(|s| s.take(val_span, distance)),
315 }
316 }
317 }
318
319 pub fn read_string(string: String, span: Span, signals: Signals) -> Self {
321 let len = string.len();
322 ByteStream::read(
323 Cursor::new(string.into_bytes()),
324 span,
325 signals,
326 ByteStreamType::String,
327 )
328 .with_known_size(Some(len as u64))
329 }
330
331 pub fn read_binary(bytes: Vec<u8>, span: Span, signals: Signals) -> Self {
333 let len = bytes.len();
334 ByteStream::read(Cursor::new(bytes), span, signals, ByteStreamType::Binary)
335 .with_known_size(Some(len as u64))
336 }
337
338 pub fn file(file: File, span: Span, signals: Signals) -> Self {
343 Self::new(
344 ByteStreamSource::File(file),
345 span,
346 signals,
347 ByteStreamType::Unknown,
348 )
349 }
350
351 #[cfg(feature = "os")]
356 pub fn child(child: ChildProcess, span: Span) -> Self {
357 Self::new(
358 ByteStreamSource::Child(Box::new(child)),
359 span,
360 Signals::empty(),
361 ByteStreamType::Unknown,
362 )
363 }
364
365 #[cfg(feature = "os")]
370 pub fn stdin(span: Span) -> Result<Self, ShellError> {
371 let stdin = os_pipe::dup_stdin().map_err(|err| IoError::new(err, span, None))?;
372 let source = ByteStreamSource::File(convert_file(stdin));
373 Ok(Self::new(
374 source,
375 span,
376 Signals::empty(),
377 ByteStreamType::Unknown,
378 ))
379 }
380
381 #[cfg(not(feature = "os"))]
382 pub fn stdin(span: Span) -> Result<Self, ShellError> {
383 Err(ShellError::DisabledOsSupport {
384 msg: "Stdin is not supported".to_string(),
385 span,
386 })
387 }
388
389 pub fn from_fn(
392 span: Span,
393 signals: Signals,
394 type_: ByteStreamType,
395 generator: impl FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
396 ) -> Self {
397 Self::read(
398 ReadGenerator {
399 buffer: Cursor::new(Vec::new()),
400 generator,
401 },
402 span,
403 signals,
404 type_,
405 )
406 }
407
408 pub fn with_type(mut self, type_: ByteStreamType) -> Self {
409 self.type_ = type_;
410 self
411 }
412
413 pub fn from_iter<I>(iter: I, span: Span, signals: Signals, type_: ByteStreamType) -> Self
417 where
418 I: IntoIterator,
419 I::IntoIter: Send + 'static,
420 I::Item: AsRef<[u8]> + Default + Send + 'static,
421 {
422 let iter = iter.into_iter();
423 let cursor = Some(Cursor::new(I::Item::default()));
424 Self::read(ReadIterator { iter, cursor }, span, signals, type_)
425 }
426
427 pub fn from_result_iter<I, T>(
431 iter: I,
432 span: Span,
433 signals: Signals,
434 type_: ByteStreamType,
435 ) -> Self
436 where
437 I: IntoIterator<Item = Result<T, ShellError>>,
438 I::IntoIter: Send + 'static,
439 T: AsRef<[u8]> + Default + Send + 'static,
440 {
441 let iter = iter.into_iter();
442 let cursor = Some(Cursor::new(T::default()));
443 Self::read(ReadResultIterator { iter, cursor }, span, signals, type_)
444 }
445
446 pub fn with_known_size(mut self, size: Option<u64>) -> Self {
448 self.known_size = size;
449 self
450 }
451
452 pub fn source(&self) -> &ByteStreamSource {
454 &self.stream
455 }
456
457 pub fn source_mut(&mut self) -> &mut ByteStreamSource {
459 &mut self.stream
460 }
461
462 pub fn span(&self) -> Span {
464 self.span
465 }
466
467 pub fn with_span(mut self, span: Span) -> Self {
469 self.span = span;
470 self
471 }
472
473 pub fn type_(&self) -> ByteStreamType {
475 self.type_
476 }
477
478 pub fn known_size(&self) -> Option<u64> {
480 self.known_size
481 }
482
483 pub fn reader(self) -> Option<Reader> {
490 let reader = self.stream.reader()?;
491 Some(Reader {
492 reader: BufReader::new(reader),
493 span: self.span,
494 signals: self.signals,
495 })
496 }
497
498 pub fn lines(self) -> Option<Lines> {
506 let reader = self.stream.reader()?;
507 Some(Lines {
508 reader: BufReader::new(reader),
509 span: self.span,
510 signals: self.signals,
511 strict: false,
512 })
513 }
514
515 pub fn split(self, delimiter: Vec<u8>) -> Option<SplitRead> {
523 let reader = self.stream.reader()?;
524 Some(SplitRead::new(reader, delimiter, self.span, self.signals))
525 }
526
527 pub fn chunks(self) -> Option<Chunks> {
543 let reader = self.stream.reader()?;
544 Some(Chunks::new(reader, self.span, self.signals, self.type_))
545 }
546
547 pub fn into_source(self) -> ByteStreamSource {
549 self.stream
550 }
551
552 pub fn into_stdio(mut self) -> Result<Stdio, Self> {
560 match self.stream {
561 ByteStreamSource::Read(..) => Err(self),
562 ByteStreamSource::File(file) => Ok(file.into()),
563 #[cfg(feature = "os")]
564 ByteStreamSource::Child(child) => {
565 if let ChildProcess {
566 stdout: Some(ChildPipe::Pipe(stdout)),
567 stderr,
568 ..
569 } = *child
570 {
571 debug_assert!(stderr.is_none(), "stderr should not exist");
572 Ok(stdout.into())
573 } else {
574 self.stream = ByteStreamSource::Child(child);
575 Err(self)
576 }
577 }
578 }
579 }
580
581 #[cfg(feature = "os")]
586 pub fn into_child(self) -> Result<ChildProcess, Self> {
587 if let ByteStreamSource::Child(child) = self.stream {
588 Ok(*child)
589 } else {
590 Err(self)
591 }
592 }
593
594 pub fn into_bytes(self) -> Result<Vec<u8>, ShellError> {
598 let span = self.span;
599 let signals = self.signals;
600 let from_io_error = IoError::factory(span, None);
601 match self.stream {
602 ByteStreamSource::Read(mut read) => {
603 let mut buf = Vec::new();
604 let mut chunk = [0; DEFAULT_BUF_SIZE];
605 loop {
606 signals.check(&span)?;
607 match read.read(&mut chunk) {
608 Ok(0) => break,
609 Ok(n) => buf.extend_from_slice(&chunk[..n]),
610 Err(e) if e.kind() == ErrorKind::Interrupted => continue,
611 Err(e) => match ShellErrorBridge::try_from(e) {
612 Ok(ShellErrorBridge(e)) => return Err(e),
613 Err(e) => return Err(ShellError::Io(from_io_error(e))),
614 },
615 }
616 }
617 Ok(buf)
618 }
619 ByteStreamSource::File(mut file) => {
620 let mut buf = Vec::new();
621 let mut chunk = [0; DEFAULT_BUF_SIZE];
622 loop {
623 signals.check(&span)?;
624 match file.read(&mut chunk) {
625 Ok(0) => break,
626 Ok(n) => buf.extend_from_slice(&chunk[..n]),
627 Err(e) if e.kind() == ErrorKind::Interrupted => continue,
628 Err(e) => return Err(ShellError::Io(from_io_error(e))),
629 }
630 }
631 Ok(buf)
632 }
633 #[cfg(feature = "os")]
634 ByteStreamSource::Child(child) => child.into_bytes(),
635 }
636 }
637
638 pub fn into_string(self) -> Result<String, ShellError> {
647 let span = self.span;
648 if self.type_.is_string_coercible() {
649 let trim = self.stream.is_external();
650 let bytes = self.into_bytes()?;
651 let mut string = String::from_utf8(bytes).map_err(|err| ShellError::NonUtf8Custom {
652 span,
653 msg: err.to_string(),
654 })?;
655 if trim {
656 trim_end_newline(&mut string);
657 }
658 Ok(string)
659 } else {
660 Err(ShellError::TypeMismatch {
661 err_message: "expected string, but got binary".into(),
662 span,
663 })
664 }
665 }
666
667 pub fn into_value(self) -> Result<Value, ShellError> {
680 let span = self.span;
681 let trim = self.stream.is_external();
682 let value = match self.type_ {
683 ByteStreamType::Binary => Value::binary(self.into_bytes()?, span),
685 ByteStreamType::String => Value::string(self.into_string()?, span),
686 ByteStreamType::Unknown => match String::from_utf8(self.into_bytes()?) {
688 Ok(mut str) => {
689 if trim {
690 trim_end_newline(&mut str);
691 }
692 Value::string(str, span)
693 }
694 Err(err) => Value::binary(err.into_bytes(), span),
695 },
696 };
697 Ok(value)
698 }
699
700 pub fn drain(self) -> Result<(), ShellError> {
702 match self.stream {
703 ByteStreamSource::Read(read) => {
704 copy_with_signals(read, io::sink(), self.span, &self.signals)?;
705 Ok(())
706 }
707 ByteStreamSource::File(_) => Ok(()),
708 #[cfg(feature = "os")]
709 ByteStreamSource::Child(child) => child.wait(),
710 }
711 }
712
713 pub fn print(self, to_stderr: bool) -> Result<(), ShellError> {
715 if to_stderr {
716 self.write_to(&mut io::stderr())
717 } else {
718 self.write_to(&mut io::stdout())
719 }
720 }
721
722 pub fn write_to(self, dest: impl Write) -> Result<(), ShellError> {
724 let span = self.span;
725 let signals = &self.signals;
726 match self.stream {
727 ByteStreamSource::Read(read) => {
728 copy_with_signals(read, dest, span, signals)?;
729 }
730 ByteStreamSource::File(file) => {
731 copy_with_signals(file, dest, span, signals)?;
732 }
733 #[cfg(feature = "os")]
734 ByteStreamSource::Child(mut child) => {
735 debug_assert!(child.stderr.is_none(), "stderr should not exist");
739
740 if let Some(stdout) = child.stdout.take() {
741 match stdout {
742 ChildPipe::Pipe(pipe) => {
743 copy_with_signals(pipe, dest, span, signals)?;
744 }
745 ChildPipe::Tee(tee) => {
746 copy_with_signals(tee, dest, span, signals)?;
747 }
748 }
749 }
750 child.wait()?;
751 }
752 }
753 Ok(())
754 }
755}
756
757impl From<ByteStream> for PipelineData {
758 fn from(stream: ByteStream) -> Self {
759 Self::byte_stream(stream, None)
760 }
761}
762
/// Adapts an iterator of byte chunks into a [`Read`] implementation.
struct ReadIterator<I>
where
    I: Iterator,
    I::Item: AsRef<[u8]>,
{
    /// Source of the remaining chunks.
    iter: I,
    /// Chunk currently being read; `None` once the iterator is exhausted.
    cursor: Option<Cursor<I::Item>>,
}

impl<I> Read for ReadIterator<I>
where
    I: Iterator,
    I::Item: AsRef<[u8]>,
{
    /// Reads from the current chunk, advancing to the next chunk whenever the current one is
    /// exhausted. Returns `Ok(0)` once every chunk has been consumed.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // A zero-length read always yields 0 bytes. Without this guard every chunk would look
        // exhausted and the whole iterator would be drained and thrown away.
        if buf.is_empty() {
            return Ok(0);
        }

        while let Some(cursor) = self.cursor.as_mut() {
            match cursor.read(buf)? {
                0 => self.cursor = self.iter.next().map(Cursor::new),
                read => return Ok(read),
            }
        }
        Ok(0)
    }
}
789
790struct ReadResultIterator<I, T>
791where
792 I: Iterator<Item = Result<T, ShellError>>,
793 T: AsRef<[u8]>,
794{
795 iter: I,
796 cursor: Option<Cursor<T>>,
797}
798
799impl<I, T> Read for ReadResultIterator<I, T>
800where
801 I: Iterator<Item = Result<T, ShellError>>,
802 T: AsRef<[u8]>,
803{
804 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
805 while let Some(cursor) = self.cursor.as_mut() {
806 let read = cursor.read(buf)?;
807 if read == 0 {
808 self.cursor = self
809 .iter
810 .next()
811 .transpose()
812 .map_err(ShellErrorBridge)?
813 .map(Cursor::new);
814 } else {
815 return Ok(read);
816 }
817 }
818 Ok(0)
819 }
820}
821
822pub struct Reader {
823 reader: BufReader<SourceReader>,
824 span: Span,
825 signals: Signals,
826}
827
828impl Reader {
829 pub fn span(&self) -> Span {
830 self.span
831 }
832}
833
834impl Read for Reader {
835 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
836 self.signals.check(&self.span).map_err(ShellErrorBridge)?;
837 self.reader.read(buf)
838 }
839}
840
841impl BufRead for Reader {
842 fn fill_buf(&mut self) -> io::Result<&[u8]> {
843 self.reader.fill_buf()
844 }
845
846 fn consume(&mut self, amt: usize) {
847 self.reader.consume(amt)
848 }
849}
850
851pub struct Lines {
852 reader: BufReader<SourceReader>,
853 span: Span,
854 signals: Signals,
855 strict: bool,
864}
865
866impl Lines {
867 pub fn span(&self) -> Span {
868 self.span
869 }
870
871 pub fn strict(mut self, strict: bool) -> Self {
877 self.strict = strict;
878 self
879 }
880}
881
882impl Iterator for Lines {
883 type Item = Result<String, ShellError>;
884
885 fn next(&mut self) -> Option<Self::Item> {
886 if self.signals.interrupted() {
887 None
888 } else {
889 let mut buf = Vec::new();
890 match self.reader.read_until(b'\n', &mut buf) {
891 Ok(0) => None,
892 Ok(_) => {
893 let mut string = if self.strict {
894 match String::from_utf8(buf) {
895 Ok(s) => s,
896 Err(_) => return Some(Err(ShellError::NonUtf8 { span: self.span })),
897 }
898 } else {
899 String::from_utf8_lossy(&buf).into_owned()
900 };
901 trim_end_newline(&mut string);
902 Some(Ok(string))
903 }
904 Err(err) => Some(Err(IoError::new(err, self.span, None).into())),
905 }
906 }
907 }
908}
909
910pub struct SplitRead {
911 internal: SplitReadInner<BufReader<SourceReader>>,
912 span: Span,
913 signals: Signals,
914}
915
916impl SplitRead {
917 fn new(
918 reader: SourceReader,
919 delimiter: impl AsRef<[u8]>,
920 span: Span,
921 signals: Signals,
922 ) -> Self {
923 Self {
924 internal: SplitReadInner::new(BufReader::new(reader), delimiter),
925 span,
926 signals,
927 }
928 }
929
930 pub fn span(&self) -> Span {
931 self.span
932 }
933}
934
935impl Iterator for SplitRead {
936 type Item = Result<Vec<u8>, ShellError>;
937
938 fn next(&mut self) -> Option<Self::Item> {
939 if self.signals.interrupted() {
940 return None;
941 }
942 self.internal.next().map(|r| {
943 r.map_err(|err| {
944 ShellError::Io(IoError::new_internal(
945 err,
946 "Could not get next value for SplitRead",
947 ))
948 })
949 })
950 }
951}
952
953pub struct Chunks {
959 reader: BufReader<SourceReader>,
960 pos: u64,
961 error: bool,
962 span: Span,
963 signals: Signals,
964 type_: ByteStreamType,
965}
966
967impl Chunks {
968 fn new(reader: SourceReader, span: Span, signals: Signals, type_: ByteStreamType) -> Self {
969 Self {
970 reader: BufReader::new(reader),
971 pos: 0,
972 error: false,
973 span,
974 signals,
975 type_,
976 }
977 }
978
979 pub fn span(&self) -> Span {
980 self.span
981 }
982
983 fn next_string(&mut self) -> Result<Option<String>, (Vec<u8>, ShellError)> {
984 let from_io_error = |err: std::io::Error| match ShellErrorBridge::try_from(err) {
985 Ok(err) => err.0,
986 Err(err) => IoError::new(err, self.span, None).into(),
987 };
988
989 let buf = self
991 .reader
992 .fill_buf()
993 .map_err(from_io_error)
994 .map_err(|err| (vec![], err))?;
995
996 if buf.is_empty() {
998 return Ok(None);
999 }
1000
1001 let mut buf = buf.to_vec();
1002 let mut consumed = 0;
1003
1004 if buf.len() < 4 {
1006 consumed += buf.len();
1007 self.reader.consume(buf.len());
1008 match self.reader.fill_buf() {
1009 Ok(more_bytes) => buf.extend_from_slice(more_bytes),
1010 Err(err) => return Err((buf, from_io_error(err))),
1011 }
1012 }
1013
1014 match String::from_utf8(buf) {
1016 Ok(string) => {
1017 self.reader.consume(string.len() - consumed);
1018 self.pos += string.len() as u64;
1019 Ok(Some(string))
1020 }
1021 Err(err) if err.utf8_error().error_len().is_none() => {
1022 let valid_up_to = err.utf8_error().valid_up_to();
1025 if valid_up_to > consumed {
1026 self.reader.consume(valid_up_to - consumed);
1027 }
1028 let mut buf = err.into_bytes();
1029 buf.truncate(valid_up_to);
1030 buf.shrink_to_fit();
1031 let string = String::from_utf8(buf)
1032 .expect("failed to parse utf-8 even after correcting error");
1033 self.pos += string.len() as u64;
1034 Ok(Some(string))
1035 }
1036 Err(err) => {
1037 let shell_error = ShellError::NonUtf8Custom {
1039 msg: format!("invalid utf-8 sequence starting at index {}", self.pos),
1040 span: self.span,
1041 };
1042 let buf = err.into_bytes();
1043 if buf.len() > consumed {
1046 self.reader.consume(buf.len() - consumed);
1047 }
1048 self.pos += buf.len() as u64;
1049 Err((buf, shell_error))
1050 }
1051 }
1052 }
1053}
1054
1055impl Iterator for Chunks {
1056 type Item = Result<Value, ShellError>;
1057
1058 fn next(&mut self) -> Option<Self::Item> {
1059 if self.error || self.signals.interrupted() {
1060 None
1061 } else {
1062 match self.type_ {
1063 ByteStreamType::Binary => {
1065 let buf = match self.reader.fill_buf() {
1066 Ok(buf) => buf,
1067 Err(err) => {
1068 self.error = true;
1069 return Some(Err(ShellError::Io(IoError::new(err, self.span, None))));
1070 }
1071 };
1072 if !buf.is_empty() {
1073 let len = buf.len();
1074 let value = Value::binary(buf, self.span);
1075 self.reader.consume(len);
1076 self.pos += len as u64;
1077 Some(Ok(value))
1078 } else {
1079 None
1080 }
1081 }
1082 ByteStreamType::String => match self.next_string().transpose()? {
1084 Ok(string) => Some(Ok(Value::string(string, self.span))),
1085 Err((_, err)) => {
1086 self.error = true;
1087 Some(Err(err))
1088 }
1089 },
1090 ByteStreamType::Unknown => {
1093 match self.next_string().transpose()? {
1094 Ok(string) => Some(Ok(Value::string(string, self.span))),
1095 Err((buf, _)) if !buf.is_empty() => {
1096 self.type_ = ByteStreamType::Binary;
1098 Some(Ok(Value::binary(buf, self.span)))
1099 }
1100 Err((_, err)) => {
1101 self.error = true;
1102 Some(Err(err))
1103 }
1104 }
1105 }
1106 }
1107 }
1108 }
1109}
1110
/// Removes one trailing `\n` or `\r\n` from `string`, if present.
///
/// A lone trailing `\r` is left untouched.
fn trim_end_newline(string: &mut String) {
    let trimmed_len = match string.strip_suffix('\n') {
        Some(rest) => rest.strip_suffix('\r').unwrap_or(rest).len(),
        None => return,
    };
    string.truncate(trimmed_len);
}
1119
/// Converts one file-descriptor-backed type into another by way of [`OwnedFd`].
#[cfg(unix)]
pub(crate) fn convert_file<T: From<OwnedFd>>(file: impl Into<OwnedFd>) -> T {
    let fd: OwnedFd = file.into();
    T::from(fd)
}
1124
/// Converts one handle-backed type into another by way of [`OwnedHandle`].
#[cfg(windows)]
pub(crate) fn convert_file<T: From<OwnedHandle>>(file: impl Into<OwnedHandle>) -> T {
    let handle: OwnedHandle = file.into();
    T::from(handle)
}
1129
/// Size in bytes of the stack buffers used by the manual read/copy loops in this module.
const DEFAULT_BUF_SIZE: usize = 8192;
1131
1132pub fn copy_with_signals(
1133 mut reader: impl Read,
1134 mut writer: impl Write,
1135 span: Span,
1136 signals: &Signals,
1137) -> Result<u64, ShellError> {
1138 let from_io_error = IoError::factory(span, None);
1139 if signals.is_empty() {
1140 match io::copy(&mut reader, &mut writer) {
1141 Ok(n) => {
1142 writer.flush().map_err(&from_io_error)?;
1143 Ok(n)
1144 }
1145 Err(err) => {
1146 let _ = writer.flush();
1147 match ShellErrorBridge::try_from(err) {
1148 Ok(ShellErrorBridge(shell_error)) => Err(shell_error),
1149 Err(err) => Err(from_io_error(err).into()),
1150 }
1151 }
1152 }
1153 } else {
1154 match generic_copy(&mut reader, &mut writer, span, signals) {
1159 Ok(len) => {
1160 writer.flush().map_err(&from_io_error)?;
1161 Ok(len)
1162 }
1163 Err(err) => {
1164 let _ = writer.flush();
1165 Err(err)
1166 }
1167 }
1168 }
1169}
1170
1171fn generic_copy(
1173 mut reader: impl Read,
1174 mut writer: impl Write,
1175 span: Span,
1176 signals: &Signals,
1177) -> Result<u64, ShellError> {
1178 let from_io_error = IoError::factory(span, None);
1179 let buf = &mut [0; DEFAULT_BUF_SIZE];
1180 let mut len = 0;
1181 loop {
1182 signals.check(&span)?;
1183 let n = match reader.read(buf) {
1184 Ok(0) => break,
1185 Ok(n) => n,
1186 Err(e) if e.kind() == ErrorKind::Interrupted => continue,
1187 Err(e) => match ShellErrorBridge::try_from(e) {
1188 Ok(ShellErrorBridge(e)) => return Err(e),
1189 Err(e) => return Err(from_io_error(e).into()),
1190 },
1191 };
1192 len += n;
1193 writer.write_all(&buf[..n]).map_err(&from_io_error)?;
1194 }
1195 Ok(len as u64)
1196}
1197
1198struct ReadGenerator<F>
1199where
1200 F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
1201{
1202 buffer: Cursor<Vec<u8>>,
1203 generator: F,
1204}
1205
1206impl<F> BufRead for ReadGenerator<F>
1207where
1208 F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
1209{
1210 fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
1211 while self.buffer.fill_buf()?.is_empty() {
1214 self.buffer.set_position(0);
1216 self.buffer.get_mut().clear();
1217 if !(self.generator)(self.buffer.get_mut()).map_err(ShellErrorBridge)? {
1219 break;
1221 }
1222 }
1223 self.buffer.fill_buf()
1224 }
1225
1226 fn consume(&mut self, amt: usize) {
1227 self.buffer.consume(amt);
1228 }
1229}
1230
1231impl<F> Read for ReadGenerator<F>
1232where
1233 F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
1234{
1235 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
1236 let slice = self.fill_buf()?;
1238 let len = buf.len().min(slice.len());
1239 buf[..len].copy_from_slice(&slice[..len]);
1240 self.consume(len);
1241 Ok(len)
1242 }
1243}
1244
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a [`Chunks`] iterator over `data`, where each element is delivered to the
    /// buffered reader as a separate read.
    fn test_chunks<T>(data: Vec<T>, type_: ByteStreamType) -> Chunks
    where
        T: AsRef<[u8]> + Default + Send + 'static,
    {
        let reader = ReadIterator {
            iter: data.into_iter(),
            cursor: Some(Cursor::new(T::default())),
        };
        Chunks::new(
            SourceReader::Read(Box::new(reader)),
            Span::test_data(),
            Signals::empty(),
            type_,
        )
    }

    /// Binary streams yield each read as-is.
    #[test]
    fn chunks_read_binary_passthrough() {
        let bins = vec![&[0, 1][..], &[2, 3][..]];
        let iter = test_chunks(bins.clone(), ByteStreamType::Binary);

        let bins_values: Vec<Value> = bins
            .into_iter()
            .map(|bin| Value::binary(bin, Span::test_data()))
            .collect();
        assert_eq!(
            bins_values,
            iter.collect::<Result<Vec<Value>, _>>().expect("error")
        );
    }

    /// String streams whose reads end on character boundaries yield each read as a string.
    #[test]
    fn chunks_read_string_clean() {
        let strs = vec!["Nushell", "が好きです"];
        let iter = test_chunks(strs.clone(), ByteStreamType::String);

        let strs_values: Vec<Value> = strs
            .into_iter()
            .map(|string| Value::string(string, Span::test_data()))
            .collect();
        assert_eq!(
            strs_values,
            iter.collect::<Result<Vec<Value>, _>>().expect("error")
        );
    }

    /// Multi-byte characters split across reads are reassembled.
    #[test]
    fn chunks_read_string_split_boundary() {
        let real = "Nushell最高!";
        let chunks = vec![&b"Nushell\xe6"[..], &b"\x9c\x80\xe9"[..], &b"\xab\x98!"[..]];
        let iter = test_chunks(chunks, ByteStreamType::String);

        let mut string = String::new();
        for value in iter {
            let chunk_string = value.expect("error").into_string().expect("not a string");
            string.push_str(&chunk_string);
        }
        assert_eq!(real, string);
    }

    /// A string stream that ends in the middle of a character reports a UTF-8 error after
    /// yielding the valid part.
    #[test]
    fn chunks_read_string_utf8_error() {
        let chunks = vec![&b"Nushell\xe6"[..], &b"\x9c\x80\xe9"[..], &b"\xab"[..]];
        let iter = test_chunks(chunks, ByteStreamType::String);

        let mut string = String::new();
        for value in iter {
            match value {
                Ok(value) => string.push_str(&value.into_string().expect("not a string")),
                Err(err) => {
                    println!("string so far: {string:?}");
                    println!("got error: {err:?}");
                    assert!(!string.is_empty());
                    assert!(matches!(err, ShellError::NonUtf8Custom { .. }));
                    return;
                }
            }
        }
        panic!("no error");
    }

    /// An unknown stream switches to binary at the first invalid chunk and stays binary.
    #[test]
    fn chunks_read_unknown_fallback() {
        let chunks = vec![&b"Nushell"[..], &b"\x9c\x80\xe9abcd"[..], &b"efgh"[..]];
        let mut iter = test_chunks(chunks, ByteStreamType::Unknown);

        let mut get = || iter.next().expect("end of iter").expect("error");

        assert_eq!(Value::test_string("Nushell"), get());
        assert_eq!(Value::test_binary(b"\x9c\x80\xe9abcd"), get());
        // Valid UTF-8, but still binary: the switch is permanent.
        assert_eq!(Value::test_binary(b"efgh"), get());
    }
}