1#[cfg(feature = "os")]
6use crate::process::{ChildPipe, ChildProcess};
7use crate::{
8 IntRange, PipelineData, ShellError, Signals, Span, Type, Value,
9 shell_error::{bridge::ShellErrorBridge, io::IoError},
10};
11use nu_utils::SplitRead as SplitReadInner;
12use serde::{Deserialize, Serialize};
13use std::ops::Bound;
14#[cfg(unix)]
15use std::os::fd::OwnedFd;
16#[cfg(windows)]
17use std::os::windows::io::OwnedHandle;
18use std::{
19 fmt::Debug,
20 fs::File,
21 io::{self, BufRead, BufReader, Cursor, ErrorKind, Read, Write},
22 process::Stdio,
23};
24
/// The underlying source of bytes for a [`ByteStream`].
///
/// A source is either an arbitrary boxed reader, an open file, or (when the `os` feature is
/// enabled) a child process.
pub enum ByteStreamSource {
    /// Any boxed [`Read`] implementation that is `Send + 'static`.
    Read(Box<dyn Read + Send + 'static>),
    /// An open [`File`].
    File(File),
    /// A boxed child process; only compiled in when the `os` feature is enabled.
    #[cfg(feature = "os")]
    Child(Box<ChildProcess>),
}
37
38impl ByteStreamSource {
39 fn reader(self) -> Option<SourceReader> {
40 match self {
41 ByteStreamSource::Read(read) => Some(SourceReader::Read(read)),
42 ByteStreamSource::File(file) => Some(SourceReader::File(file)),
43 #[cfg(feature = "os")]
44 ByteStreamSource::Child(mut child) => child.stdout.take().map(|stdout| match stdout {
45 ChildPipe::Pipe(pipe) => SourceReader::File(convert_file(pipe)),
46 ChildPipe::Tee(tee) => SourceReader::Read(tee),
47 }),
48 }
49 }
50
51 #[cfg(feature = "os")]
53 pub fn is_external(&self) -> bool {
54 matches!(self, ByteStreamSource::Child(..))
55 }
56
57 #[cfg(not(feature = "os"))]
58 pub fn is_external(&self) -> bool {
59 false
61 }
62}
63
64impl Debug for ByteStreamSource {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 match self {
67 ByteStreamSource::Read(_) => f.debug_tuple("Read").field(&"..").finish(),
68 ByteStreamSource::File(file) => f.debug_tuple("File").field(file).finish(),
69 #[cfg(feature = "os")]
70 ByteStreamSource::Child(child) => f.debug_tuple("Child").field(child).finish(),
71 }
72 }
73}
74
/// A readable view over a byte stream source: either a boxed reader or a file.
enum SourceReader {
    /// Any boxed reader.
    Read(Box<dyn Read + Send + 'static>),
    /// An open file.
    File(File),
}

impl Read for SourceReader {
    /// Forwards the read to whichever reader this value wraps.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let inner: &mut dyn Read = match self {
            Self::Read(boxed) => &mut **boxed,
            Self::File(file) => file,
        };
        inner.read(buf)
    }
}

impl Debug for SourceReader {
    /// Formats the reader as a tuple struct named after the variant.
    ///
    /// The boxed reader is opaque, so `".."` is printed in its place.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (variant, payload) = match self {
            Self::Read(_) => ("Read", &".." as &dyn Debug),
            Self::File(file) => ("File", file as &dyn Debug),
        };
        f.debug_tuple(variant).field(payload).finish()
    }
}
97
/// The kind of data a [`ByteStream`] is expected to carry.
///
/// The type decides how the stream is collected by [`ByteStream::into_value`] and how
/// [`Chunks`] yields its items.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum ByteStreamType {
    /// Raw binary data; collected into a binary value.
    Binary,
    /// Text; collected into a string value, with an error if the bytes are not valid UTF-8.
    String,
    /// Not known up front (the default); collected as a string when the bytes are valid
    /// UTF-8 and as binary otherwise.
    #[default]
    Unknown,
}
115
116impl ByteStreamType {
117 pub fn describe(self) -> &'static str {
120 match self {
121 ByteStreamType::Binary => "binary (stream)",
122 ByteStreamType::String => "string (stream)",
123 ByteStreamType::Unknown => "byte stream",
124 }
125 }
126
127 pub fn is_binary_coercible(self) -> bool {
129 matches!(self, ByteStreamType::Binary | ByteStreamType::Unknown)
130 }
131
132 pub fn is_string_coercible(self) -> bool {
134 matches!(self, ByteStreamType::String | ByteStreamType::Unknown)
135 }
136}
137
138impl From<ByteStreamType> for Type {
139 fn from(value: ByteStreamType) -> Self {
140 match value {
141 ByteStreamType::Binary => Type::Binary,
142 ByteStreamType::String => Type::String,
143 ByteStreamType::Unknown => Type::Any,
144 }
145 }
146}
147
/// A stream of bytes, together with the metadata needed to consume it.
///
/// The bytes come from a [`ByteStreamSource`]. The stream can be consumed incrementally
/// (see [`ByteStream::reader`], [`ByteStream::lines`], [`ByteStream::split`] and
/// [`ByteStream::chunks`]) or collected all at once (see [`ByteStream::into_bytes`],
/// [`ByteStream::into_string`] and [`ByteStream::into_value`]).
#[derive(Debug)]
pub struct ByteStream {
    /// Where the bytes come from.
    stream: ByteStreamSource,
    /// Span attached to values and errors produced from this stream.
    span: Span,
    /// Interrupt signals handed to the readers and iterators created from this stream.
    signals: Signals,
    /// The expected kind of data in the stream.
    type_: ByteStreamType,
    /// Total number of bytes in the stream, when known.
    known_size: Option<u64>,
    /// Extra spans recorded through `push_caller_span`.
    caller_spans: Vec<Span>,
}
199
200impl ByteStream {
201 pub fn new(
203 stream: ByteStreamSource,
204 span: Span,
205 signals: Signals,
206 type_: ByteStreamType,
207 ) -> Self {
208 Self {
209 stream,
210 span,
211 signals,
212 type_,
213 known_size: None,
214 caller_spans: vec![],
215 }
216 }
217
218 pub fn push_caller_span(&mut self, span: Span) {
220 if span != self.span {
221 self.caller_spans.push(span)
222 }
223 }
224
    /// Returns the caller spans recorded via [`ByteStream::push_caller_span`].
    pub fn get_caller_spans(&self) -> &Vec<Span> {
        &self.caller_spans
    }
229
230 pub fn read(
232 reader: impl Read + Send + 'static,
233 span: Span,
234 signals: Signals,
235 type_: ByteStreamType,
236 ) -> Self {
237 Self::new(
238 ByteStreamSource::Read(Box::new(reader)),
239 span,
240 signals,
241 type_,
242 )
243 }
244
245 pub fn skip(self, span: Span, n: u64) -> Result<Self, ShellError> {
246 let known_size = self.known_size.map(|len| len.saturating_sub(n));
247 if let Some(mut reader) = self.reader() {
248 io::copy(&mut (&mut reader).take(n), &mut io::sink())
250 .map_err(|err| IoError::new(err, span, None))?;
251 Ok(
252 ByteStream::read(reader, span, Signals::empty(), ByteStreamType::Binary)
253 .with_known_size(known_size),
254 )
255 } else {
256 Err(ShellError::TypeMismatch {
257 err_message: "expected readable stream".into(),
258 span,
259 })
260 }
261 }
262
263 pub fn take(self, span: Span, n: u64) -> Result<Self, ShellError> {
264 let known_size = self.known_size.map(|s| s.min(n));
265 if let Some(reader) = self.reader() {
266 Ok(ByteStream::read(
267 reader.take(n),
268 span,
269 Signals::empty(),
270 ByteStreamType::Binary,
271 )
272 .with_known_size(known_size))
273 } else {
274 Err(ShellError::TypeMismatch {
275 err_message: "expected readable stream".into(),
276 span,
277 })
278 }
279 }
280
281 pub fn slice(
282 self,
283 val_span: Span,
284 call_span: Span,
285 range: IntRange,
286 ) -> Result<Self, ShellError> {
287 if let Some(len) = self.known_size {
288 let start = range.absolute_start(len);
289 let stream = self.skip(val_span, start);
290
291 match range.absolute_end(len) {
292 Bound::Unbounded => stream,
293 Bound::Included(end) | Bound::Excluded(end) if end < start => {
294 stream.and_then(|s| s.take(val_span, 0))
295 }
296 Bound::Included(end) => {
297 let distance = end - start + 1;
298 stream.and_then(|s| s.take(val_span, distance.min(len)))
299 }
300 Bound::Excluded(end) => {
301 let distance = end - start;
302 stream.and_then(|s| s.take(val_span, distance.min(len)))
303 }
304 }
305 } else if range.is_relative() {
306 Err(ShellError::RelativeRangeOnInfiniteStream { span: call_span })
307 } else {
308 let start = range.start() as u64;
309 let stream = self.skip(val_span, start);
310
311 match range.distance() {
312 Bound::Unbounded => stream,
313 Bound::Included(distance) => stream.and_then(|s| s.take(val_span, distance + 1)),
314 Bound::Excluded(distance) => stream.and_then(|s| s.take(val_span, distance)),
315 }
316 }
317 }
318
319 pub fn read_string(string: String, span: Span, signals: Signals) -> Self {
321 let len = string.len();
322 ByteStream::read(
323 Cursor::new(string.into_bytes()),
324 span,
325 signals,
326 ByteStreamType::String,
327 )
328 .with_known_size(Some(len as u64))
329 }
330
331 pub fn read_binary(bytes: Vec<u8>, span: Span, signals: Signals) -> Self {
333 let len = bytes.len();
334 ByteStream::read(Cursor::new(bytes), span, signals, ByteStreamType::Binary)
335 .with_known_size(Some(len as u64))
336 }
337
338 pub fn file(file: File, span: Span, signals: Signals) -> Self {
343 Self::new(
344 ByteStreamSource::File(file),
345 span,
346 signals,
347 ByteStreamType::Unknown,
348 )
349 }
350
351 #[cfg(feature = "os")]
356 pub fn child(child: ChildProcess, span: Span) -> Self {
357 Self::new(
358 ByteStreamSource::Child(Box::new(child)),
359 span,
360 Signals::empty(),
361 ByteStreamType::Unknown,
362 )
363 }
364
365 #[cfg(feature = "os")]
370 pub fn stdin(span: Span) -> Result<Self, ShellError> {
371 let stdin = os_pipe::dup_stdin().map_err(|err| IoError::new(err, span, None))?;
372 let source = ByteStreamSource::File(convert_file(stdin));
373 Ok(Self::new(
374 source,
375 span,
376 Signals::empty(),
377 ByteStreamType::Unknown,
378 ))
379 }
380
381 #[cfg(not(feature = "os"))]
382 pub fn stdin(span: Span) -> Result<Self, ShellError> {
383 Err(ShellError::DisabledOsSupport {
384 msg: "Stdin is not supported".to_string(),
385 span,
386 })
387 }
388
389 pub fn from_fn(
392 span: Span,
393 signals: Signals,
394 type_: ByteStreamType,
395 generator: impl FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
396 ) -> Self {
397 Self::read(
398 ReadGenerator {
399 buffer: Cursor::new(Vec::new()),
400 generator,
401 },
402 span,
403 signals,
404 type_,
405 )
406 }
407
408 pub fn with_type(mut self, type_: ByteStreamType) -> Self {
409 self.type_ = type_;
410 self
411 }
412
413 pub fn from_iter<I>(iter: I, span: Span, signals: Signals, type_: ByteStreamType) -> Self
417 where
418 I: IntoIterator,
419 I::IntoIter: Send + 'static,
420 I::Item: AsRef<[u8]> + Default + Send + 'static,
421 {
422 let iter = iter.into_iter();
423 let cursor = Some(Cursor::new(I::Item::default()));
424 Self::read(ReadIterator { iter, cursor }, span, signals, type_)
425 }
426
427 pub fn from_result_iter<I, T>(
431 iter: I,
432 span: Span,
433 signals: Signals,
434 type_: ByteStreamType,
435 ) -> Self
436 where
437 I: IntoIterator<Item = Result<T, ShellError>>,
438 I::IntoIter: Send + 'static,
439 T: AsRef<[u8]> + Default + Send + 'static,
440 {
441 let iter = iter.into_iter();
442 let cursor = Some(Cursor::new(T::default()));
443 Self::read(ReadResultIterator { iter, cursor }, span, signals, type_)
444 }
445
446 pub fn with_known_size(mut self, size: Option<u64>) -> Self {
448 self.known_size = size;
449 self
450 }
451
    /// Returns a shared reference to the underlying [`ByteStreamSource`].
    pub fn source(&self) -> &ByteStreamSource {
        &self.stream
    }
456
    /// Returns a mutable reference to the underlying [`ByteStreamSource`].
    pub fn source_mut(&mut self) -> &mut ByteStreamSource {
        &mut self.stream
    }
461
    /// Returns the [`Span`] associated with this stream.
    pub fn span(&self) -> Span {
        self.span
    }
466
467 pub fn with_span(mut self, span: Span) -> Self {
469 self.span = span;
470 self
471 }
472
    /// Returns the [`ByteStreamType`] of this stream.
    pub fn type_(&self) -> ByteStreamType {
        self.type_
    }
477
    /// Returns the size of the stream in bytes, if it is known.
    pub fn known_size(&self) -> Option<u64> {
        self.known_size
    }
482
483 pub fn reader(self) -> Option<Reader> {
490 let reader = self.stream.reader()?;
491 Some(Reader {
492 reader: BufReader::new(reader),
493 span: self.span,
494 signals: self.signals,
495 })
496 }
497
498 pub fn lines(self) -> Option<Lines> {
506 let reader = self.stream.reader()?;
507 Some(Lines {
508 reader: BufReader::new(reader),
509 span: self.span,
510 signals: self.signals,
511 })
512 }
513
514 pub fn split(self, delimiter: Vec<u8>) -> Option<SplitRead> {
522 let reader = self.stream.reader()?;
523 Some(SplitRead::new(reader, delimiter, self.span, self.signals))
524 }
525
526 pub fn chunks(self) -> Option<Chunks> {
542 let reader = self.stream.reader()?;
543 Some(Chunks::new(reader, self.span, self.signals, self.type_))
544 }
545
    /// Consumes the stream and returns its underlying [`ByteStreamSource`].
    pub fn into_source(self) -> ByteStreamSource {
        self.stream
    }
550
    /// Attempts to convert the stream into a [`Stdio`].
    ///
    /// This succeeds for file sources and for child processes whose stdout is a plain pipe.
    /// In every other case the stream is handed back unchanged as the `Err` value.
    pub fn into_stdio(mut self) -> Result<Stdio, Self> {
        match self.stream {
            // A boxed reader has no handle that could be turned into a `Stdio`.
            ByteStreamSource::Read(..) => Err(self),
            ByteStreamSource::File(file) => Ok(file.into()),
            #[cfg(feature = "os")]
            ByteStreamSource::Child(child) => {
                if let ChildProcess {
                    stdout: Some(ChildPipe::Pipe(stdout)),
                    stderr,
                    ..
                } = *child
                {
                    debug_assert!(stderr.is_none(), "stderr should not exist");
                    Ok(stdout.into())
                } else {
                    // Put the child back so the returned stream is still intact.
                    self.stream = ByteStreamSource::Child(child);
                    Err(self)
                }
            }
        }
    }
579
580 #[cfg(feature = "os")]
585 pub fn into_child(self) -> Result<ChildProcess, Self> {
586 if let ByteStreamSource::Child(child) = self.stream {
587 Ok(*child)
588 } else {
589 Err(self)
590 }
591 }
592
593 pub fn into_bytes(self) -> Result<Vec<u8>, ShellError> {
597 let from_io_error = IoError::factory(self.span, None);
599 match self.stream {
600 ByteStreamSource::Read(mut read) => {
601 let mut buf = Vec::new();
602 read.read_to_end(&mut buf).map_err(|err| {
603 match ShellErrorBridge::try_from(err) {
604 Ok(ShellErrorBridge(err)) => err,
605 Err(err) => ShellError::Io(from_io_error(err)),
606 }
607 })?;
608 Ok(buf)
609 }
610 ByteStreamSource::File(mut file) => {
611 let mut buf = Vec::new();
612 file.read_to_end(&mut buf).map_err(&from_io_error)?;
613 Ok(buf)
614 }
615 #[cfg(feature = "os")]
616 ByteStreamSource::Child(child) => child.into_bytes(),
617 }
618 }
619
620 pub fn into_string(self) -> Result<String, ShellError> {
629 let span = self.span;
630 if self.type_.is_string_coercible() {
631 let trim = self.stream.is_external();
632 let bytes = self.into_bytes()?;
633 let mut string = String::from_utf8(bytes).map_err(|err| ShellError::NonUtf8Custom {
634 span,
635 msg: err.to_string(),
636 })?;
637 if trim {
638 trim_end_newline(&mut string);
639 }
640 Ok(string)
641 } else {
642 Err(ShellError::TypeMismatch {
643 err_message: "expected string, but got binary".into(),
644 span,
645 })
646 }
647 }
648
649 pub fn into_value(self) -> Result<Value, ShellError> {
662 let span = self.span;
663 let trim = self.stream.is_external();
664 let value = match self.type_ {
665 ByteStreamType::Binary => Value::binary(self.into_bytes()?, span),
667 ByteStreamType::String => Value::string(self.into_string()?, span),
668 ByteStreamType::Unknown => match String::from_utf8(self.into_bytes()?) {
670 Ok(mut str) => {
671 if trim {
672 trim_end_newline(&mut str);
673 }
674 Value::string(str, span)
675 }
676 Err(err) => Value::binary(err.into_bytes(), span),
677 },
678 };
679 Ok(value)
680 }
681
682 pub fn drain(self) -> Result<(), ShellError> {
684 match self.stream {
685 ByteStreamSource::Read(read) => {
686 copy_with_signals(read, io::sink(), self.span, &self.signals)?;
687 Ok(())
688 }
689 ByteStreamSource::File(_) => Ok(()),
690 #[cfg(feature = "os")]
691 ByteStreamSource::Child(child) => child.wait(),
692 }
693 }
694
695 pub fn print(self, to_stderr: bool) -> Result<(), ShellError> {
697 if to_stderr {
698 self.write_to(&mut io::stderr())
699 } else {
700 self.write_to(&mut io::stdout())
701 }
702 }
703
704 pub fn write_to(self, dest: impl Write) -> Result<(), ShellError> {
706 let span = self.span;
707 let signals = &self.signals;
708 match self.stream {
709 ByteStreamSource::Read(read) => {
710 copy_with_signals(read, dest, span, signals)?;
711 }
712 ByteStreamSource::File(file) => {
713 copy_with_signals(file, dest, span, signals)?;
714 }
715 #[cfg(feature = "os")]
716 ByteStreamSource::Child(mut child) => {
717 debug_assert!(child.stderr.is_none(), "stderr should not exist");
721
722 if let Some(stdout) = child.stdout.take() {
723 match stdout {
724 ChildPipe::Pipe(pipe) => {
725 copy_with_signals(pipe, dest, span, signals)?;
726 }
727 ChildPipe::Tee(tee) => {
728 copy_with_signals(tee, dest, span, signals)?;
729 }
730 }
731 }
732 child.wait()?;
733 }
734 }
735 Ok(())
736 }
737}
738
739impl From<ByteStream> for PipelineData {
740 fn from(stream: ByteStream) -> Self {
741 Self::byte_stream(stream, None)
742 }
743}
744
/// Adapts an iterator of byte chunks into a [`Read`] implementation.
///
/// `cursor` holds the chunk currently being read and becomes `None` once the iterator is
/// exhausted.
struct ReadIterator<I>
where
    I: Iterator,
    I::Item: AsRef<[u8]>,
{
    iter: I,
    cursor: Option<Cursor<I::Item>>,
}

impl<I> Read for ReadIterator<I>
where
    I: Iterator,
    I::Item: AsRef<[u8]>,
{
    /// Reads from the current chunk, moving on to the next one whenever it is exhausted.
    ///
    /// Returns `Ok(0)` once every chunk has been read, or when `buf` is empty.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // A zero-length read always reports 0 bytes, which the loop below would mistake for
        // an exhausted chunk and silently discard all remaining data.
        if buf.is_empty() {
            return Ok(0);
        }
        while let Some(cursor) = self.cursor.as_mut() {
            let read = cursor.read(buf)?;
            if read == 0 {
                self.cursor = self.iter.next().map(Cursor::new);
            } else {
                return Ok(read);
            }
        }
        Ok(0)
    }
}
771
772struct ReadResultIterator<I, T>
773where
774 I: Iterator<Item = Result<T, ShellError>>,
775 T: AsRef<[u8]>,
776{
777 iter: I,
778 cursor: Option<Cursor<T>>,
779}
780
781impl<I, T> Read for ReadResultIterator<I, T>
782where
783 I: Iterator<Item = Result<T, ShellError>>,
784 T: AsRef<[u8]>,
785{
786 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
787 while let Some(cursor) = self.cursor.as_mut() {
788 let read = cursor.read(buf)?;
789 if read == 0 {
790 self.cursor = self
791 .iter
792 .next()
793 .transpose()
794 .map_err(ShellErrorBridge)?
795 .map(Cursor::new);
796 } else {
797 return Ok(read);
798 }
799 }
800 Ok(0)
801 }
802}
803
/// A buffered reader over a [`ByteStream`], created by [`ByteStream::reader`].
pub struct Reader {
    reader: BufReader<SourceReader>,
    span: Span,
    signals: Signals,
}

impl Reader {
    /// Returns the [`Span`] of the stream this reader was created from.
    pub fn span(&self) -> Span {
        self.span
    }
}

impl Read for Reader {
    /// Checks the signals before delegating to the buffered reader.
    ///
    /// A failed signal check is returned as an `io::Error` wrapping a [`ShellErrorBridge`].
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.signals.check(&self.span).map_err(ShellErrorBridge)?;
        self.reader.read(buf)
    }
}

impl BufRead for Reader {
    /// Delegates to the buffered reader; the signals are not checked here.
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        self.reader.fill_buf()
    }

    /// Delegates to the buffered reader.
    fn consume(&mut self, amt: usize) {
        self.reader.consume(amt)
    }
}
832
833pub struct Lines {
834 reader: BufReader<SourceReader>,
835 span: Span,
836 signals: Signals,
837}
838
839impl Lines {
840 pub fn span(&self) -> Span {
841 self.span
842 }
843}
844
845impl Iterator for Lines {
846 type Item = Result<String, ShellError>;
847
848 fn next(&mut self) -> Option<Self::Item> {
849 if self.signals.interrupted() {
850 None
851 } else {
852 let mut buf = Vec::new();
853 match self.reader.read_until(b'\n', &mut buf) {
854 Ok(0) => None,
855 Ok(_) => {
856 let Ok(mut string) = String::from_utf8(buf) else {
857 return Some(Err(ShellError::NonUtf8 { span: self.span }));
858 };
859 trim_end_newline(&mut string);
860 Some(Ok(string))
861 }
862 Err(err) => Some(Err(IoError::new(err, self.span, None).into())),
863 }
864 }
865 }
866}
867
868pub struct SplitRead {
869 internal: SplitReadInner<BufReader<SourceReader>>,
870 span: Span,
871 signals: Signals,
872}
873
874impl SplitRead {
875 fn new(
876 reader: SourceReader,
877 delimiter: impl AsRef<[u8]>,
878 span: Span,
879 signals: Signals,
880 ) -> Self {
881 Self {
882 internal: SplitReadInner::new(BufReader::new(reader), delimiter),
883 span,
884 signals,
885 }
886 }
887
888 pub fn span(&self) -> Span {
889 self.span
890 }
891}
892
893impl Iterator for SplitRead {
894 type Item = Result<Vec<u8>, ShellError>;
895
896 fn next(&mut self) -> Option<Self::Item> {
897 if self.signals.interrupted() {
898 return None;
899 }
900 self.internal.next().map(|r| {
901 r.map_err(|err| {
902 ShellError::Io(IoError::new_internal(
903 err,
904 "Could not get next value for SplitRead",
905 crate::location!(),
906 ))
907 })
908 })
909 }
910}
911
/// An iterator over chunks of a [`ByteStream`], created by [`ByteStream::chunks`].
///
/// Each item is a string or binary [`Value`], depending on the stream type.
pub struct Chunks {
    reader: BufReader<SourceReader>,
    /// Number of bytes yielded so far; used in UTF-8 error messages.
    pos: u64,
    /// Set once an error has been yielded; the iterator then stops.
    error: bool,
    span: Span,
    signals: Signals,
    /// Current stream type; `Unknown` switches to `Binary` after a failed UTF-8 decode.
    type_: ByteStreamType,
}
925
926impl Chunks {
927 fn new(reader: SourceReader, span: Span, signals: Signals, type_: ByteStreamType) -> Self {
928 Self {
929 reader: BufReader::new(reader),
930 pos: 0,
931 error: false,
932 span,
933 signals,
934 type_,
935 }
936 }
937
938 pub fn span(&self) -> Span {
939 self.span
940 }
941
942 fn next_string(&mut self) -> Result<Option<String>, (Vec<u8>, ShellError)> {
943 let from_io_error = |err: std::io::Error| match ShellErrorBridge::try_from(err) {
944 Ok(err) => err.0,
945 Err(err) => IoError::new(err, self.span, None).into(),
946 };
947
948 let buf = self
950 .reader
951 .fill_buf()
952 .map_err(from_io_error)
953 .map_err(|err| (vec![], err))?;
954
955 if buf.is_empty() {
957 return Ok(None);
958 }
959
960 let mut buf = buf.to_vec();
961 let mut consumed = 0;
962
963 if buf.len() < 4 {
965 consumed += buf.len();
966 self.reader.consume(buf.len());
967 match self.reader.fill_buf() {
968 Ok(more_bytes) => buf.extend_from_slice(more_bytes),
969 Err(err) => return Err((buf, from_io_error(err))),
970 }
971 }
972
973 match String::from_utf8(buf) {
975 Ok(string) => {
976 self.reader.consume(string.len() - consumed);
977 self.pos += string.len() as u64;
978 Ok(Some(string))
979 }
980 Err(err) if err.utf8_error().error_len().is_none() => {
981 let valid_up_to = err.utf8_error().valid_up_to();
984 if valid_up_to > consumed {
985 self.reader.consume(valid_up_to - consumed);
986 }
987 let mut buf = err.into_bytes();
988 buf.truncate(valid_up_to);
989 buf.shrink_to_fit();
990 let string = String::from_utf8(buf)
991 .expect("failed to parse utf-8 even after correcting error");
992 self.pos += string.len() as u64;
993 Ok(Some(string))
994 }
995 Err(err) => {
996 let shell_error = ShellError::NonUtf8Custom {
998 msg: format!("invalid utf-8 sequence starting at index {}", self.pos),
999 span: self.span,
1000 };
1001 let buf = err.into_bytes();
1002 if buf.len() > consumed {
1005 self.reader.consume(buf.len() - consumed);
1006 }
1007 self.pos += buf.len() as u64;
1008 Err((buf, shell_error))
1009 }
1010 }
1011 }
1012}
1013
1014impl Iterator for Chunks {
1015 type Item = Result<Value, ShellError>;
1016
1017 fn next(&mut self) -> Option<Self::Item> {
1018 if self.error || self.signals.interrupted() {
1019 None
1020 } else {
1021 match self.type_ {
1022 ByteStreamType::Binary => {
1024 let buf = match self.reader.fill_buf() {
1025 Ok(buf) => buf,
1026 Err(err) => {
1027 self.error = true;
1028 return Some(Err(ShellError::Io(IoError::new(err, self.span, None))));
1029 }
1030 };
1031 if !buf.is_empty() {
1032 let len = buf.len();
1033 let value = Value::binary(buf, self.span);
1034 self.reader.consume(len);
1035 self.pos += len as u64;
1036 Some(Ok(value))
1037 } else {
1038 None
1039 }
1040 }
1041 ByteStreamType::String => match self.next_string().transpose()? {
1043 Ok(string) => Some(Ok(Value::string(string, self.span))),
1044 Err((_, err)) => {
1045 self.error = true;
1046 Some(Err(err))
1047 }
1048 },
1049 ByteStreamType::Unknown => {
1052 match self.next_string().transpose()? {
1053 Ok(string) => Some(Ok(Value::string(string, self.span))),
1054 Err((buf, _)) if !buf.is_empty() => {
1055 self.type_ = ByteStreamType::Binary;
1057 Some(Ok(Value::binary(buf, self.span)))
1058 }
1059 Err((_, err)) => {
1060 self.error = true;
1061 Some(Err(err))
1062 }
1063 }
1064 }
1065 }
1066 }
1067 }
1068}
1069
/// Removes a single trailing `\n` or `\r\n` from `string`, if there is one.
///
/// A trailing `\r` that is not followed by `\n` is left in place.
fn trim_end_newline(string: &mut String) {
    let Some(without_lf) = string.strip_suffix('\n') else {
        return;
    };
    let trimmed_len = without_lf
        .strip_suffix('\r')
        .unwrap_or(without_lf)
        .len();
    string.truncate(trimmed_len);
}
1078
/// Converts between file-like types by going through an [`OwnedFd`].
#[cfg(unix)]
pub(crate) fn convert_file<T: From<OwnedFd>>(file: impl Into<OwnedFd>) -> T {
    let fd: OwnedFd = file.into();
    T::from(fd)
}

/// Converts between file-like types by going through an [`OwnedHandle`].
#[cfg(windows)]
pub(crate) fn convert_file<T: From<OwnedHandle>>(file: impl Into<OwnedHandle>) -> T {
    let handle: OwnedHandle = file.into();
    T::from(handle)
}
1088
/// Size in bytes of the copy buffer used by `generic_copy`.
const DEFAULT_BUF_SIZE: usize = 8192;
1090
1091pub fn copy_with_signals(
1092 mut reader: impl Read,
1093 mut writer: impl Write,
1094 span: Span,
1095 signals: &Signals,
1096) -> Result<u64, ShellError> {
1097 let from_io_error = IoError::factory(span, None);
1098 if signals.is_empty() {
1099 match io::copy(&mut reader, &mut writer) {
1100 Ok(n) => {
1101 writer.flush().map_err(&from_io_error)?;
1102 Ok(n)
1103 }
1104 Err(err) => {
1105 let _ = writer.flush();
1106 match ShellErrorBridge::try_from(err) {
1107 Ok(ShellErrorBridge(shell_error)) => Err(shell_error),
1108 Err(err) => Err(from_io_error(err).into()),
1109 }
1110 }
1111 }
1112 } else {
1113 match generic_copy(&mut reader, &mut writer, span, signals) {
1118 Ok(len) => {
1119 writer.flush().map_err(&from_io_error)?;
1120 Ok(len)
1121 }
1122 Err(err) => {
1123 let _ = writer.flush();
1124 Err(err)
1125 }
1126 }
1127 }
1128}
1129
1130fn generic_copy(
1132 mut reader: impl Read,
1133 mut writer: impl Write,
1134 span: Span,
1135 signals: &Signals,
1136) -> Result<u64, ShellError> {
1137 let from_io_error = IoError::factory(span, None);
1138 let buf = &mut [0; DEFAULT_BUF_SIZE];
1139 let mut len = 0;
1140 loop {
1141 signals.check(&span)?;
1142 let n = match reader.read(buf) {
1143 Ok(0) => break,
1144 Ok(n) => n,
1145 Err(e) if e.kind() == ErrorKind::Interrupted => continue,
1146 Err(e) => match ShellErrorBridge::try_from(e) {
1147 Ok(ShellErrorBridge(e)) => return Err(e),
1148 Err(e) => return Err(from_io_error(e).into()),
1149 },
1150 };
1151 len += n;
1152 writer.write_all(&buf[..n]).map_err(&from_io_error)?;
1153 }
1154 Ok(len as u64)
1155}
1156
1157struct ReadGenerator<F>
1158where
1159 F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
1160{
1161 buffer: Cursor<Vec<u8>>,
1162 generator: F,
1163}
1164
1165impl<F> BufRead for ReadGenerator<F>
1166where
1167 F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
1168{
1169 fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
1170 while self.buffer.fill_buf()?.is_empty() {
1173 self.buffer.set_position(0);
1175 self.buffer.get_mut().clear();
1176 if !(self.generator)(self.buffer.get_mut()).map_err(ShellErrorBridge)? {
1178 break;
1180 }
1181 }
1182 self.buffer.fill_buf()
1183 }
1184
1185 fn consume(&mut self, amt: usize) {
1186 self.buffer.consume(amt);
1187 }
1188}
1189
1190impl<F> Read for ReadGenerator<F>
1191where
1192 F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
1193{
1194 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
1195 let slice = self.fill_buf()?;
1197 let len = buf.len().min(slice.len());
1198 buf[..len].copy_from_slice(&slice[..len]);
1199 self.consume(len);
1200 Ok(len)
1201 }
1202}
1203
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a [`Chunks`] iterator whose reader yields each element of `data` as one read.
    fn test_chunks<T>(data: Vec<T>, type_: ByteStreamType) -> Chunks
    where
        T: AsRef<[u8]> + Default + Send + 'static,
    {
        let reader = ReadIterator {
            iter: data.into_iter(),
            cursor: Some(Cursor::new(T::default())),
        };
        Chunks::new(
            SourceReader::Read(Box::new(reader)),
            Span::test_data(),
            Signals::empty(),
            type_,
        )
    }

    /// Binary streams yield each read unchanged as a binary value.
    #[test]
    fn chunks_read_binary_passthrough() {
        let bins = vec![&[0, 1][..], &[2, 3][..]];
        let iter = test_chunks(bins.clone(), ByteStreamType::Binary);

        let bins_values: Vec<Value> = bins
            .into_iter()
            .map(|bin| Value::binary(bin, Span::test_data()))
            .collect();
        assert_eq!(
            bins_values,
            iter.collect::<Result<Vec<Value>, _>>().expect("error")
        );
    }

    /// String streams with clean chunk boundaries yield one string per read.
    #[test]
    fn chunks_read_string_clean() {
        let strs = vec!["Nushell", "が好きです"];
        let iter = test_chunks(strs.clone(), ByteStreamType::String);

        let strs_values: Vec<Value> = strs
            .into_iter()
            .map(|string| Value::string(string, Span::test_data()))
            .collect();
        assert_eq!(
            strs_values,
            iter.collect::<Result<Vec<Value>, _>>().expect("error")
        );
    }

    /// Multi-byte characters split across reads are reassembled.
    #[test]
    fn chunks_read_string_split_boundary() {
        let real = "Nushell最高!";
        let chunks = vec![&b"Nushell\xe6"[..], &b"\x9c\x80\xe9"[..], &b"\xab\x98!"[..]];
        let iter = test_chunks(chunks.clone(), ByteStreamType::String);

        let mut string = String::new();
        for value in iter {
            let chunk_string = value.expect("error").into_string().expect("not a string");
            string.push_str(&chunk_string);
        }
        assert_eq!(real, string);
    }

    /// A string stream that ends in a truncated character yields an error after some text.
    #[test]
    fn chunks_read_string_utf8_error() {
        let chunks = vec![&b"Nushell\xe6"[..], &b"\x9c\x80\xe9"[..], &b"\xab"[..]];
        let iter = test_chunks(chunks, ByteStreamType::String);

        let mut string = String::new();
        for value in iter {
            match value {
                Ok(value) => string.push_str(&value.into_string().expect("not a string")),
                Err(err) => {
                    println!("string so far: {string:?}");
                    println!("got error: {err:?}");
                    assert!(!string.is_empty());
                    assert!(matches!(err, ShellError::NonUtf8Custom { .. }));
                    return;
                }
            }
        }
        panic!("no error");
    }

    /// Unknown streams switch to binary at the first chunk that is not valid UTF-8.
    #[test]
    fn chunks_read_unknown_fallback() {
        let chunks = vec![&b"Nushell"[..], &b"\x9c\x80\xe9abcd"[..], &b"efgh"[..]];
        let mut iter = test_chunks(chunks, ByteStreamType::Unknown);

        let mut get = || iter.next().expect("end of iter").expect("error");

        assert_eq!(Value::test_string("Nushell"), get());
        assert_eq!(Value::test_binary(b"\x9c\x80\xe9abcd"), get());
        // The stream stays binary even though this chunk is valid UTF-8.
        assert_eq!(Value::test_binary(b"efgh"), get());
    }
}