1#[cfg(feature = "os")]
6use crate::process::{ChildPipe, ChildProcess};
7use crate::{
8 IntRange, PipelineData, ShellError, Signals, Span, Type, Value,
9 shell_error::{bridge::ShellErrorBridge, io::IoError},
10};
11use nu_utils::SplitRead as SplitReadInner;
12use serde::{Deserialize, Serialize};
13use std::ops::Bound;
14#[cfg(unix)]
15use std::os::fd::OwnedFd;
16#[cfg(windows)]
17use std::os::windows::io::OwnedHandle;
18use std::{
19 fmt::Debug,
20 fs::File,
21 io::{self, BufRead, BufReader, Cursor, ErrorKind, Read, Write},
22 process::Stdio,
23};
24
25pub enum ByteStreamSource {
32 Read(Box<dyn Read + Send + 'static>),
33 File(File),
34 #[cfg(feature = "os")]
35 Child(Box<ChildProcess>),
36}
37
38impl ByteStreamSource {
39 fn reader(self) -> Option<SourceReader> {
40 match self {
41 ByteStreamSource::Read(read) => Some(SourceReader::Read(read)),
42 ByteStreamSource::File(file) => Some(SourceReader::File(file)),
43 #[cfg(feature = "os")]
44 ByteStreamSource::Child(mut child) => child.stdout.take().map(|stdout| match stdout {
45 ChildPipe::Pipe(pipe) => SourceReader::File(convert_file(pipe)),
46 ChildPipe::Tee(tee) => SourceReader::Read(tee),
47 }),
48 }
49 }
50
51 #[cfg(feature = "os")]
53 pub fn is_external(&self) -> bool {
54 matches!(self, ByteStreamSource::Child(..))
55 }
56
57 #[cfg(not(feature = "os"))]
58 pub fn is_external(&self) -> bool {
59 false
61 }
62}
63
64impl Debug for ByteStreamSource {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 match self {
67 ByteStreamSource::Read(_) => f.debug_tuple("Read").field(&"..").finish(),
68 ByteStreamSource::File(file) => f.debug_tuple("File").field(file).finish(),
69 #[cfg(feature = "os")]
70 ByteStreamSource::Child(child) => f.debug_tuple("Child").field(child).finish(),
71 }
72 }
73}
74
/// A readable handle obtained from a [`ByteStreamSource`].
enum SourceReader {
    /// An arbitrary boxed reader.
    Read(Box<dyn Read + Send + 'static>),
    /// An open file (also used for a child process's stdout pipe).
    File(File),
}

impl Read for SourceReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let inner: &mut dyn Read = match self {
            SourceReader::Read(reader) => reader,
            SourceReader::File(file) => file,
        };
        inner.read(buf)
    }
}

impl Debug for SourceReader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Boxed readers have no useful debug output, so they print as a placeholder.
        let (variant, payload) = match self {
            SourceReader::Read(_) => ("Read", &".." as &dyn Debug),
            SourceReader::File(file) => ("File", file as &dyn Debug),
        };
        f.debug_tuple(variant).field(payload).finish()
    }
}
97
/// A hint describing how the bytes of a [`ByteStream`] should be interpreted.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum ByteStreamType {
    /// The bytes should be treated as binary data.
    Binary,
    /// The bytes are expected to be UTF-8 text; converting them to a string reports an
    /// error if they are not valid UTF-8.
    String,
    /// The type is not known. Conversions try UTF-8 text first and fall back to binary
    /// when the bytes are not valid UTF-8. This is the default.
    #[default]
    Unknown,
}
115
116impl ByteStreamType {
117 pub fn describe(self) -> &'static str {
120 match self {
121 ByteStreamType::Binary => "binary (stream)",
122 ByteStreamType::String => "string (stream)",
123 ByteStreamType::Unknown => "byte stream",
124 }
125 }
126
127 pub fn is_binary_coercible(self) -> bool {
129 matches!(self, ByteStreamType::Binary | ByteStreamType::Unknown)
130 }
131
132 pub fn is_string_coercible(self) -> bool {
134 matches!(self, ByteStreamType::String | ByteStreamType::Unknown)
135 }
136}
137
138impl From<ByteStreamType> for Type {
139 fn from(value: ByteStreamType) -> Self {
140 match value {
141 ByteStreamType::Binary => Type::Binary,
142 ByteStreamType::String => Type::String,
143 ByteStreamType::Unknown => Type::Any,
144 }
145 }
146}
147
148#[derive(Debug)]
191pub struct ByteStream {
192 stream: ByteStreamSource,
193 span: Span,
194 signals: Signals,
195 type_: ByteStreamType,
196 known_size: Option<u64>,
197 caller_spans: Vec<Span>,
198}
199
200impl ByteStream {
201 pub fn new(
203 stream: ByteStreamSource,
204 span: Span,
205 signals: Signals,
206 type_: ByteStreamType,
207 ) -> Self {
208 Self {
209 stream,
210 span,
211 signals,
212 type_,
213 known_size: None,
214 caller_spans: vec![],
215 }
216 }
217
218 pub fn push_caller_span(&mut self, span: Span) {
220 if span != self.span {
221 self.caller_spans.push(span)
222 }
223 }
224
225 pub fn get_caller_spans(&self) -> &Vec<Span> {
227 &self.caller_spans
228 }
229
230 pub fn read(
232 reader: impl Read + Send + 'static,
233 span: Span,
234 signals: Signals,
235 type_: ByteStreamType,
236 ) -> Self {
237 Self::new(
238 ByteStreamSource::Read(Box::new(reader)),
239 span,
240 signals,
241 type_,
242 )
243 }
244
245 pub fn skip(self, span: Span, n: u64) -> Result<Self, ShellError> {
246 let known_size = self.known_size.map(|len| len.saturating_sub(n));
247 if let Some(mut reader) = self.reader() {
248 io::copy(&mut (&mut reader).take(n), &mut io::sink())
250 .map_err(|err| IoError::new(err, span, None))?;
251 Ok(
252 ByteStream::read(reader, span, Signals::empty(), ByteStreamType::Binary)
253 .with_known_size(known_size),
254 )
255 } else {
256 Err(ShellError::TypeMismatch {
257 err_message: "expected readable stream".into(),
258 span,
259 })
260 }
261 }
262
263 pub fn take(self, span: Span, n: u64) -> Result<Self, ShellError> {
264 let known_size = self.known_size.map(|s| s.min(n));
265 if let Some(reader) = self.reader() {
266 Ok(ByteStream::read(
267 reader.take(n),
268 span,
269 Signals::empty(),
270 ByteStreamType::Binary,
271 )
272 .with_known_size(known_size))
273 } else {
274 Err(ShellError::TypeMismatch {
275 err_message: "expected readable stream".into(),
276 span,
277 })
278 }
279 }
280
281 pub fn slice(
282 self,
283 val_span: Span,
284 call_span: Span,
285 range: IntRange,
286 ) -> Result<Self, ShellError> {
287 if let Some(len) = self.known_size {
288 let start = range.absolute_start(len);
289 let stream = self.skip(val_span, start);
290
291 match range.absolute_end(len) {
292 Bound::Unbounded => stream,
293 Bound::Included(end) | Bound::Excluded(end) if end < start => {
294 stream.and_then(|s| s.take(val_span, 0))
295 }
296 Bound::Included(end) => {
297 let distance = end - start + 1;
298 stream.and_then(|s| s.take(val_span, distance.min(len)))
299 }
300 Bound::Excluded(end) => {
301 let distance = end - start;
302 stream.and_then(|s| s.take(val_span, distance.min(len)))
303 }
304 }
305 } else if range.is_relative() {
306 Err(ShellError::RelativeRangeOnInfiniteStream { span: call_span })
307 } else {
308 let start = range.start() as u64;
309 let stream = self.skip(val_span, start);
310
311 match range.distance() {
312 Bound::Unbounded => stream,
313 Bound::Included(distance) => stream.and_then(|s| s.take(val_span, distance + 1)),
314 Bound::Excluded(distance) => stream.and_then(|s| s.take(val_span, distance)),
315 }
316 }
317 }
318
319 pub fn read_string(string: String, span: Span, signals: Signals) -> Self {
321 let len = string.len();
322 ByteStream::read(
323 Cursor::new(string.into_bytes()),
324 span,
325 signals,
326 ByteStreamType::String,
327 )
328 .with_known_size(Some(len as u64))
329 }
330
331 pub fn read_binary(bytes: Vec<u8>, span: Span, signals: Signals) -> Self {
333 let len = bytes.len();
334 ByteStream::read(Cursor::new(bytes), span, signals, ByteStreamType::Binary)
335 .with_known_size(Some(len as u64))
336 }
337
338 pub fn file(file: File, span: Span, signals: Signals) -> Self {
343 Self::new(
344 ByteStreamSource::File(file),
345 span,
346 signals,
347 ByteStreamType::Unknown,
348 )
349 }
350
351 #[cfg(feature = "os")]
356 pub fn child(child: ChildProcess, span: Span) -> Self {
357 Self::new(
358 ByteStreamSource::Child(Box::new(child)),
359 span,
360 Signals::empty(),
361 ByteStreamType::Unknown,
362 )
363 }
364
365 #[cfg(feature = "os")]
370 pub fn stdin(span: Span) -> Result<Self, ShellError> {
371 let stdin = os_pipe::dup_stdin().map_err(|err| IoError::new(err, span, None))?;
372 let source = ByteStreamSource::File(convert_file(stdin));
373 Ok(Self::new(
374 source,
375 span,
376 Signals::empty(),
377 ByteStreamType::Unknown,
378 ))
379 }
380
381 #[cfg(not(feature = "os"))]
382 pub fn stdin(span: Span) -> Result<Self, ShellError> {
383 Err(ShellError::DisabledOsSupport {
384 msg: "Stdin is not supported".to_string(),
385 span,
386 })
387 }
388
389 pub fn from_fn(
392 span: Span,
393 signals: Signals,
394 type_: ByteStreamType,
395 generator: impl FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
396 ) -> Self {
397 Self::read(
398 ReadGenerator {
399 buffer: Cursor::new(Vec::new()),
400 generator,
401 },
402 span,
403 signals,
404 type_,
405 )
406 }
407
408 pub fn with_type(mut self, type_: ByteStreamType) -> Self {
409 self.type_ = type_;
410 self
411 }
412
413 pub fn from_iter<I>(iter: I, span: Span, signals: Signals, type_: ByteStreamType) -> Self
417 where
418 I: IntoIterator,
419 I::IntoIter: Send + 'static,
420 I::Item: AsRef<[u8]> + Default + Send + 'static,
421 {
422 let iter = iter.into_iter();
423 let cursor = Some(Cursor::new(I::Item::default()));
424 Self::read(ReadIterator { iter, cursor }, span, signals, type_)
425 }
426
427 pub fn from_result_iter<I, T>(
431 iter: I,
432 span: Span,
433 signals: Signals,
434 type_: ByteStreamType,
435 ) -> Self
436 where
437 I: IntoIterator<Item = Result<T, ShellError>>,
438 I::IntoIter: Send + 'static,
439 T: AsRef<[u8]> + Default + Send + 'static,
440 {
441 let iter = iter.into_iter();
442 let cursor = Some(Cursor::new(T::default()));
443 Self::read(ReadResultIterator { iter, cursor }, span, signals, type_)
444 }
445
446 pub fn with_known_size(mut self, size: Option<u64>) -> Self {
448 self.known_size = size;
449 self
450 }
451
452 pub fn source(&self) -> &ByteStreamSource {
454 &self.stream
455 }
456
457 pub fn source_mut(&mut self) -> &mut ByteStreamSource {
459 &mut self.stream
460 }
461
462 pub fn span(&self) -> Span {
464 self.span
465 }
466
467 pub fn with_span(mut self, span: Span) -> Self {
469 self.span = span;
470 self
471 }
472
473 pub fn type_(&self) -> ByteStreamType {
475 self.type_
476 }
477
478 pub fn known_size(&self) -> Option<u64> {
480 self.known_size
481 }
482
483 pub fn reader(self) -> Option<Reader> {
490 let reader = self.stream.reader()?;
491 Some(Reader {
492 reader: BufReader::new(reader),
493 span: self.span,
494 signals: self.signals,
495 })
496 }
497
498 pub fn lines(self) -> Option<Lines> {
506 let reader = self.stream.reader()?;
507 Some(Lines {
508 reader: BufReader::new(reader),
509 span: self.span,
510 signals: self.signals,
511 })
512 }
513
514 pub fn split(self, delimiter: Vec<u8>) -> Option<SplitRead> {
522 let reader = self.stream.reader()?;
523 Some(SplitRead::new(reader, delimiter, self.span, self.signals))
524 }
525
526 pub fn chunks(self) -> Option<Chunks> {
542 let reader = self.stream.reader()?;
543 Some(Chunks::new(reader, self.span, self.signals, self.type_))
544 }
545
546 pub fn into_source(self) -> ByteStreamSource {
548 self.stream
549 }
550
551 pub fn into_stdio(mut self) -> Result<Stdio, Self> {
559 match self.stream {
560 ByteStreamSource::Read(..) => Err(self),
561 ByteStreamSource::File(file) => Ok(file.into()),
562 #[cfg(feature = "os")]
563 ByteStreamSource::Child(child) => {
564 if let ChildProcess {
565 stdout: Some(ChildPipe::Pipe(stdout)),
566 stderr,
567 ..
568 } = *child
569 {
570 debug_assert!(stderr.is_none(), "stderr should not exist");
571 Ok(stdout.into())
572 } else {
573 self.stream = ByteStreamSource::Child(child);
574 Err(self)
575 }
576 }
577 }
578 }
579
580 #[cfg(feature = "os")]
585 pub fn into_child(self) -> Result<ChildProcess, Self> {
586 if let ByteStreamSource::Child(child) = self.stream {
587 Ok(*child)
588 } else {
589 Err(self)
590 }
591 }
592
593 pub fn into_bytes(self) -> Result<Vec<u8>, ShellError> {
597 let from_io_error = IoError::factory(self.span, None);
599 match self.stream {
600 ByteStreamSource::Read(mut read) => {
601 let mut buf = Vec::new();
602 read.read_to_end(&mut buf).map_err(|err| {
603 match ShellErrorBridge::try_from(err) {
604 Ok(ShellErrorBridge(err)) => err,
605 Err(err) => ShellError::Io(from_io_error(err)),
606 }
607 })?;
608 Ok(buf)
609 }
610 ByteStreamSource::File(mut file) => {
611 let mut buf = Vec::new();
612 file.read_to_end(&mut buf).map_err(&from_io_error)?;
613 Ok(buf)
614 }
615 #[cfg(feature = "os")]
616 ByteStreamSource::Child(child) => child.into_bytes(),
617 }
618 }
619
620 pub fn into_string(self) -> Result<String, ShellError> {
629 let span = self.span;
630 if self.type_.is_string_coercible() {
631 let trim = self.stream.is_external();
632 let bytes = self.into_bytes()?;
633 let mut string = String::from_utf8(bytes).map_err(|err| ShellError::NonUtf8Custom {
634 span,
635 msg: err.to_string(),
636 })?;
637 if trim {
638 trim_end_newline(&mut string);
639 }
640 Ok(string)
641 } else {
642 Err(ShellError::TypeMismatch {
643 err_message: "expected string, but got binary".into(),
644 span,
645 })
646 }
647 }
648
649 pub fn into_value(self) -> Result<Value, ShellError> {
662 let span = self.span;
663 let trim = self.stream.is_external();
664 let value = match self.type_ {
665 ByteStreamType::Binary => Value::binary(self.into_bytes()?, span),
667 ByteStreamType::String => Value::string(self.into_string()?, span),
668 ByteStreamType::Unknown => match String::from_utf8(self.into_bytes()?) {
670 Ok(mut str) => {
671 if trim {
672 trim_end_newline(&mut str);
673 }
674 Value::string(str, span)
675 }
676 Err(err) => Value::binary(err.into_bytes(), span),
677 },
678 };
679 Ok(value)
680 }
681
682 pub fn drain(self) -> Result<(), ShellError> {
684 match self.stream {
685 ByteStreamSource::Read(read) => {
686 copy_with_signals(read, io::sink(), self.span, &self.signals)?;
687 Ok(())
688 }
689 ByteStreamSource::File(_) => Ok(()),
690 #[cfg(feature = "os")]
691 ByteStreamSource::Child(child) => child.wait(),
692 }
693 }
694
695 pub fn print(self, to_stderr: bool) -> Result<(), ShellError> {
697 if to_stderr {
698 self.write_to(&mut io::stderr())
699 } else {
700 self.write_to(&mut io::stdout())
701 }
702 }
703
704 pub fn write_to(self, dest: impl Write) -> Result<(), ShellError> {
706 let span = self.span;
707 let signals = &self.signals;
708 match self.stream {
709 ByteStreamSource::Read(read) => {
710 copy_with_signals(read, dest, span, signals)?;
711 }
712 ByteStreamSource::File(file) => {
713 copy_with_signals(file, dest, span, signals)?;
714 }
715 #[cfg(feature = "os")]
716 ByteStreamSource::Child(mut child) => {
717 debug_assert!(child.stderr.is_none(), "stderr should not exist");
721
722 if let Some(stdout) = child.stdout.take() {
723 match stdout {
724 ChildPipe::Pipe(pipe) => {
725 copy_with_signals(pipe, dest, span, signals)?;
726 }
727 ChildPipe::Tee(tee) => {
728 copy_with_signals(tee, dest, span, signals)?;
729 }
730 }
731 }
732 child.wait()?;
733 }
734 }
735 Ok(())
736 }
737}
738
739impl From<ByteStream> for PipelineData {
740 fn from(stream: ByteStream) -> Self {
741 Self::byte_stream(stream, None)
742 }
743}
744
/// Adapts an iterator of byte chunks into a [`Read`] implementation.
///
/// `cursor` holds the chunk currently being read. `None` means the iterator is exhausted.
struct ReadIterator<I>
where
    I: Iterator,
    I::Item: AsRef<[u8]>,
{
    iter: I,
    cursor: Option<Cursor<I::Item>>,
}

impl<I> Read for ReadIterator<I>
where
    I: Iterator,
    I::Item: AsRef<[u8]>,
{
    /// Reads from the current chunk, advancing to the next chunk whenever the current one
    /// is exhausted. Returns `Ok(0)` once the iterator has no more chunks.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // A zero-length read always returns 0 from the cursor. Without this guard the loop
        // would mistake that for an exhausted chunk and skip (lose) every remaining chunk.
        if buf.is_empty() {
            return Ok(0);
        }
        while let Some(cursor) = self.cursor.as_mut() {
            let read = cursor.read(buf)?;
            if read == 0 {
                // Current chunk is exhausted (or empty): move on to the next one.
                self.cursor = self.iter.next().map(Cursor::new);
            } else {
                return Ok(read);
            }
        }
        Ok(0)
    }
}
771
772struct ReadResultIterator<I, T>
773where
774 I: Iterator<Item = Result<T, ShellError>>,
775 T: AsRef<[u8]>,
776{
777 iter: I,
778 cursor: Option<Cursor<T>>,
779}
780
781impl<I, T> Read for ReadResultIterator<I, T>
782where
783 I: Iterator<Item = Result<T, ShellError>>,
784 T: AsRef<[u8]>,
785{
786 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
787 while let Some(cursor) = self.cursor.as_mut() {
788 let read = cursor.read(buf)?;
789 if read == 0 {
790 self.cursor = self
791 .iter
792 .next()
793 .transpose()
794 .map_err(ShellErrorBridge)?
795 .map(Cursor::new);
796 } else {
797 return Ok(read);
798 }
799 }
800 Ok(0)
801 }
802}
803
804pub struct Reader {
805 reader: BufReader<SourceReader>,
806 span: Span,
807 signals: Signals,
808}
809
810impl Reader {
811 pub fn span(&self) -> Span {
812 self.span
813 }
814}
815
816impl Read for Reader {
817 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
818 self.signals.check(&self.span).map_err(ShellErrorBridge)?;
819 self.reader.read(buf)
820 }
821}
822
823impl BufRead for Reader {
824 fn fill_buf(&mut self) -> io::Result<&[u8]> {
825 self.reader.fill_buf()
826 }
827
828 fn consume(&mut self, amt: usize) {
829 self.reader.consume(amt)
830 }
831}
832
833pub struct Lines {
834 reader: BufReader<SourceReader>,
835 span: Span,
836 signals: Signals,
837}
838
839impl Lines {
840 pub fn span(&self) -> Span {
841 self.span
842 }
843}
844
845impl Iterator for Lines {
846 type Item = Result<String, ShellError>;
847
848 fn next(&mut self) -> Option<Self::Item> {
849 if self.signals.interrupted() {
850 None
851 } else {
852 let mut buf = Vec::new();
853 match self.reader.read_until(b'\n', &mut buf) {
854 Ok(0) => None,
855 Ok(_) => {
856 let Ok(mut string) = String::from_utf8(buf) else {
857 return Some(Err(ShellError::NonUtf8 { span: self.span }));
858 };
859 trim_end_newline(&mut string);
860 Some(Ok(string))
861 }
862 Err(err) => Some(Err(IoError::new(err, self.span, None).into())),
863 }
864 }
865 }
866}
867
868pub struct SplitRead {
869 internal: SplitReadInner<BufReader<SourceReader>>,
870 span: Span,
871 signals: Signals,
872}
873
874impl SplitRead {
875 fn new(
876 reader: SourceReader,
877 delimiter: impl AsRef<[u8]>,
878 span: Span,
879 signals: Signals,
880 ) -> Self {
881 Self {
882 internal: SplitReadInner::new(BufReader::new(reader), delimiter),
883 span,
884 signals,
885 }
886 }
887
888 pub fn span(&self) -> Span {
889 self.span
890 }
891}
892
893impl Iterator for SplitRead {
894 type Item = Result<Vec<u8>, ShellError>;
895
896 fn next(&mut self) -> Option<Self::Item> {
897 if self.signals.interrupted() {
898 return None;
899 }
900 self.internal.next().map(|r| {
901 r.map_err(|err| {
902 ShellError::Io(IoError::new_internal(
903 err,
904 "Could not get next value for SplitRead",
905 ))
906 })
907 })
908 }
909}
910
911pub struct Chunks {
917 reader: BufReader<SourceReader>,
918 pos: u64,
919 error: bool,
920 span: Span,
921 signals: Signals,
922 type_: ByteStreamType,
923}
924
925impl Chunks {
926 fn new(reader: SourceReader, span: Span, signals: Signals, type_: ByteStreamType) -> Self {
927 Self {
928 reader: BufReader::new(reader),
929 pos: 0,
930 error: false,
931 span,
932 signals,
933 type_,
934 }
935 }
936
937 pub fn span(&self) -> Span {
938 self.span
939 }
940
941 fn next_string(&mut self) -> Result<Option<String>, (Vec<u8>, ShellError)> {
942 let from_io_error = |err: std::io::Error| match ShellErrorBridge::try_from(err) {
943 Ok(err) => err.0,
944 Err(err) => IoError::new(err, self.span, None).into(),
945 };
946
947 let buf = self
949 .reader
950 .fill_buf()
951 .map_err(from_io_error)
952 .map_err(|err| (vec![], err))?;
953
954 if buf.is_empty() {
956 return Ok(None);
957 }
958
959 let mut buf = buf.to_vec();
960 let mut consumed = 0;
961
962 if buf.len() < 4 {
964 consumed += buf.len();
965 self.reader.consume(buf.len());
966 match self.reader.fill_buf() {
967 Ok(more_bytes) => buf.extend_from_slice(more_bytes),
968 Err(err) => return Err((buf, from_io_error(err))),
969 }
970 }
971
972 match String::from_utf8(buf) {
974 Ok(string) => {
975 self.reader.consume(string.len() - consumed);
976 self.pos += string.len() as u64;
977 Ok(Some(string))
978 }
979 Err(err) if err.utf8_error().error_len().is_none() => {
980 let valid_up_to = err.utf8_error().valid_up_to();
983 if valid_up_to > consumed {
984 self.reader.consume(valid_up_to - consumed);
985 }
986 let mut buf = err.into_bytes();
987 buf.truncate(valid_up_to);
988 buf.shrink_to_fit();
989 let string = String::from_utf8(buf)
990 .expect("failed to parse utf-8 even after correcting error");
991 self.pos += string.len() as u64;
992 Ok(Some(string))
993 }
994 Err(err) => {
995 let shell_error = ShellError::NonUtf8Custom {
997 msg: format!("invalid utf-8 sequence starting at index {}", self.pos),
998 span: self.span,
999 };
1000 let buf = err.into_bytes();
1001 if buf.len() > consumed {
1004 self.reader.consume(buf.len() - consumed);
1005 }
1006 self.pos += buf.len() as u64;
1007 Err((buf, shell_error))
1008 }
1009 }
1010 }
1011}
1012
1013impl Iterator for Chunks {
1014 type Item = Result<Value, ShellError>;
1015
1016 fn next(&mut self) -> Option<Self::Item> {
1017 if self.error || self.signals.interrupted() {
1018 None
1019 } else {
1020 match self.type_ {
1021 ByteStreamType::Binary => {
1023 let buf = match self.reader.fill_buf() {
1024 Ok(buf) => buf,
1025 Err(err) => {
1026 self.error = true;
1027 return Some(Err(ShellError::Io(IoError::new(err, self.span, None))));
1028 }
1029 };
1030 if !buf.is_empty() {
1031 let len = buf.len();
1032 let value = Value::binary(buf, self.span);
1033 self.reader.consume(len);
1034 self.pos += len as u64;
1035 Some(Ok(value))
1036 } else {
1037 None
1038 }
1039 }
1040 ByteStreamType::String => match self.next_string().transpose()? {
1042 Ok(string) => Some(Ok(Value::string(string, self.span))),
1043 Err((_, err)) => {
1044 self.error = true;
1045 Some(Err(err))
1046 }
1047 },
1048 ByteStreamType::Unknown => {
1051 match self.next_string().transpose()? {
1052 Ok(string) => Some(Ok(Value::string(string, self.span))),
1053 Err((buf, _)) if !buf.is_empty() => {
1054 self.type_ = ByteStreamType::Binary;
1056 Some(Ok(Value::binary(buf, self.span)))
1057 }
1058 Err((_, err)) => {
1059 self.error = true;
1060 Some(Err(err))
1061 }
1062 }
1063 }
1064 }
1065 }
1066 }
1067}
1068
/// Removes one trailing line ending (`\n` or `\r\n`) from `string`, if present.
///
/// A lone trailing `\r` without a following `\n` is left untouched.
fn trim_end_newline(string: &mut String) {
    if let Some(without_lf) = string.strip_suffix('\n') {
        let keep = without_lf.strip_suffix('\r').unwrap_or(without_lf).len();
        string.truncate(keep);
    }
}
1077
/// Converts anything that owns a file descriptor into another type built from an
/// [`OwnedFd`], transferring ownership of the descriptor.
#[cfg(unix)]
pub(crate) fn convert_file<T: From<OwnedFd>>(file: impl Into<OwnedFd>) -> T {
    let fd: OwnedFd = file.into();
    T::from(fd)
}

/// Converts anything that owns a handle into another type built from an [`OwnedHandle`],
/// transferring ownership of the handle.
#[cfg(windows)]
pub(crate) fn convert_file<T: From<OwnedHandle>>(file: impl Into<OwnedHandle>) -> T {
    let handle: OwnedHandle = file.into();
    T::from(handle)
}
1087
/// Size in bytes of the buffer `generic_copy` reads into on each iteration.
const DEFAULT_BUF_SIZE: usize = 8192;
1089
1090pub fn copy_with_signals(
1091 mut reader: impl Read,
1092 mut writer: impl Write,
1093 span: Span,
1094 signals: &Signals,
1095) -> Result<u64, ShellError> {
1096 let from_io_error = IoError::factory(span, None);
1097 if signals.is_empty() {
1098 match io::copy(&mut reader, &mut writer) {
1099 Ok(n) => {
1100 writer.flush().map_err(&from_io_error)?;
1101 Ok(n)
1102 }
1103 Err(err) => {
1104 let _ = writer.flush();
1105 match ShellErrorBridge::try_from(err) {
1106 Ok(ShellErrorBridge(shell_error)) => Err(shell_error),
1107 Err(err) => Err(from_io_error(err).into()),
1108 }
1109 }
1110 }
1111 } else {
1112 match generic_copy(&mut reader, &mut writer, span, signals) {
1117 Ok(len) => {
1118 writer.flush().map_err(&from_io_error)?;
1119 Ok(len)
1120 }
1121 Err(err) => {
1122 let _ = writer.flush();
1123 Err(err)
1124 }
1125 }
1126 }
1127}
1128
1129fn generic_copy(
1131 mut reader: impl Read,
1132 mut writer: impl Write,
1133 span: Span,
1134 signals: &Signals,
1135) -> Result<u64, ShellError> {
1136 let from_io_error = IoError::factory(span, None);
1137 let buf = &mut [0; DEFAULT_BUF_SIZE];
1138 let mut len = 0;
1139 loop {
1140 signals.check(&span)?;
1141 let n = match reader.read(buf) {
1142 Ok(0) => break,
1143 Ok(n) => n,
1144 Err(e) if e.kind() == ErrorKind::Interrupted => continue,
1145 Err(e) => match ShellErrorBridge::try_from(e) {
1146 Ok(ShellErrorBridge(e)) => return Err(e),
1147 Err(e) => return Err(from_io_error(e).into()),
1148 },
1149 };
1150 len += n;
1151 writer.write_all(&buf[..n]).map_err(&from_io_error)?;
1152 }
1153 Ok(len as u64)
1154}
1155
1156struct ReadGenerator<F>
1157where
1158 F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
1159{
1160 buffer: Cursor<Vec<u8>>,
1161 generator: F,
1162}
1163
1164impl<F> BufRead for ReadGenerator<F>
1165where
1166 F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
1167{
1168 fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
1169 while self.buffer.fill_buf()?.is_empty() {
1172 self.buffer.set_position(0);
1174 self.buffer.get_mut().clear();
1175 if !(self.generator)(self.buffer.get_mut()).map_err(ShellErrorBridge)? {
1177 break;
1179 }
1180 }
1181 self.buffer.fill_buf()
1182 }
1183
1184 fn consume(&mut self, amt: usize) {
1185 self.buffer.consume(amt);
1186 }
1187}
1188
1189impl<F> Read for ReadGenerator<F>
1190where
1191 F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
1192{
1193 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
1194 let slice = self.fill_buf()?;
1196 let len = buf.len().min(slice.len());
1197 buf[..len].copy_from_slice(&slice[..len]);
1198 self.consume(len);
1199 Ok(len)
1200 }
1201}
1202
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a [`Chunks`] iterator over `data`. Each read of the underlying source returns
    /// bytes from at most one element of `data`, which lets tests control read boundaries.
    fn test_chunks<T>(data: Vec<T>, type_: ByteStreamType) -> Chunks
    where
        T: AsRef<[u8]> + Default + Send + 'static,
    {
        let reader = ReadIterator {
            iter: data.into_iter(),
            cursor: Some(Cursor::new(T::default())),
        };
        Chunks::new(
            SourceReader::Read(Box::new(reader)),
            Span::test_data(),
            Signals::empty(),
            type_,
        )
    }

    #[test]
    fn chunks_read_binary_passthrough() {
        let bins = vec![&[0, 1][..], &[2, 3][..]];
        let iter = test_chunks(bins.clone(), ByteStreamType::Binary);

        let bins_values: Vec<Value> = bins
            .into_iter()
            .map(|bin| Value::binary(bin, Span::test_data()))
            .collect();
        assert_eq!(
            bins_values,
            iter.collect::<Result<Vec<Value>, _>>().expect("error")
        );
    }

    #[test]
    fn chunks_read_string_clean() {
        let strs = vec!["Nushell", "が好きです"];
        let iter = test_chunks(strs.clone(), ByteStreamType::String);

        let strs_values: Vec<Value> = strs
            .into_iter()
            .map(|string| Value::string(string, Span::test_data()))
            .collect();
        assert_eq!(
            strs_values,
            iter.collect::<Result<Vec<Value>, _>>().expect("error")
        );
    }

    /// Multi-byte characters split across reads must be reassembled.
    #[test]
    fn chunks_read_string_split_boundary() {
        let real = "Nushell最高!";
        let chunks = vec![&b"Nushell\xe6"[..], &b"\x9c\x80\xe9"[..], &b"\xab\x98!"[..]];
        let iter = test_chunks(chunks.clone(), ByteStreamType::String);

        let mut string = String::new();
        for value in iter {
            let chunk_string = value.expect("error").into_string().expect("not a string");
            string.push_str(&chunk_string);
        }
        assert_eq!(real, string);
    }

    /// A stream ending in an incomplete character must report a UTF-8 error after yielding
    /// the valid text before it.
    #[test]
    fn chunks_read_string_utf8_error() {
        let chunks = vec![&b"Nushell\xe6"[..], &b"\x9c\x80\xe9"[..], &b"\xab"[..]];
        let iter = test_chunks(chunks, ByteStreamType::String);

        let mut string = String::new();
        for value in iter {
            match value {
                Ok(value) => string.push_str(&value.into_string().expect("not a string")),
                Err(err) => {
                    println!("string so far: {string:?}");
                    println!("got error: {err:?}");
                    assert!(!string.is_empty());
                    assert!(matches!(err, ShellError::NonUtf8Custom { .. }));
                    return;
                }
            }
        }
        panic!("no error");
    }

    /// `Unknown` streams switch to binary at the first invalid UTF-8 and stay binary.
    #[test]
    fn chunks_read_unknown_fallback() {
        let chunks = vec![&b"Nushell"[..], &b"\x9c\x80\xe9abcd"[..], &b"efgh"[..]];
        let mut iter = test_chunks(chunks, ByteStreamType::Unknown);

        let mut get = || iter.next().expect("end of iter").expect("error");

        assert_eq!(Value::test_string("Nushell"), get());
        assert_eq!(Value::test_binary(b"\x9c\x80\xe9abcd"), get());
        assert_eq!(Value::test_binary(b"efgh"), get());
        // All input has been produced, so the iterator must now be exhausted.
        assert!(iter.next().is_none(), "expected end of stream");
    }
}