1#[cfg(feature = "os")]
6use crate::process::{ChildPipe, ChildProcess};
7use crate::{
8 IntRange, PipelineData, ShellError, Signals, Span, Type, Value,
9 shell_error::{bridge::ShellErrorBridge, io::IoError},
10};
11use serde::{Deserialize, Serialize};
12use std::ops::Bound;
13#[cfg(unix)]
14use std::os::fd::OwnedFd;
15#[cfg(windows)]
16use std::os::windows::io::OwnedHandle;
17use std::{
18 fmt::Debug,
19 fs::File,
20 io::{self, BufRead, BufReader, Cursor, ErrorKind, Read, Write},
21 process::Stdio,
22};
23
/// The underlying source of bytes for a [`ByteStream`].
pub enum ByteStreamSource {
    /// An arbitrary boxed reader.
    Read(Box<dyn Read + Send + 'static>),
    /// An open file.
    File(File),
    /// A child process. Only available when the `os` feature is enabled.
    #[cfg(feature = "os")]
    Child(Box<ChildProcess>),
}
36
37impl ByteStreamSource {
38 fn reader(self) -> Option<SourceReader> {
39 match self {
40 ByteStreamSource::Read(read) => Some(SourceReader::Read(read)),
41 ByteStreamSource::File(file) => Some(SourceReader::File(file)),
42 #[cfg(feature = "os")]
43 ByteStreamSource::Child(mut child) => child.stdout.take().map(|stdout| match stdout {
44 ChildPipe::Pipe(pipe) => SourceReader::File(convert_file(pipe)),
45 ChildPipe::Tee(tee) => SourceReader::Read(tee),
46 }),
47 }
48 }
49
50 #[cfg(feature = "os")]
52 pub fn is_external(&self) -> bool {
53 matches!(self, ByteStreamSource::Child(..))
54 }
55
56 #[cfg(not(feature = "os"))]
57 pub fn is_external(&self) -> bool {
58 false
60 }
61}
62
63impl Debug for ByteStreamSource {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 match self {
66 ByteStreamSource::Read(_) => f.debug_tuple("Read").field(&"..").finish(),
67 ByteStreamSource::File(file) => f.debug_tuple("File").field(file).finish(),
68 #[cfg(feature = "os")]
69 ByteStreamSource::Child(child) => f.debug_tuple("Child").field(child).finish(),
70 }
71 }
72}
73
/// The readable form of a [`ByteStreamSource`]: either a boxed reader or a file.
enum SourceReader {
    /// An arbitrary boxed reader.
    Read(Box<dyn Read + Send + 'static>),
    /// A file handle.
    File(File),
}
78
79impl Read for SourceReader {
80 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
81 match self {
82 SourceReader::Read(reader) => reader.read(buf),
83 SourceReader::File(file) => file.read(buf),
84 }
85 }
86}
87
88impl Debug for SourceReader {
89 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90 match self {
91 SourceReader::Read(_) => f.debug_tuple("Read").field(&"..").finish(),
92 SourceReader::File(file) => f.debug_tuple("File").field(file).finish(),
93 }
94 }
95}
96
/// The kind of content a [`ByteStream`] is declared to carry.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum ByteStreamType {
    /// Binary data.
    Binary,
    /// String data.
    String,
    /// Content type not known up front; this is the default.
    #[default]
    Unknown,
}
114
115impl ByteStreamType {
116 pub fn describe(self) -> &'static str {
119 match self {
120 ByteStreamType::Binary => "binary (stream)",
121 ByteStreamType::String => "string (stream)",
122 ByteStreamType::Unknown => "byte stream",
123 }
124 }
125
126 pub fn is_binary_coercible(self) -> bool {
128 matches!(self, ByteStreamType::Binary | ByteStreamType::Unknown)
129 }
130
131 pub fn is_string_coercible(self) -> bool {
133 matches!(self, ByteStreamType::String | ByteStreamType::Unknown)
134 }
135}
136
137impl From<ByteStreamType> for Type {
138 fn from(value: ByteStreamType) -> Self {
139 match value {
140 ByteStreamType::Binary => Type::Binary,
141 ByteStreamType::String => Type::String,
142 ByteStreamType::Unknown => Type::Any,
143 }
144 }
145}
146
/// A stream of bytes together with the metadata needed to consume it.
#[derive(Debug)]
pub struct ByteStream {
    /// Where the bytes come from.
    stream: ByteStreamSource,
    /// The span associated with this stream; used when building errors.
    span: Span,
    /// Interrupt signals consulted while reading or copying the stream.
    signals: Signals,
    /// The declared content type of the stream.
    type_: ByteStreamType,
    /// The number of bytes in the stream, when it is known.
    known_size: Option<u64>,
    /// Spans recorded through `push_caller_span`.
    caller_spans: Vec<Span>,
}
198
199impl ByteStream {
200 pub fn new(
202 stream: ByteStreamSource,
203 span: Span,
204 signals: Signals,
205 type_: ByteStreamType,
206 ) -> Self {
207 Self {
208 stream,
209 span,
210 signals,
211 type_,
212 known_size: None,
213 caller_spans: vec![],
214 }
215 }
216
217 pub fn push_caller_span(&mut self, span: Span) {
219 if span != self.span {
220 self.caller_spans.push(span)
221 }
222 }
223
224 pub fn get_caller_spans(&self) -> &Vec<Span> {
226 &self.caller_spans
227 }
228
229 pub fn read(
231 reader: impl Read + Send + 'static,
232 span: Span,
233 signals: Signals,
234 type_: ByteStreamType,
235 ) -> Self {
236 Self::new(
237 ByteStreamSource::Read(Box::new(reader)),
238 span,
239 signals,
240 type_,
241 )
242 }
243
244 pub fn skip(self, span: Span, n: u64) -> Result<Self, ShellError> {
245 let known_size = self.known_size.map(|len| len.saturating_sub(n));
246 if let Some(mut reader) = self.reader() {
247 io::copy(&mut (&mut reader).take(n), &mut io::sink())
249 .map_err(|err| IoError::new(err, span, None))?;
250 Ok(
251 ByteStream::read(reader, span, Signals::empty(), ByteStreamType::Binary)
252 .with_known_size(known_size),
253 )
254 } else {
255 Err(ShellError::TypeMismatch {
256 err_message: "expected readable stream".into(),
257 span,
258 })
259 }
260 }
261
262 pub fn take(self, span: Span, n: u64) -> Result<Self, ShellError> {
263 let known_size = self.known_size.map(|s| s.min(n));
264 if let Some(reader) = self.reader() {
265 Ok(ByteStream::read(
266 reader.take(n),
267 span,
268 Signals::empty(),
269 ByteStreamType::Binary,
270 )
271 .with_known_size(known_size))
272 } else {
273 Err(ShellError::TypeMismatch {
274 err_message: "expected readable stream".into(),
275 span,
276 })
277 }
278 }
279
280 pub fn slice(
281 self,
282 val_span: Span,
283 call_span: Span,
284 range: IntRange,
285 ) -> Result<Self, ShellError> {
286 if let Some(len) = self.known_size {
287 let start = range.absolute_start(len);
288 let stream = self.skip(val_span, start);
289
290 match range.absolute_end(len) {
291 Bound::Unbounded => stream,
292 Bound::Included(end) | Bound::Excluded(end) if end < start => {
293 stream.and_then(|s| s.take(val_span, 0))
294 }
295 Bound::Included(end) => {
296 let distance = end - start + 1;
297 stream.and_then(|s| s.take(val_span, distance.min(len)))
298 }
299 Bound::Excluded(end) => {
300 let distance = end - start;
301 stream.and_then(|s| s.take(val_span, distance.min(len)))
302 }
303 }
304 } else if range.is_relative() {
305 Err(ShellError::RelativeRangeOnInfiniteStream { span: call_span })
306 } else {
307 let start = range.start() as u64;
308 let stream = self.skip(val_span, start);
309
310 match range.distance() {
311 Bound::Unbounded => stream,
312 Bound::Included(distance) => stream.and_then(|s| s.take(val_span, distance + 1)),
313 Bound::Excluded(distance) => stream.and_then(|s| s.take(val_span, distance)),
314 }
315 }
316 }
317
318 pub fn read_string(string: String, span: Span, signals: Signals) -> Self {
320 let len = string.len();
321 ByteStream::read(
322 Cursor::new(string.into_bytes()),
323 span,
324 signals,
325 ByteStreamType::String,
326 )
327 .with_known_size(Some(len as u64))
328 }
329
330 pub fn read_binary(bytes: Vec<u8>, span: Span, signals: Signals) -> Self {
332 let len = bytes.len();
333 ByteStream::read(Cursor::new(bytes), span, signals, ByteStreamType::Binary)
334 .with_known_size(Some(len as u64))
335 }
336
337 pub fn file(file: File, span: Span, signals: Signals) -> Self {
342 Self::new(
343 ByteStreamSource::File(file),
344 span,
345 signals,
346 ByteStreamType::Unknown,
347 )
348 }
349
350 #[cfg(feature = "os")]
355 pub fn child(child: ChildProcess, span: Span) -> Self {
356 Self::new(
357 ByteStreamSource::Child(Box::new(child)),
358 span,
359 Signals::empty(),
360 ByteStreamType::Unknown,
361 )
362 }
363
364 #[cfg(feature = "os")]
369 pub fn stdin(span: Span) -> Result<Self, ShellError> {
370 let stdin = os_pipe::dup_stdin().map_err(|err| IoError::new(err, span, None))?;
371 let source = ByteStreamSource::File(convert_file(stdin));
372 Ok(Self::new(
373 source,
374 span,
375 Signals::empty(),
376 ByteStreamType::Unknown,
377 ))
378 }
379
380 #[cfg(not(feature = "os"))]
381 pub fn stdin(span: Span) -> Result<Self, ShellError> {
382 Err(ShellError::DisabledOsSupport {
383 msg: "Stdin is not supported".to_string(),
384 span: Some(span),
385 })
386 }
387
388 pub fn from_fn(
391 span: Span,
392 signals: Signals,
393 type_: ByteStreamType,
394 generator: impl FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
395 ) -> Self {
396 Self::read(
397 ReadGenerator {
398 buffer: Cursor::new(Vec::new()),
399 generator,
400 },
401 span,
402 signals,
403 type_,
404 )
405 }
406
407 pub fn with_type(mut self, type_: ByteStreamType) -> Self {
408 self.type_ = type_;
409 self
410 }
411
412 pub fn from_iter<I>(iter: I, span: Span, signals: Signals, type_: ByteStreamType) -> Self
416 where
417 I: IntoIterator,
418 I::IntoIter: Send + 'static,
419 I::Item: AsRef<[u8]> + Default + Send + 'static,
420 {
421 let iter = iter.into_iter();
422 let cursor = Some(Cursor::new(I::Item::default()));
423 Self::read(ReadIterator { iter, cursor }, span, signals, type_)
424 }
425
426 pub fn from_result_iter<I, T>(
430 iter: I,
431 span: Span,
432 signals: Signals,
433 type_: ByteStreamType,
434 ) -> Self
435 where
436 I: IntoIterator<Item = Result<T, ShellError>>,
437 I::IntoIter: Send + 'static,
438 T: AsRef<[u8]> + Default + Send + 'static,
439 {
440 let iter = iter.into_iter();
441 let cursor = Some(Cursor::new(T::default()));
442 Self::read(ReadResultIterator { iter, cursor }, span, signals, type_)
443 }
444
445 pub fn with_known_size(mut self, size: Option<u64>) -> Self {
447 self.known_size = size;
448 self
449 }
450
451 pub fn source(&self) -> &ByteStreamSource {
453 &self.stream
454 }
455
456 pub fn source_mut(&mut self) -> &mut ByteStreamSource {
458 &mut self.stream
459 }
460
461 pub fn span(&self) -> Span {
463 self.span
464 }
465
466 pub fn with_span(mut self, span: Span) -> Self {
468 self.span = span;
469 self
470 }
471
472 pub fn type_(&self) -> ByteStreamType {
474 self.type_
475 }
476
477 pub fn known_size(&self) -> Option<u64> {
479 self.known_size
480 }
481
482 pub fn reader(self) -> Option<Reader> {
489 let reader = self.stream.reader()?;
490 Some(Reader {
491 reader: BufReader::new(reader),
492 span: self.span,
493 signals: self.signals,
494 })
495 }
496
497 pub fn lines(self) -> Option<Lines> {
505 let reader = self.stream.reader()?;
506 Some(Lines {
507 reader: BufReader::new(reader),
508 span: self.span,
509 signals: self.signals,
510 })
511 }
512
513 pub fn split(self, delimiter: Vec<u8>) -> Option<SplitRead> {
521 let reader = self.stream.reader()?;
522 Some(SplitRead::new(reader, delimiter, self.span, self.signals))
523 }
524
525 pub fn chunks(self) -> Option<Chunks> {
541 let reader = self.stream.reader()?;
542 Some(Chunks::new(reader, self.span, self.signals, self.type_))
543 }
544
545 pub fn into_source(self) -> ByteStreamSource {
547 self.stream
548 }
549
    /// Tries to convert the stream into a [`Stdio`].
    ///
    /// Succeeds for a file source and for a child whose `stdout` is a plain pipe. In every
    /// other case the stream is handed back unchanged as the `Err` value.
    pub fn into_stdio(mut self) -> Result<Stdio, Self> {
        match self.stream {
            // An arbitrary reader has no handle that could become a `Stdio`.
            ByteStreamSource::Read(..) => Err(self),
            ByteStreamSource::File(file) => Ok(file.into()),
            #[cfg(feature = "os")]
            ByteStreamSource::Child(child) => {
                if let ChildProcess {
                    stdout: Some(ChildPipe::Pipe(stdout)),
                    stderr,
                    ..
                } = *child
                {
                    debug_assert!(stderr.is_none(), "stderr should not exist");
                    Ok(stdout.into())
                } else {
                    // The pattern did not match, so `child` is intact: put it back before
                    // returning the stream to the caller.
                    self.stream = ByteStreamSource::Child(child);
                    Err(self)
                }
            }
        }
    }
578
579 #[cfg(feature = "os")]
584 pub fn into_child(self) -> Result<ChildProcess, Self> {
585 if let ByteStreamSource::Child(child) = self.stream {
586 Ok(*child)
587 } else {
588 Err(self)
589 }
590 }
591
592 pub fn into_bytes(self) -> Result<Vec<u8>, ShellError> {
596 let from_io_error = IoError::factory(self.span, None);
598 match self.stream {
599 ByteStreamSource::Read(mut read) => {
600 let mut buf = Vec::new();
601 read.read_to_end(&mut buf).map_err(|err| {
602 match ShellErrorBridge::try_from(err) {
603 Ok(ShellErrorBridge(err)) => err,
604 Err(err) => ShellError::Io(from_io_error(err)),
605 }
606 })?;
607 Ok(buf)
608 }
609 ByteStreamSource::File(mut file) => {
610 let mut buf = Vec::new();
611 file.read_to_end(&mut buf).map_err(&from_io_error)?;
612 Ok(buf)
613 }
614 #[cfg(feature = "os")]
615 ByteStreamSource::Child(child) => child.into_bytes(),
616 }
617 }
618
619 pub fn into_string(self) -> Result<String, ShellError> {
628 let span = self.span;
629 if self.type_.is_string_coercible() {
630 let trim = self.stream.is_external();
631 let bytes = self.into_bytes()?;
632 let mut string = String::from_utf8(bytes).map_err(|err| ShellError::NonUtf8Custom {
633 span,
634 msg: err.to_string(),
635 })?;
636 if trim {
637 trim_end_newline(&mut string);
638 }
639 Ok(string)
640 } else {
641 Err(ShellError::TypeMismatch {
642 err_message: "expected string, but got binary".into(),
643 span,
644 })
645 }
646 }
647
648 pub fn into_value(self) -> Result<Value, ShellError> {
661 let span = self.span;
662 let trim = self.stream.is_external();
663 let value = match self.type_ {
664 ByteStreamType::Binary => Value::binary(self.into_bytes()?, span),
666 ByteStreamType::String => Value::string(self.into_string()?, span),
667 ByteStreamType::Unknown => match String::from_utf8(self.into_bytes()?) {
669 Ok(mut str) => {
670 if trim {
671 trim_end_newline(&mut str);
672 }
673 Value::string(str, span)
674 }
675 Err(err) => Value::binary(err.into_bytes(), span),
676 },
677 };
678 Ok(value)
679 }
680
681 pub fn drain(self) -> Result<(), ShellError> {
683 match self.stream {
684 ByteStreamSource::Read(read) => {
685 copy_with_signals(read, io::sink(), self.span, &self.signals)?;
686 Ok(())
687 }
688 ByteStreamSource::File(_) => Ok(()),
689 #[cfg(feature = "os")]
690 ByteStreamSource::Child(child) => child.wait(),
691 }
692 }
693
694 pub fn print(self, to_stderr: bool) -> Result<(), ShellError> {
696 if to_stderr {
697 self.write_to(&mut io::stderr())
698 } else {
699 self.write_to(&mut io::stdout())
700 }
701 }
702
703 pub fn write_to(self, dest: impl Write) -> Result<(), ShellError> {
705 let span = self.span;
706 let signals = &self.signals;
707 match self.stream {
708 ByteStreamSource::Read(read) => {
709 copy_with_signals(read, dest, span, signals)?;
710 }
711 ByteStreamSource::File(file) => {
712 copy_with_signals(file, dest, span, signals)?;
713 }
714 #[cfg(feature = "os")]
715 ByteStreamSource::Child(mut child) => {
716 debug_assert!(child.stderr.is_none(), "stderr should not exist");
720
721 if let Some(stdout) = child.stdout.take() {
722 match stdout {
723 ChildPipe::Pipe(pipe) => {
724 copy_with_signals(pipe, dest, span, signals)?;
725 }
726 ChildPipe::Tee(tee) => {
727 copy_with_signals(tee, dest, span, signals)?;
728 }
729 }
730 }
731 child.wait()?;
732 }
733 }
734 Ok(())
735 }
736}
737
738impl From<ByteStream> for PipelineData {
739 fn from(stream: ByteStream) -> Self {
740 Self::ByteStream(stream, None)
741 }
742}
743
/// A [`Read`] adapter over an iterator of byte chunks.
///
/// Chunks are read back to back: once the current chunk is exhausted, the next one is pulled
/// from the iterator.
struct ReadIterator<I>
where
    I: Iterator,
    I::Item: AsRef<[u8]>,
{
    /// The source of further chunks.
    iter: I,
    /// The chunk currently being read; `None` once the iterator has been exhausted.
    cursor: Option<Cursor<I::Item>>,
}

impl<I> Read for ReadIterator<I>
where
    I: Iterator,
    I::Item: AsRef<[u8]>,
{
    /// Reads from the current chunk, moving on to the next chunk whenever one runs dry.
    ///
    /// Returns `Ok(0)` once every chunk has been consumed, or when `buf` is empty.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // A zero-length read makes the cursor report 0 bytes, which would otherwise be
        // mistaken for "chunk exhausted" and silently discard every pending chunk.
        if buf.is_empty() {
            return Ok(0);
        }

        while let Some(cursor) = self.cursor.as_mut() {
            let read = cursor.read(buf)?;
            if read != 0 {
                return Ok(read);
            }
            self.cursor = self.iter.next().map(Cursor::new);
        }
        Ok(0)
    }
}
770
771struct ReadResultIterator<I, T>
772where
773 I: Iterator<Item = Result<T, ShellError>>,
774 T: AsRef<[u8]>,
775{
776 iter: I,
777 cursor: Option<Cursor<T>>,
778}
779
780impl<I, T> Read for ReadResultIterator<I, T>
781where
782 I: Iterator<Item = Result<T, ShellError>>,
783 T: AsRef<[u8]>,
784{
785 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
786 while let Some(cursor) = self.cursor.as_mut() {
787 let read = cursor.read(buf)?;
788 if read == 0 {
789 self.cursor = self
790 .iter
791 .next()
792 .transpose()
793 .map_err(ShellErrorBridge)?
794 .map(Cursor::new);
795 } else {
796 return Ok(read);
797 }
798 }
799 Ok(0)
800 }
801}
802
/// A buffered reader over the contents of a [`ByteStream`].
pub struct Reader {
    /// The buffered source being read.
    reader: BufReader<SourceReader>,
    /// The span of the originating stream.
    span: Span,
    /// Interrupt signals checked on every `read` call.
    signals: Signals,
}
808
809impl Reader {
810 pub fn span(&self) -> Span {
811 self.span
812 }
813}
814
815impl Read for Reader {
816 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
817 self.signals.check(&self.span).map_err(ShellErrorBridge)?;
818 self.reader.read(buf)
819 }
820}
821
822impl BufRead for Reader {
823 fn fill_buf(&mut self) -> io::Result<&[u8]> {
824 self.reader.fill_buf()
825 }
826
827 fn consume(&mut self, amt: usize) {
828 self.reader.consume(amt)
829 }
830}
831
/// An iterator over the lines of a [`ByteStream`].
pub struct Lines {
    /// The buffered source being read.
    reader: BufReader<SourceReader>,
    /// The span of the originating stream; attached to errors.
    span: Span,
    /// Interrupt signals checked before each line is read.
    signals: Signals,
}
837
838impl Lines {
839 pub fn span(&self) -> Span {
840 self.span
841 }
842}
843
844impl Iterator for Lines {
845 type Item = Result<String, ShellError>;
846
847 fn next(&mut self) -> Option<Self::Item> {
848 if self.signals.interrupted() {
849 None
850 } else {
851 let mut buf = Vec::new();
852 match self.reader.read_until(b'\n', &mut buf) {
853 Ok(0) => None,
854 Ok(_) => {
855 let Ok(mut string) = String::from_utf8(buf) else {
856 return Some(Err(ShellError::NonUtf8 { span: self.span }));
857 };
858 trim_end_newline(&mut string);
859 Some(Ok(string))
860 }
861 Err(err) => Some(Err(IoError::new(err, self.span, None).into())),
862 }
863 }
864 }
865}
866
867mod split_read {
868 use std::io::{BufRead, ErrorKind};
869
870 use memchr::memmem::Finder;
871
872 pub struct SplitRead<R> {
873 reader: Option<R>,
874 buf: Option<Vec<u8>>,
875 finder: Finder<'static>,
876 }
877
878 impl<R: BufRead> SplitRead<R> {
879 pub fn new(reader: R, delim: impl AsRef<[u8]>) -> Self {
880 debug_assert!(!delim.as_ref().is_empty(), "delimiter can't be empty");
882 Self {
883 reader: Some(reader),
884 buf: Some(Vec::new()),
885 finder: Finder::new(delim.as_ref()).into_owned(),
886 }
887 }
888 }
889
890 impl<R: BufRead> Iterator for SplitRead<R> {
891 type Item = Result<Vec<u8>, std::io::Error>;
892
893 fn next(&mut self) -> Option<Self::Item> {
894 let buf = self.buf.as_mut()?;
895 let mut search_start = 0usize;
896
897 loop {
898 if let Some(i) = self.finder.find(&buf[search_start..]) {
899 let needle_idx = search_start + i;
900 let right = buf.split_off(needle_idx + self.finder.needle().len());
901 buf.truncate(needle_idx);
902 let left = std::mem::replace(buf, right);
903 return Some(Ok(left));
904 }
905
906 if let Some(mut r) = self.reader.take() {
907 search_start = buf.len().saturating_sub(self.finder.needle().len() + 1);
908 let available = match r.fill_buf() {
909 Ok(n) => n,
910 Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
911 Err(e) => return Some(Err(e)),
912 };
913
914 buf.extend_from_slice(available);
915 let used = available.len();
916 r.consume(used);
917 if used != 0 {
918 self.reader = Some(r);
919 }
920 continue;
921 } else {
922 return self.buf.take().map(Ok);
923 }
924 }
925 }
926 }
927
928 #[cfg(test)]
929 mod tests {
930 use super::*;
931 use std::io::{self, Cursor, Read};
932
933 #[test]
934 fn simple() {
935 let s = "foo-bar-baz";
936 let cursor = Cursor::new(String::from(s));
937 let mut split =
938 SplitRead::new(cursor, "-").map(|r| String::from_utf8(r.unwrap()).unwrap());
939
940 assert_eq!(split.next().as_deref(), Some("foo"));
941 assert_eq!(split.next().as_deref(), Some("bar"));
942 assert_eq!(split.next().as_deref(), Some("baz"));
943 assert_eq!(split.next(), None);
944 }
945
946 #[test]
947 fn with_empty_fields() -> Result<(), io::Error> {
948 let s = "\0\0foo\0\0bar\0\0\0\0baz\0\0";
949 let cursor = Cursor::new(String::from(s));
950 let mut split =
951 SplitRead::new(cursor, "\0\0").map(|r| String::from_utf8(r.unwrap()).unwrap());
952
953 assert_eq!(split.next().as_deref(), Some(""));
954 assert_eq!(split.next().as_deref(), Some("foo"));
955 assert_eq!(split.next().as_deref(), Some("bar"));
956 assert_eq!(split.next().as_deref(), Some(""));
957 assert_eq!(split.next().as_deref(), Some("baz"));
958 assert_eq!(split.next().as_deref(), Some(""));
959 assert_eq!(split.next().as_deref(), None);
960
961 Ok(())
962 }
963
964 #[test]
965 fn complex_delimiter() -> Result<(), io::Error> {
966 let s = "<|>foo<|>bar<|><|>baz<|>";
967 let cursor = Cursor::new(String::from(s));
968 let mut split =
969 SplitRead::new(cursor, "<|>").map(|r| String::from_utf8(r.unwrap()).unwrap());
970
971 assert_eq!(split.next().as_deref(), Some(""));
972 assert_eq!(split.next().as_deref(), Some("foo"));
973 assert_eq!(split.next().as_deref(), Some("bar"));
974 assert_eq!(split.next().as_deref(), Some(""));
975 assert_eq!(split.next().as_deref(), Some("baz"));
976 assert_eq!(split.next().as_deref(), Some(""));
977 assert_eq!(split.next().as_deref(), None);
978
979 Ok(())
980 }
981
982 #[test]
983 fn all_empty() -> Result<(), io::Error> {
984 let s = "<><>";
985 let cursor = Cursor::new(String::from(s));
986 let mut split =
987 SplitRead::new(cursor, "<>").map(|r| String::from_utf8(r.unwrap()).unwrap());
988
989 assert_eq!(split.next().as_deref(), Some(""));
990 assert_eq!(split.next().as_deref(), Some(""));
991 assert_eq!(split.next().as_deref(), Some(""));
992 assert_eq!(split.next(), None);
993
994 Ok(())
995 }
996
997 #[should_panic = "delimiter can't be empty"]
998 #[test]
999 fn empty_delimiter() {
1000 let s = "abc";
1001 let cursor = Cursor::new(String::from(s));
1002 let _split = SplitRead::new(cursor, "").map(|e| e.unwrap());
1003 }
1004
1005 #[test]
1006 fn delimiter_spread_across_reads() {
1007 let reader = Cursor::new("<|>foo<|")
1008 .chain(Cursor::new(">bar<|><"))
1009 .chain(Cursor::new("|>baz<|>"));
1010
1011 let mut split =
1012 SplitRead::new(reader, "<|>").map(|r| String::from_utf8(r.unwrap()).unwrap());
1013
1014 assert_eq!(split.next().unwrap(), "");
1015 assert_eq!(split.next().unwrap(), "foo");
1016 assert_eq!(split.next().unwrap(), "bar");
1017 assert_eq!(split.next().unwrap(), "");
1018 assert_eq!(split.next().unwrap(), "baz");
1019 assert_eq!(split.next().unwrap(), "");
1020 assert_eq!(split.next(), None);
1021 }
1022 }
1023}
1024
/// An iterator over the delimiter-separated pieces of a [`ByteStream`].
pub struct SplitRead {
    /// The generic splitter doing the actual work.
    internal: split_read::SplitRead<BufReader<SourceReader>>,
    /// The span of the originating stream.
    span: Span,
    /// Interrupt signals checked before each piece is produced.
    signals: Signals,
}
1030
1031impl SplitRead {
1032 fn new(
1033 reader: SourceReader,
1034 delimiter: impl AsRef<[u8]>,
1035 span: Span,
1036 signals: Signals,
1037 ) -> Self {
1038 Self {
1039 internal: split_read::SplitRead::new(BufReader::new(reader), delimiter),
1040 span,
1041 signals,
1042 }
1043 }
1044
1045 pub fn span(&self) -> Span {
1046 self.span
1047 }
1048}
1049
1050impl Iterator for SplitRead {
1051 type Item = Result<Vec<u8>, ShellError>;
1052
1053 fn next(&mut self) -> Option<Self::Item> {
1054 if self.signals.interrupted() {
1055 return None;
1056 }
1057 self.internal.next().map(|r| {
1058 r.map_err(|err| {
1059 ShellError::Io(IoError::new_internal(
1060 err,
1061 "Could not get next value for SplitRead",
1062 crate::location!(),
1063 ))
1064 })
1065 })
1066 }
1067}
1068
/// An iterator over a [`ByteStream`] that yields its contents as [`Value`] chunks.
pub struct Chunks {
    /// The buffered source being read.
    reader: BufReader<SourceReader>,
    /// Number of bytes yielded so far; used in UTF-8 error messages.
    pos: u64,
    /// Set once an error has been yielded; iteration stops afterwards.
    error: bool,
    /// The span attached to yielded values and errors.
    span: Span,
    /// Interrupt signals checked before each chunk is produced.
    signals: Signals,
    /// How chunks are interpreted; an unknown type may switch to binary during iteration.
    type_: ByteStreamType,
}
1082
1083impl Chunks {
1084 fn new(reader: SourceReader, span: Span, signals: Signals, type_: ByteStreamType) -> Self {
1085 Self {
1086 reader: BufReader::new(reader),
1087 pos: 0,
1088 error: false,
1089 span,
1090 signals,
1091 type_,
1092 }
1093 }
1094
1095 pub fn span(&self) -> Span {
1096 self.span
1097 }
1098
    /// Reads the next chunk and tries to interpret it as UTF-8.
    ///
    /// Returns `Ok(None)` at end of input and `Ok(Some(string))` for a valid chunk. On failure
    /// the error is paired with the bytes collected so far, so the caller can still use them.
    fn next_string(&mut self) -> Result<Option<String>, (Vec<u8>, ShellError)> {
        // Unwrap a bridged `ShellError` if there is one; otherwise wrap the I/O error.
        let from_io_error = |err: std::io::Error| match ShellErrorBridge::try_from(err) {
            Ok(err) => err.0,
            Err(err) => IoError::new(err, self.span, None).into(),
        };

        // Look at whatever the reader currently has buffered.
        let buf = self
            .reader
            .fill_buf()
            .map_err(from_io_error)
            .map_err(|err| (vec![], err))?;

        if buf.is_empty() {
            // End of input.
            return Ok(None);
        }

        let mut buf = buf.to_vec();
        // How many bytes of `buf` have already been consumed from the reader.
        let mut consumed = 0;

        if buf.len() < 4 {
            // Very short chunk: consume it and append one more fill before decoding.
            // NOTE(review): presumably 4 is the maximum length of a UTF-8 sequence, so that
            // a character split across reads can be completed — confirm.
            consumed += buf.len();
            self.reader.consume(buf.len());
            match self.reader.fill_buf() {
                Ok(more_bytes) => buf.extend_from_slice(more_bytes),
                Err(err) => return Err((buf, from_io_error(err))),
            }
        }

        match String::from_utf8(buf) {
            Ok(string) => {
                // Everything decoded: consume the part that was not consumed above.
                self.reader.consume(string.len() - consumed);
                self.pos += string.len() as u64;
                Ok(Some(string))
            }
            // No error length means the input ended in the middle of a sequence: yield the
            // valid prefix now and leave the unconsumed tail in the reader.
            Err(err) if err.utf8_error().error_len().is_none() => {
                let valid_up_to = err.utf8_error().valid_up_to();
                if valid_up_to > consumed {
                    self.reader.consume(valid_up_to - consumed);
                }
                let mut buf = err.into_bytes();
                buf.truncate(valid_up_to);
                buf.shrink_to_fit();
                let string = String::from_utf8(buf)
                    .expect("failed to parse utf-8 even after correcting error");
                self.pos += string.len() as u64;
                Ok(Some(string))
            }
            // Invalid UTF-8: consume the whole chunk and hand its bytes back with the error.
            Err(err) => {
                let shell_error = ShellError::NonUtf8Custom {
                    msg: format!("invalid utf-8 sequence starting at index {}", self.pos),
                    span: self.span,
                };
                let buf = err.into_bytes();
                if buf.len() > consumed {
                    self.reader.consume(buf.len() - consumed);
                }
                self.pos += buf.len() as u64;
                Err((buf, shell_error))
            }
        }
    }
1169}
1170
1171impl Iterator for Chunks {
1172 type Item = Result<Value, ShellError>;
1173
1174 fn next(&mut self) -> Option<Self::Item> {
1175 if self.error || self.signals.interrupted() {
1176 None
1177 } else {
1178 match self.type_ {
1179 ByteStreamType::Binary => {
1181 let buf = match self.reader.fill_buf() {
1182 Ok(buf) => buf,
1183 Err(err) => {
1184 self.error = true;
1185 return Some(Err(ShellError::Io(IoError::new(err, self.span, None))));
1186 }
1187 };
1188 if !buf.is_empty() {
1189 let len = buf.len();
1190 let value = Value::binary(buf, self.span);
1191 self.reader.consume(len);
1192 self.pos += len as u64;
1193 Some(Ok(value))
1194 } else {
1195 None
1196 }
1197 }
1198 ByteStreamType::String => match self.next_string().transpose()? {
1200 Ok(string) => Some(Ok(Value::string(string, self.span))),
1201 Err((_, err)) => {
1202 self.error = true;
1203 Some(Err(err))
1204 }
1205 },
1206 ByteStreamType::Unknown => {
1209 match self.next_string().transpose()? {
1210 Ok(string) => Some(Ok(Value::string(string, self.span))),
1211 Err((buf, _)) if !buf.is_empty() => {
1212 self.type_ = ByteStreamType::Binary;
1214 Some(Ok(Value::binary(buf, self.span)))
1215 }
1216 Err((_, err)) => {
1217 self.error = true;
1218 Some(Err(err))
1219 }
1220 }
1221 }
1222 }
1223 }
1224 }
1225}
1226
/// Removes a single trailing `\n` or `\r\n` from `string`, in place.
///
/// A trailing `\r` that is not followed by `\n` is left untouched.
fn trim_end_newline(string: &mut String) {
    let Some(without_lf) = string.strip_suffix('\n') else {
        return;
    };
    let kept = without_lf.strip_suffix('\r').unwrap_or(without_lf).len();
    string.truncate(kept);
}
1235
/// Converts one file-like type into another by passing through an [`OwnedFd`].
#[cfg(unix)]
pub(crate) fn convert_file<T: From<OwnedFd>>(file: impl Into<OwnedFd>) -> T {
    let fd: OwnedFd = file.into();
    T::from(fd)
}
1240
/// Converts one file-like type into another by passing through an [`OwnedHandle`].
#[cfg(windows)]
pub(crate) fn convert_file<T: From<OwnedHandle>>(file: impl Into<OwnedHandle>) -> T {
    let handle: OwnedHandle = file.into();
    T::from(handle)
}
1245
/// Size in bytes of the intermediate buffer used by `generic_copy`.
const DEFAULT_BUF_SIZE: usize = 8192;
1247
1248pub fn copy_with_signals(
1249 mut reader: impl Read,
1250 mut writer: impl Write,
1251 span: Span,
1252 signals: &Signals,
1253) -> Result<u64, ShellError> {
1254 let from_io_error = IoError::factory(span, None);
1255 if signals.is_empty() {
1256 match io::copy(&mut reader, &mut writer) {
1257 Ok(n) => {
1258 writer.flush().map_err(&from_io_error)?;
1259 Ok(n)
1260 }
1261 Err(err) => {
1262 let _ = writer.flush();
1263 match ShellErrorBridge::try_from(err) {
1264 Ok(ShellErrorBridge(shell_error)) => Err(shell_error),
1265 Err(err) => Err(from_io_error(err).into()),
1266 }
1267 }
1268 }
1269 } else {
1270 match generic_copy(&mut reader, &mut writer, span, signals) {
1275 Ok(len) => {
1276 writer.flush().map_err(&from_io_error)?;
1277 Ok(len)
1278 }
1279 Err(err) => {
1280 let _ = writer.flush();
1281 Err(err)
1282 }
1283 }
1284 }
1285}
1286
1287fn generic_copy(
1289 mut reader: impl Read,
1290 mut writer: impl Write,
1291 span: Span,
1292 signals: &Signals,
1293) -> Result<u64, ShellError> {
1294 let from_io_error = IoError::factory(span, None);
1295 let buf = &mut [0; DEFAULT_BUF_SIZE];
1296 let mut len = 0;
1297 loop {
1298 signals.check(&span)?;
1299 let n = match reader.read(buf) {
1300 Ok(0) => break,
1301 Ok(n) => n,
1302 Err(e) if e.kind() == ErrorKind::Interrupted => continue,
1303 Err(e) => match ShellErrorBridge::try_from(e) {
1304 Ok(ShellErrorBridge(e)) => return Err(e),
1305 Err(e) => return Err(from_io_error(e).into()),
1306 },
1307 };
1308 len += n;
1309 writer.write_all(&buf[..n]).map_err(&from_io_error)?;
1310 }
1311 Ok(len as u64)
1312}
1313
/// A reader whose bytes are produced on demand by a generator function.
struct ReadGenerator<F>
where
    F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
{
    /// Bytes produced by the generator that have not been read yet.
    buffer: Cursor<Vec<u8>>,
    /// Fills the buffer it is given; returning `Ok(false)` stops the refill loop.
    generator: F,
}
1321
1322impl<F> BufRead for ReadGenerator<F>
1323where
1324 F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
1325{
1326 fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
1327 while self.buffer.fill_buf()?.is_empty() {
1330 self.buffer.set_position(0);
1332 self.buffer.get_mut().clear();
1333 if !(self.generator)(self.buffer.get_mut()).map_err(ShellErrorBridge)? {
1335 break;
1337 }
1338 }
1339 self.buffer.fill_buf()
1340 }
1341
1342 fn consume(&mut self, amt: usize) {
1343 self.buffer.consume(amt);
1344 }
1345}
1346
1347impl<F> Read for ReadGenerator<F>
1348where
1349 F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
1350{
1351 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
1352 let slice = self.fill_buf()?;
1354 let len = buf.len().min(slice.len());
1355 buf[..len].copy_from_slice(&slice[..len]);
1356 self.consume(len);
1357 Ok(len)
1358 }
1359}
1360
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a [`Chunks`] iterator of the given type over `data`, one item per read.
    fn test_chunks<T>(data: Vec<T>, type_: ByteStreamType) -> Chunks
    where
        T: AsRef<[u8]> + Default + Send + 'static,
    {
        let reader = ReadIterator {
            iter: data.into_iter(),
            cursor: Some(Cursor::new(T::default())),
        };
        Chunks::new(
            SourceReader::Read(Box::new(reader)),
            Span::test_data(),
            Signals::empty(),
            type_,
        )
    }

    /// Binary chunks come out as binary values, one per input chunk.
    #[test]
    fn chunks_read_binary_passthrough() {
        let bins = vec![&[0, 1][..], &[2, 3][..]];
        let iter = test_chunks(bins.clone(), ByteStreamType::Binary);

        let bins_values: Vec<Value> = bins
            .into_iter()
            .map(|bin| Value::binary(bin, Span::test_data()))
            .collect();
        assert_eq!(
            bins_values,
            iter.collect::<Result<Vec<Value>, _>>().expect("error")
        );
    }

    /// Valid UTF-8 chunks come out as string values, one per input chunk.
    #[test]
    fn chunks_read_string_clean() {
        let strs = vec!["Nushell", "が好きです"];
        let iter = test_chunks(strs.clone(), ByteStreamType::String);

        let strs_values: Vec<Value> = strs
            .into_iter()
            .map(|string| Value::string(string, Span::test_data()))
            .collect();
        assert_eq!(
            strs_values,
            iter.collect::<Result<Vec<Value>, _>>().expect("error")
        );
    }

    /// Multi-byte characters split across chunk boundaries are reassembled.
    #[test]
    fn chunks_read_string_split_boundary() {
        let real = "Nushell最高!";
        let chunks = vec![&b"Nushell\xe6"[..], &b"\x9c\x80\xe9"[..], &b"\xab\x98!"[..]];
        let iter = test_chunks(chunks.clone(), ByteStreamType::String);

        let mut string = String::new();
        for value in iter {
            let chunk_string = value.expect("error").into_string().expect("not a string");
            string.push_str(&chunk_string);
        }
        assert_eq!(real, string);
    }

    /// Input that ends in a truncated character yields some text and then a UTF-8 error.
    #[test]
    fn chunks_read_string_utf8_error() {
        let chunks = vec![&b"Nushell\xe6"[..], &b"\x9c\x80\xe9"[..], &b"\xab"[..]];
        let iter = test_chunks(chunks, ByteStreamType::String);

        let mut string = String::new();
        for value in iter {
            match value {
                Ok(value) => string.push_str(&value.into_string().expect("not a string")),
                Err(err) => {
                    println!("string so far: {string:?}");
                    println!("got error: {err:?}");
                    assert!(!string.is_empty());
                    assert!(matches!(err, ShellError::NonUtf8Custom { .. }));
                    return;
                }
            }
        }
        panic!("no error");
    }

    /// A stream of unknown type switches to binary at the first chunk that is not UTF-8.
    #[test]
    fn chunks_read_unknown_fallback() {
        let chunks = vec![&b"Nushell"[..], &b"\x9c\x80\xe9abcd"[..], &b"efgh"[..]];
        let mut iter = test_chunks(chunks, ByteStreamType::Unknown);

        let mut get = || iter.next().expect("end of iter").expect("error");

        assert_eq!(Value::test_string("Nushell"), get());
        assert_eq!(Value::test_binary(b"\x9c\x80\xe9abcd"), get());
        // Once in binary mode, even valid UTF-8 chunks stay binary.
        assert_eq!(Value::test_binary(b"efgh"), get());
    }
}