1#[cfg(feature = "os")]
6use crate::process::{ChildPipe, ChildProcess};
7use crate::{
8 shell_error::{bridge::ShellErrorBridge, io::IoError},
9 IntRange, PipelineData, ShellError, Signals, Span, Type, Value,
10};
11use serde::{Deserialize, Serialize};
12use std::ops::Bound;
13#[cfg(unix)]
14use std::os::fd::OwnedFd;
15#[cfg(windows)]
16use std::os::windows::io::OwnedHandle;
17use std::{
18 fmt::Debug,
19 fs::File,
20 io::{self, BufRead, BufReader, Cursor, ErrorKind, Read, Write},
21 process::Stdio,
22};
23
24pub enum ByteStreamSource {
31 Read(Box<dyn Read + Send + 'static>),
32 File(File),
33 #[cfg(feature = "os")]
34 Child(Box<ChildProcess>),
35}
36
37impl ByteStreamSource {
38 fn reader(self) -> Option<SourceReader> {
39 match self {
40 ByteStreamSource::Read(read) => Some(SourceReader::Read(read)),
41 ByteStreamSource::File(file) => Some(SourceReader::File(file)),
42 #[cfg(feature = "os")]
43 ByteStreamSource::Child(mut child) => child.stdout.take().map(|stdout| match stdout {
44 ChildPipe::Pipe(pipe) => SourceReader::File(convert_file(pipe)),
45 ChildPipe::Tee(tee) => SourceReader::Read(tee),
46 }),
47 }
48 }
49
50 #[cfg(feature = "os")]
52 pub fn is_external(&self) -> bool {
53 matches!(self, ByteStreamSource::Child(..))
54 }
55
56 #[cfg(not(feature = "os"))]
57 pub fn is_external(&self) -> bool {
58 false
60 }
61}
62
63impl Debug for ByteStreamSource {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 match self {
66 ByteStreamSource::Read(_) => f.debug_tuple("Read").field(&"..").finish(),
67 ByteStreamSource::File(file) => f.debug_tuple("File").field(file).finish(),
68 #[cfg(feature = "os")]
69 ByteStreamSource::Child(child) => f.debug_tuple("Child").field(child).finish(),
70 }
71 }
72}
73
/// A concrete reader obtained from a [`ByteStreamSource`].
enum SourceReader {
    /// Any boxed reader.
    Read(Box<dyn Read + Send + 'static>),
    /// An open file.
    File(File),
}

impl Read for SourceReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Both variants are just readers; pick the right one and delegate.
        let inner: &mut dyn Read = match self {
            Self::Read(reader) => reader,
            Self::File(file) => file,
        };
        inner.read(buf)
    }
}

impl Debug for SourceReader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Boxed readers are opaque, so they are rendered as a placeholder.
        let (variant, inner): (&str, &dyn Debug) = match self {
            Self::Read(_) => ("Read", &".."),
            Self::File(file) => ("File", file),
        };
        f.debug_tuple(variant).field(inner).finish()
    }
}
96
97#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
99pub enum ByteStreamType {
100 Binary,
103 String,
109 #[default]
112 Unknown,
113}
114
115impl ByteStreamType {
116 pub fn describe(self) -> &'static str {
119 match self {
120 ByteStreamType::Binary => "binary (stream)",
121 ByteStreamType::String => "string (stream)",
122 ByteStreamType::Unknown => "byte stream",
123 }
124 }
125
126 pub fn is_binary_coercible(self) -> bool {
128 matches!(self, ByteStreamType::Binary | ByteStreamType::Unknown)
129 }
130
131 pub fn is_string_coercible(self) -> bool {
133 matches!(self, ByteStreamType::String | ByteStreamType::Unknown)
134 }
135}
136
137impl From<ByteStreamType> for Type {
138 fn from(value: ByteStreamType) -> Self {
139 match value {
140 ByteStreamType::Binary => Type::Binary,
141 ByteStreamType::String => Type::String,
142 ByteStreamType::Unknown => Type::Any,
143 }
144 }
145}
146
/// A stream of bytes, tagged with a [`ByteStreamType`], a [`Span`] and [`Signals`].
///
/// The data comes from a [`ByteStreamSource`]. An optional known size is tracked so that
/// operations such as slicing can resolve relative ranges.
#[derive(Debug)]
pub struct ByteStream {
    // Where the bytes come from.
    stream: ByteStreamSource,
    // Span attributed to this stream; used in errors and in values produced from it.
    span: Span,
    // Checked while reading so the stream can be interrupted.
    signals: Signals,
    // What the bytes are expected to represent.
    type_: ByteStreamType,
    // Total number of bytes, when known ahead of time (set via `with_known_size`).
    known_size: Option<u64>,
    // Spans recorded via `push_caller_span`.
    caller_spans: Vec<Span>,
}
198
199impl ByteStream {
200 pub fn new(
202 stream: ByteStreamSource,
203 span: Span,
204 signals: Signals,
205 type_: ByteStreamType,
206 ) -> Self {
207 Self {
208 stream,
209 span,
210 signals,
211 type_,
212 known_size: None,
213 caller_spans: vec![],
214 }
215 }
216
217 pub fn push_caller_span(&mut self, span: Span) {
219 if span != self.span {
220 self.caller_spans.push(span)
221 }
222 }
223
224 pub fn get_caller_spans(&self) -> &Vec<Span> {
226 &self.caller_spans
227 }
228
229 pub fn read(
231 reader: impl Read + Send + 'static,
232 span: Span,
233 signals: Signals,
234 type_: ByteStreamType,
235 ) -> Self {
236 Self::new(
237 ByteStreamSource::Read(Box::new(reader)),
238 span,
239 signals,
240 type_,
241 )
242 }
243
244 pub fn skip(self, span: Span, n: u64) -> Result<Self, ShellError> {
245 let known_size = self.known_size.map(|len| len.saturating_sub(n));
246 if let Some(mut reader) = self.reader() {
247 io::copy(&mut (&mut reader).take(n), &mut io::sink())
249 .map_err(|err| IoError::new(err.kind(), span, None))?;
250 Ok(
251 ByteStream::read(reader, span, Signals::empty(), ByteStreamType::Binary)
252 .with_known_size(known_size),
253 )
254 } else {
255 Err(ShellError::TypeMismatch {
256 err_message: "expected readable stream".into(),
257 span,
258 })
259 }
260 }
261
262 pub fn take(self, span: Span, n: u64) -> Result<Self, ShellError> {
263 let known_size = self.known_size.map(|s| s.min(n));
264 if let Some(reader) = self.reader() {
265 Ok(ByteStream::read(
266 reader.take(n),
267 span,
268 Signals::empty(),
269 ByteStreamType::Binary,
270 )
271 .with_known_size(known_size))
272 } else {
273 Err(ShellError::TypeMismatch {
274 err_message: "expected readable stream".into(),
275 span,
276 })
277 }
278 }
279
280 pub fn slice(
281 self,
282 val_span: Span,
283 call_span: Span,
284 range: IntRange,
285 ) -> Result<Self, ShellError> {
286 if let Some(len) = self.known_size {
287 let start = range.absolute_start(len);
288 let stream = self.skip(val_span, start);
289
290 match range.absolute_end(len) {
291 Bound::Unbounded => stream,
292 Bound::Included(end) | Bound::Excluded(end) if end < start => {
293 stream.and_then(|s| s.take(val_span, 0))
294 }
295 Bound::Included(end) => {
296 let distance = end - start + 1;
297 stream.and_then(|s| s.take(val_span, distance.min(len)))
298 }
299 Bound::Excluded(end) => {
300 let distance = end - start;
301 stream.and_then(|s| s.take(val_span, distance.min(len)))
302 }
303 }
304 } else if range.is_relative() {
305 Err(ShellError::RelativeRangeOnInfiniteStream { span: call_span })
306 } else {
307 let start = range.start() as u64;
308 let stream = self.skip(val_span, start);
309
310 match range.distance() {
311 Bound::Unbounded => stream,
312 Bound::Included(distance) => stream.and_then(|s| s.take(val_span, distance + 1)),
313 Bound::Excluded(distance) => stream.and_then(|s| s.take(val_span, distance)),
314 }
315 }
316 }
317
318 pub fn read_string(string: String, span: Span, signals: Signals) -> Self {
320 let len = string.len();
321 ByteStream::read(
322 Cursor::new(string.into_bytes()),
323 span,
324 signals,
325 ByteStreamType::String,
326 )
327 .with_known_size(Some(len as u64))
328 }
329
330 pub fn read_binary(bytes: Vec<u8>, span: Span, signals: Signals) -> Self {
332 let len = bytes.len();
333 ByteStream::read(Cursor::new(bytes), span, signals, ByteStreamType::Binary)
334 .with_known_size(Some(len as u64))
335 }
336
337 pub fn file(file: File, span: Span, signals: Signals) -> Self {
342 Self::new(
343 ByteStreamSource::File(file),
344 span,
345 signals,
346 ByteStreamType::Unknown,
347 )
348 }
349
350 #[cfg(feature = "os")]
355 pub fn child(child: ChildProcess, span: Span) -> Self {
356 Self::new(
357 ByteStreamSource::Child(Box::new(child)),
358 span,
359 Signals::empty(),
360 ByteStreamType::Unknown,
361 )
362 }
363
364 #[cfg(feature = "os")]
369 pub fn stdin(span: Span) -> Result<Self, ShellError> {
370 let stdin = os_pipe::dup_stdin().map_err(|err| IoError::new(err.kind(), span, None))?;
371 let source = ByteStreamSource::File(convert_file(stdin));
372 Ok(Self::new(
373 source,
374 span,
375 Signals::empty(),
376 ByteStreamType::Unknown,
377 ))
378 }
379
380 #[cfg(not(feature = "os"))]
381 pub fn stdin(span: Span) -> Result<Self, ShellError> {
382 Err(ShellError::DisabledOsSupport {
383 msg: "Stdin is not supported".to_string(),
384 span: Some(span),
385 })
386 }
387
388 pub fn from_fn(
391 span: Span,
392 signals: Signals,
393 type_: ByteStreamType,
394 generator: impl FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
395 ) -> Self {
396 Self::read(
397 ReadGenerator {
398 buffer: Cursor::new(Vec::new()),
399 generator,
400 },
401 span,
402 signals,
403 type_,
404 )
405 }
406
407 pub fn with_type(mut self, type_: ByteStreamType) -> Self {
408 self.type_ = type_;
409 self
410 }
411
412 pub fn from_iter<I>(iter: I, span: Span, signals: Signals, type_: ByteStreamType) -> Self
416 where
417 I: IntoIterator,
418 I::IntoIter: Send + 'static,
419 I::Item: AsRef<[u8]> + Default + Send + 'static,
420 {
421 let iter = iter.into_iter();
422 let cursor = Some(Cursor::new(I::Item::default()));
423 Self::read(ReadIterator { iter, cursor }, span, signals, type_)
424 }
425
426 pub fn from_result_iter<I, T>(
430 iter: I,
431 span: Span,
432 signals: Signals,
433 type_: ByteStreamType,
434 ) -> Self
435 where
436 I: IntoIterator<Item = Result<T, ShellError>>,
437 I::IntoIter: Send + 'static,
438 T: AsRef<[u8]> + Default + Send + 'static,
439 {
440 let iter = iter.into_iter();
441 let cursor = Some(Cursor::new(T::default()));
442 Self::read(ReadResultIterator { iter, cursor }, span, signals, type_)
443 }
444
445 pub fn with_known_size(mut self, size: Option<u64>) -> Self {
447 self.known_size = size;
448 self
449 }
450
451 pub fn source(&self) -> &ByteStreamSource {
453 &self.stream
454 }
455
456 pub fn source_mut(&mut self) -> &mut ByteStreamSource {
458 &mut self.stream
459 }
460
461 pub fn span(&self) -> Span {
463 self.span
464 }
465
466 pub fn with_span(mut self, span: Span) -> Self {
468 self.span = span;
469 self
470 }
471
472 pub fn type_(&self) -> ByteStreamType {
474 self.type_
475 }
476
477 pub fn known_size(&self) -> Option<u64> {
479 self.known_size
480 }
481
482 pub fn reader(self) -> Option<Reader> {
489 let reader = self.stream.reader()?;
490 Some(Reader {
491 reader: BufReader::new(reader),
492 span: self.span,
493 signals: self.signals,
494 })
495 }
496
497 pub fn lines(self) -> Option<Lines> {
505 let reader = self.stream.reader()?;
506 Some(Lines {
507 reader: BufReader::new(reader),
508 span: self.span,
509 signals: self.signals,
510 })
511 }
512
513 pub fn split(self, delimiter: Vec<u8>) -> Option<SplitRead> {
521 let reader = self.stream.reader()?;
522 Some(SplitRead::new(reader, delimiter, self.span, self.signals))
523 }
524
525 pub fn chunks(self) -> Option<Chunks> {
541 let reader = self.stream.reader()?;
542 Some(Chunks::new(reader, self.span, self.signals, self.type_))
543 }
544
545 pub fn into_source(self) -> ByteStreamSource {
547 self.stream
548 }
549
550 pub fn into_stdio(mut self) -> Result<Stdio, Self> {
558 match self.stream {
559 ByteStreamSource::Read(..) => Err(self),
560 ByteStreamSource::File(file) => Ok(file.into()),
561 #[cfg(feature = "os")]
562 ByteStreamSource::Child(child) => {
563 if let ChildProcess {
564 stdout: Some(ChildPipe::Pipe(stdout)),
565 stderr,
566 ..
567 } = *child
568 {
569 debug_assert!(stderr.is_none(), "stderr should not exist");
570 Ok(stdout.into())
571 } else {
572 self.stream = ByteStreamSource::Child(child);
573 Err(self)
574 }
575 }
576 }
577 }
578
579 #[cfg(feature = "os")]
584 pub fn into_child(self) -> Result<ChildProcess, Self> {
585 if let ByteStreamSource::Child(child) = self.stream {
586 Ok(*child)
587 } else {
588 Err(self)
589 }
590 }
591
592 pub fn into_bytes(self) -> Result<Vec<u8>, ShellError> {
596 let from_io_error = IoError::factory(self.span, None);
598 match self.stream {
599 ByteStreamSource::Read(mut read) => {
600 let mut buf = Vec::new();
601 read.read_to_end(&mut buf).map_err(&from_io_error)?;
602 Ok(buf)
603 }
604 ByteStreamSource::File(mut file) => {
605 let mut buf = Vec::new();
606 file.read_to_end(&mut buf).map_err(&from_io_error)?;
607 Ok(buf)
608 }
609 #[cfg(feature = "os")]
610 ByteStreamSource::Child(child) => child.into_bytes(),
611 }
612 }
613
614 pub fn into_string(self) -> Result<String, ShellError> {
623 let span = self.span;
624 if self.type_.is_string_coercible() {
625 let trim = self.stream.is_external();
626 let bytes = self.into_bytes()?;
627 let mut string = String::from_utf8(bytes).map_err(|err| ShellError::NonUtf8Custom {
628 span,
629 msg: err.to_string(),
630 })?;
631 if trim {
632 trim_end_newline(&mut string);
633 }
634 Ok(string)
635 } else {
636 Err(ShellError::TypeMismatch {
637 err_message: "expected string, but got binary".into(),
638 span,
639 })
640 }
641 }
642
643 pub fn into_value(self) -> Result<Value, ShellError> {
656 let span = self.span;
657 let trim = self.stream.is_external();
658 let value = match self.type_ {
659 ByteStreamType::Binary => Value::binary(self.into_bytes()?, span),
661 ByteStreamType::String => Value::string(self.into_string()?, span),
662 ByteStreamType::Unknown => match String::from_utf8(self.into_bytes()?) {
664 Ok(mut str) => {
665 if trim {
666 trim_end_newline(&mut str);
667 }
668 Value::string(str, span)
669 }
670 Err(err) => Value::binary(err.into_bytes(), span),
671 },
672 };
673 Ok(value)
674 }
675
676 pub fn drain(self) -> Result<(), ShellError> {
678 match self.stream {
679 ByteStreamSource::Read(read) => {
680 copy_with_signals(read, io::sink(), self.span, &self.signals)?;
681 Ok(())
682 }
683 ByteStreamSource::File(_) => Ok(()),
684 #[cfg(feature = "os")]
685 ByteStreamSource::Child(child) => child.wait(),
686 }
687 }
688
689 pub fn print(self, to_stderr: bool) -> Result<(), ShellError> {
691 if to_stderr {
692 self.write_to(&mut io::stderr())
693 } else {
694 self.write_to(&mut io::stdout())
695 }
696 }
697
698 pub fn write_to(self, dest: impl Write) -> Result<(), ShellError> {
700 let span = self.span;
701 let signals = &self.signals;
702 match self.stream {
703 ByteStreamSource::Read(read) => {
704 copy_with_signals(read, dest, span, signals)?;
705 }
706 ByteStreamSource::File(file) => {
707 copy_with_signals(file, dest, span, signals)?;
708 }
709 #[cfg(feature = "os")]
710 ByteStreamSource::Child(mut child) => {
711 debug_assert!(child.stderr.is_none(), "stderr should not exist");
715
716 if let Some(stdout) = child.stdout.take() {
717 match stdout {
718 ChildPipe::Pipe(pipe) => {
719 copy_with_signals(pipe, dest, span, signals)?;
720 }
721 ChildPipe::Tee(tee) => {
722 copy_with_signals(tee, dest, span, signals)?;
723 }
724 }
725 }
726 child.wait()?;
727 }
728 }
729 Ok(())
730 }
731}
732
733impl From<ByteStream> for PipelineData {
734 fn from(stream: ByteStream) -> Self {
735 Self::ByteStream(stream, None)
736 }
737}
738
/// Adapts an iterator of byte chunks into a [`Read`]er that yields the chunks back to back.
struct ReadIterator<I>
where
    I: Iterator,
    I::Item: AsRef<[u8]>,
{
    iter: I,
    // The chunk currently being read; `None` once the iterator is exhausted.
    cursor: Option<Cursor<I::Item>>,
}

impl<I> Read for ReadIterator<I>
where
    I: Iterator,
    I::Item: AsRef<[u8]>,
{
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        loop {
            let Some(chunk) = self.cursor.as_mut() else {
                return Ok(0);
            };
            match chunk.read(buf)? {
                // Current chunk drained (or empty): advance to the next one.
                0 => self.cursor = self.iter.next().map(Cursor::new),
                copied => return Ok(copied),
            }
        }
    }
}
765
766struct ReadResultIterator<I, T>
767where
768 I: Iterator<Item = Result<T, ShellError>>,
769 T: AsRef<[u8]>,
770{
771 iter: I,
772 cursor: Option<Cursor<T>>,
773}
774
775impl<I, T> Read for ReadResultIterator<I, T>
776where
777 I: Iterator<Item = Result<T, ShellError>>,
778 T: AsRef<[u8]>,
779{
780 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
781 while let Some(cursor) = self.cursor.as_mut() {
782 let read = cursor.read(buf)?;
783 if read == 0 {
784 self.cursor = self
785 .iter
786 .next()
787 .transpose()
788 .map_err(ShellErrorBridge)?
789 .map(Cursor::new);
790 } else {
791 return Ok(read);
792 }
793 }
794 Ok(0)
795 }
796}
797
798pub struct Reader {
799 reader: BufReader<SourceReader>,
800 span: Span,
801 signals: Signals,
802}
803
804impl Reader {
805 pub fn span(&self) -> Span {
806 self.span
807 }
808}
809
810impl Read for Reader {
811 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
812 self.signals.check(self.span).map_err(ShellErrorBridge)?;
813 self.reader.read(buf)
814 }
815}
816
817impl BufRead for Reader {
818 fn fill_buf(&mut self) -> io::Result<&[u8]> {
819 self.reader.fill_buf()
820 }
821
822 fn consume(&mut self, amt: usize) {
823 self.reader.consume(amt)
824 }
825}
826
827pub struct Lines {
828 reader: BufReader<SourceReader>,
829 span: Span,
830 signals: Signals,
831}
832
833impl Lines {
834 pub fn span(&self) -> Span {
835 self.span
836 }
837}
838
839impl Iterator for Lines {
840 type Item = Result<String, ShellError>;
841
842 fn next(&mut self) -> Option<Self::Item> {
843 if self.signals.interrupted() {
844 None
845 } else {
846 let mut buf = Vec::new();
847 match self.reader.read_until(b'\n', &mut buf) {
848 Ok(0) => None,
849 Ok(_) => {
850 let Ok(mut string) = String::from_utf8(buf) else {
851 return Some(Err(ShellError::NonUtf8 { span: self.span }));
852 };
853 trim_end_newline(&mut string);
854 Some(Ok(string))
855 }
856 Err(e) => Some(Err(IoError::new(e.kind(), self.span, None).into())),
857 }
858 }
859 }
860}
861
862mod split_read {
863 use std::io::{BufRead, ErrorKind};
864
865 use memchr::memmem::Finder;
866
867 pub struct SplitRead<R> {
868 reader: Option<R>,
869 buf: Option<Vec<u8>>,
870 finder: Finder<'static>,
871 }
872
873 impl<R: BufRead> SplitRead<R> {
874 pub fn new(reader: R, delim: impl AsRef<[u8]>) -> Self {
875 debug_assert!(!delim.as_ref().is_empty(), "delimiter can't be empty");
877 Self {
878 reader: Some(reader),
879 buf: Some(Vec::new()),
880 finder: Finder::new(delim.as_ref()).into_owned(),
881 }
882 }
883 }
884
885 impl<R: BufRead> Iterator for SplitRead<R> {
886 type Item = Result<Vec<u8>, std::io::Error>;
887
888 fn next(&mut self) -> Option<Self::Item> {
889 let buf = self.buf.as_mut()?;
890 let mut search_start = 0usize;
891
892 loop {
893 if let Some(i) = self.finder.find(&buf[search_start..]) {
894 let needle_idx = search_start + i;
895 let right = buf.split_off(needle_idx + self.finder.needle().len());
896 buf.truncate(needle_idx);
897 let left = std::mem::replace(buf, right);
898 return Some(Ok(left));
899 }
900
901 if let Some(mut r) = self.reader.take() {
902 search_start = buf.len().saturating_sub(self.finder.needle().len() + 1);
903 let available = match r.fill_buf() {
904 Ok(n) => n,
905 Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
906 Err(e) => return Some(Err(e)),
907 };
908
909 buf.extend_from_slice(available);
910 let used = available.len();
911 r.consume(used);
912 if used != 0 {
913 self.reader = Some(r);
914 }
915 continue;
916 } else {
917 return self.buf.take().map(Ok);
918 }
919 }
920 }
921 }
922
923 #[cfg(test)]
924 mod tests {
925 use super::*;
926 use std::io::{self, Cursor, Read};
927
928 #[test]
929 fn simple() {
930 let s = "foo-bar-baz";
931 let cursor = Cursor::new(String::from(s));
932 let mut split =
933 SplitRead::new(cursor, "-").map(|r| String::from_utf8(r.unwrap()).unwrap());
934
935 assert_eq!(split.next().as_deref(), Some("foo"));
936 assert_eq!(split.next().as_deref(), Some("bar"));
937 assert_eq!(split.next().as_deref(), Some("baz"));
938 assert_eq!(split.next(), None);
939 }
940
941 #[test]
942 fn with_empty_fields() -> Result<(), io::Error> {
943 let s = "\0\0foo\0\0bar\0\0\0\0baz\0\0";
944 let cursor = Cursor::new(String::from(s));
945 let mut split =
946 SplitRead::new(cursor, "\0\0").map(|r| String::from_utf8(r.unwrap()).unwrap());
947
948 assert_eq!(split.next().as_deref(), Some(""));
949 assert_eq!(split.next().as_deref(), Some("foo"));
950 assert_eq!(split.next().as_deref(), Some("bar"));
951 assert_eq!(split.next().as_deref(), Some(""));
952 assert_eq!(split.next().as_deref(), Some("baz"));
953 assert_eq!(split.next().as_deref(), Some(""));
954 assert_eq!(split.next().as_deref(), None);
955
956 Ok(())
957 }
958
959 #[test]
960 fn complex_delimiter() -> Result<(), io::Error> {
961 let s = "<|>foo<|>bar<|><|>baz<|>";
962 let cursor = Cursor::new(String::from(s));
963 let mut split =
964 SplitRead::new(cursor, "<|>").map(|r| String::from_utf8(r.unwrap()).unwrap());
965
966 assert_eq!(split.next().as_deref(), Some(""));
967 assert_eq!(split.next().as_deref(), Some("foo"));
968 assert_eq!(split.next().as_deref(), Some("bar"));
969 assert_eq!(split.next().as_deref(), Some(""));
970 assert_eq!(split.next().as_deref(), Some("baz"));
971 assert_eq!(split.next().as_deref(), Some(""));
972 assert_eq!(split.next().as_deref(), None);
973
974 Ok(())
975 }
976
977 #[test]
978 fn all_empty() -> Result<(), io::Error> {
979 let s = "<><>";
980 let cursor = Cursor::new(String::from(s));
981 let mut split =
982 SplitRead::new(cursor, "<>").map(|r| String::from_utf8(r.unwrap()).unwrap());
983
984 assert_eq!(split.next().as_deref(), Some(""));
985 assert_eq!(split.next().as_deref(), Some(""));
986 assert_eq!(split.next().as_deref(), Some(""));
987 assert_eq!(split.next(), None);
988
989 Ok(())
990 }
991
992 #[should_panic = "delimiter can't be empty"]
993 #[test]
994 fn empty_delimiter() {
995 let s = "abc";
996 let cursor = Cursor::new(String::from(s));
997 let _split = SplitRead::new(cursor, "").map(|e| e.unwrap());
998 }
999
1000 #[test]
1001 fn delimiter_spread_across_reads() {
1002 let reader = Cursor::new("<|>foo<|")
1003 .chain(Cursor::new(">bar<|><"))
1004 .chain(Cursor::new("|>baz<|>"));
1005
1006 let mut split =
1007 SplitRead::new(reader, "<|>").map(|r| String::from_utf8(r.unwrap()).unwrap());
1008
1009 assert_eq!(split.next().unwrap(), "");
1010 assert_eq!(split.next().unwrap(), "foo");
1011 assert_eq!(split.next().unwrap(), "bar");
1012 assert_eq!(split.next().unwrap(), "");
1013 assert_eq!(split.next().unwrap(), "baz");
1014 assert_eq!(split.next().unwrap(), "");
1015 assert_eq!(split.next(), None);
1016 }
1017 }
1018}
1019
1020pub struct SplitRead {
1021 internal: split_read::SplitRead<BufReader<SourceReader>>,
1022 span: Span,
1023 signals: Signals,
1024}
1025
1026impl SplitRead {
1027 fn new(
1028 reader: SourceReader,
1029 delimiter: impl AsRef<[u8]>,
1030 span: Span,
1031 signals: Signals,
1032 ) -> Self {
1033 Self {
1034 internal: split_read::SplitRead::new(BufReader::new(reader), delimiter),
1035 span,
1036 signals,
1037 }
1038 }
1039
1040 pub fn span(&self) -> Span {
1041 self.span
1042 }
1043}
1044
1045impl Iterator for SplitRead {
1046 type Item = Result<Vec<u8>, ShellError>;
1047
1048 fn next(&mut self) -> Option<Self::Item> {
1049 if self.signals.interrupted() {
1050 return None;
1051 }
1052 self.internal.next().map(|r| {
1053 r.map_err(|err| {
1054 ShellError::Io(IoError::new_internal(
1055 err.kind(),
1056 "Could not get next value for SplitRead",
1057 crate::location!(),
1058 ))
1059 })
1060 })
1061 }
1062}
1063
1064pub struct Chunks {
1070 reader: BufReader<SourceReader>,
1071 pos: u64,
1072 error: bool,
1073 span: Span,
1074 signals: Signals,
1075 type_: ByteStreamType,
1076}
1077
1078impl Chunks {
1079 fn new(reader: SourceReader, span: Span, signals: Signals, type_: ByteStreamType) -> Self {
1080 Self {
1081 reader: BufReader::new(reader),
1082 pos: 0,
1083 error: false,
1084 span,
1085 signals,
1086 type_,
1087 }
1088 }
1089
1090 pub fn span(&self) -> Span {
1091 self.span
1092 }
1093
1094 fn next_string(&mut self) -> Result<Option<String>, (Vec<u8>, ShellError)> {
1095 let from_io_error = |err: std::io::Error| match ShellErrorBridge::try_from(err) {
1096 Ok(err) => err.0,
1097 Err(err) => IoError::new(err.kind(), self.span, None).into(),
1098 };
1099
1100 let buf = self
1102 .reader
1103 .fill_buf()
1104 .map_err(from_io_error)
1105 .map_err(|err| (vec![], err))?;
1106
1107 if buf.is_empty() {
1109 return Ok(None);
1110 }
1111
1112 let mut buf = buf.to_vec();
1113 let mut consumed = 0;
1114
1115 if buf.len() < 4 {
1117 consumed += buf.len();
1118 self.reader.consume(buf.len());
1119 match self.reader.fill_buf() {
1120 Ok(more_bytes) => buf.extend_from_slice(more_bytes),
1121 Err(err) => return Err((buf, from_io_error(err))),
1122 }
1123 }
1124
1125 match String::from_utf8(buf) {
1127 Ok(string) => {
1128 self.reader.consume(string.len() - consumed);
1129 self.pos += string.len() as u64;
1130 Ok(Some(string))
1131 }
1132 Err(err) if err.utf8_error().error_len().is_none() => {
1133 let valid_up_to = err.utf8_error().valid_up_to();
1136 if valid_up_to > consumed {
1137 self.reader.consume(valid_up_to - consumed);
1138 }
1139 let mut buf = err.into_bytes();
1140 buf.truncate(valid_up_to);
1141 buf.shrink_to_fit();
1142 let string = String::from_utf8(buf)
1143 .expect("failed to parse utf-8 even after correcting error");
1144 self.pos += string.len() as u64;
1145 Ok(Some(string))
1146 }
1147 Err(err) => {
1148 let shell_error = ShellError::NonUtf8Custom {
1150 msg: format!("invalid utf-8 sequence starting at index {}", self.pos),
1151 span: self.span,
1152 };
1153 let buf = err.into_bytes();
1154 if buf.len() > consumed {
1157 self.reader.consume(buf.len() - consumed);
1158 }
1159 self.pos += buf.len() as u64;
1160 Err((buf, shell_error))
1161 }
1162 }
1163 }
1164}
1165
1166impl Iterator for Chunks {
1167 type Item = Result<Value, ShellError>;
1168
1169 fn next(&mut self) -> Option<Self::Item> {
1170 if self.error || self.signals.interrupted() {
1171 None
1172 } else {
1173 match self.type_ {
1174 ByteStreamType::Binary => {
1176 let buf = match self.reader.fill_buf() {
1177 Ok(buf) => buf,
1178 Err(err) => {
1179 self.error = true;
1180 return Some(Err(ShellError::Io(IoError::new(
1181 err.kind(),
1182 self.span,
1183 None,
1184 ))));
1185 }
1186 };
1187 if !buf.is_empty() {
1188 let len = buf.len();
1189 let value = Value::binary(buf, self.span);
1190 self.reader.consume(len);
1191 self.pos += len as u64;
1192 Some(Ok(value))
1193 } else {
1194 None
1195 }
1196 }
1197 ByteStreamType::String => match self.next_string().transpose()? {
1199 Ok(string) => Some(Ok(Value::string(string, self.span))),
1200 Err((_, err)) => {
1201 self.error = true;
1202 Some(Err(err))
1203 }
1204 },
1205 ByteStreamType::Unknown => {
1208 match self.next_string().transpose()? {
1209 Ok(string) => Some(Ok(Value::string(string, self.span))),
1210 Err((buf, _)) if !buf.is_empty() => {
1211 self.type_ = ByteStreamType::Binary;
1213 Some(Ok(Value::binary(buf, self.span)))
1214 }
1215 Err((_, err)) => {
1216 self.error = true;
1217 Some(Err(err))
1218 }
1219 }
1220 }
1221 }
1222 }
1223 }
1224}
1225
/// Removes a single trailing line ending (`\n` or `\r\n`) from `string`, if present.
///
/// A lone trailing `\r` is left untouched.
fn trim_end_newline(string: &mut String) {
    if let Some(without_lf) = string.strip_suffix('\n') {
        let kept = without_lf.strip_suffix('\r').unwrap_or(without_lf).len();
        string.truncate(kept);
    }
}
1234
/// Converts one file-descriptor-owning type into another by way of [`OwnedFd`].
#[cfg(unix)]
pub(crate) fn convert_file<T: From<OwnedFd>>(file: impl Into<OwnedFd>) -> T {
    let fd: OwnedFd = file.into();
    T::from(fd)
}
1239
/// Converts one handle-owning type into another by way of [`OwnedHandle`].
#[cfg(windows)]
pub(crate) fn convert_file<T: From<OwnedHandle>>(file: impl Into<OwnedHandle>) -> T {
    let handle: OwnedHandle = file.into();
    T::from(handle)
}
1244
/// Size in bytes (8 KiB) of the scratch buffer `generic_copy` reads into.
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
1246
1247pub fn copy_with_signals(
1248 mut reader: impl Read,
1249 mut writer: impl Write,
1250 span: Span,
1251 signals: &Signals,
1252) -> Result<u64, ShellError> {
1253 let from_io_error = IoError::factory(span, None);
1254 if signals.is_empty() {
1255 match io::copy(&mut reader, &mut writer) {
1256 Ok(n) => {
1257 writer.flush().map_err(&from_io_error)?;
1258 Ok(n)
1259 }
1260 Err(err) => {
1261 let _ = writer.flush();
1262 match ShellErrorBridge::try_from(err) {
1263 Ok(ShellErrorBridge(shell_error)) => Err(shell_error),
1264 Err(err) => Err(from_io_error(err).into()),
1265 }
1266 }
1267 }
1268 } else {
1269 match generic_copy(&mut reader, &mut writer, span, signals) {
1274 Ok(len) => {
1275 writer.flush().map_err(&from_io_error)?;
1276 Ok(len)
1277 }
1278 Err(err) => {
1279 let _ = writer.flush();
1280 Err(err)
1281 }
1282 }
1283 }
1284}
1285
1286fn generic_copy(
1288 mut reader: impl Read,
1289 mut writer: impl Write,
1290 span: Span,
1291 signals: &Signals,
1292) -> Result<u64, ShellError> {
1293 let from_io_error = IoError::factory(span, None);
1294 let buf = &mut [0; DEFAULT_BUF_SIZE];
1295 let mut len = 0;
1296 loop {
1297 signals.check(span)?;
1298 let n = match reader.read(buf) {
1299 Ok(0) => break,
1300 Ok(n) => n,
1301 Err(e) if e.kind() == ErrorKind::Interrupted => continue,
1302 Err(e) => match ShellErrorBridge::try_from(e) {
1303 Ok(ShellErrorBridge(e)) => return Err(e),
1304 Err(e) => return Err(from_io_error(e).into()),
1305 },
1306 };
1307 len += n;
1308 writer.write_all(&buf[..n]).map_err(&from_io_error)?;
1309 }
1310 Ok(len as u64)
1311}
1312
1313struct ReadGenerator<F>
1314where
1315 F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
1316{
1317 buffer: Cursor<Vec<u8>>,
1318 generator: F,
1319}
1320
1321impl<F> BufRead for ReadGenerator<F>
1322where
1323 F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
1324{
1325 fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
1326 while self.buffer.fill_buf()?.is_empty() {
1329 self.buffer.set_position(0);
1331 self.buffer.get_mut().clear();
1332 if !(self.generator)(self.buffer.get_mut()).map_err(ShellErrorBridge)? {
1334 break;
1336 }
1337 }
1338 self.buffer.fill_buf()
1339 }
1340
1341 fn consume(&mut self, amt: usize) {
1342 self.buffer.consume(amt);
1343 }
1344}
1345
1346impl<F> Read for ReadGenerator<F>
1347where
1348 F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
1349{
1350 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
1351 let slice = self.fill_buf()?;
1353 let len = buf.len().min(slice.len());
1354 buf[..len].copy_from_slice(&slice[..len]);
1355 self.consume(len);
1356 Ok(len)
1357 }
1358}
1359
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a [`Chunks`] iterator of type `type_` whose reader yields one element of `data`
    /// per read.
    fn test_chunks<T>(data: Vec<T>, type_: ByteStreamType) -> Chunks
    where
        T: AsRef<[u8]> + Default + Send + 'static,
    {
        let source = SourceReader::Read(Box::new(ReadIterator {
            iter: data.into_iter(),
            cursor: Some(Cursor::new(T::default())),
        }));
        Chunks::new(source, Span::test_data(), Signals::empty(), type_)
    }

    #[test]
    fn chunks_read_binary_passthrough() {
        let bins: Vec<&[u8]> = vec![&[0, 1], &[2, 3]];
        let expected: Vec<Value> = bins
            .iter()
            .map(|bin| Value::binary(*bin, Span::test_data()))
            .collect();

        let actual = test_chunks(bins, ByteStreamType::Binary)
            .collect::<Result<Vec<Value>, _>>()
            .expect("error");
        assert_eq!(expected, actual);
    }

    #[test]
    fn chunks_read_string_clean() {
        let strs: Vec<&str> = vec!["Nushell", "が好きです"];
        let expected: Vec<Value> = strs
            .iter()
            .map(|string| Value::string(*string, Span::test_data()))
            .collect();

        let actual = test_chunks(strs, ByteStreamType::String)
            .collect::<Result<Vec<Value>, _>>()
            .expect("error");
        assert_eq!(expected, actual);
    }

    #[test]
    fn chunks_read_string_split_boundary() {
        let real = "Nushell最高!";
        let chunks = vec![&b"Nushell\xe6"[..], &b"\x9c\x80\xe9"[..], &b"\xab\x98!"[..]];

        let joined: String = test_chunks(chunks, ByteStreamType::String)
            .map(|value| value.expect("error").into_string().expect("not a string"))
            .collect();
        assert_eq!(real, joined);
    }

    #[test]
    fn chunks_read_string_utf8_error() {
        let chunks = vec![&b"Nushell\xe6"[..], &b"\x9c\x80\xe9"[..], &b"\xab"[..]];

        let mut so_far = String::new();
        let err = test_chunks(chunks, ByteStreamType::String)
            .find_map(|value| match value {
                Ok(value) => {
                    so_far.push_str(&value.into_string().expect("not a string"));
                    None
                }
                Err(err) => Some(err),
            })
            .expect("no error");

        println!("string so far: {:?}", so_far);
        println!("got error: {err:?}");
        assert!(!so_far.is_empty());
        assert!(matches!(err, ShellError::NonUtf8Custom { .. }));
    }

    #[test]
    fn chunks_read_unknown_fallback() {
        let chunks = vec![&b"Nushell"[..], &b"\x9c\x80\xe9abcd"[..], &b"efgh"[..]];
        let mut iter = test_chunks(chunks, ByteStreamType::Unknown);

        let mut next_value = || iter.next().expect("end of iter").expect("error");

        assert_eq!(Value::test_string("Nushell"), next_value());
        assert_eq!(Value::test_binary(b"\x9c\x80\xe9abcd"), next_value());
        assert_eq!(Value::test_binary(b"efgh"), next_value());
    }
}