#[cfg(feature = "os")]
use crate::process::{ChildPipe, ChildProcess};
use crate::{
IntRange, PipelineData, ShellError, Signals, Span, Type, Value,
shell_error::{bridge::ShellErrorBridge, io::IoError},
};
use nu_utils::SplitRead as SplitReadInner;
use serde::{Deserialize, Serialize};
use std::ops::Bound;
#[cfg(unix)]
use std::os::fd::OwnedFd;
#[cfg(windows)]
use std::os::windows::io::OwnedHandle;
use std::{
fmt::Debug,
fs::File,
io::{self, BufRead, BufReader, Cursor, ErrorKind, Read, Write},
process::Stdio,
};
/// The underlying data source of a [`ByteStream`].
///
/// A source is an arbitrary boxed reader, an open [`File`], or (with the `os` feature) a child
/// process whose stdout is read.
pub enum ByteStreamSource {
/// An arbitrary reader, e.g. an in-memory cursor or a generator.
Read(Box<dyn Read + Send + 'static>),
/// An open file handle.
File(File),
/// A child process; its stdout (if present) provides the data.
#[cfg(feature = "os")]
Child(Box<ChildProcess>),
}
impl ByteStreamSource {
    /// Turns this source into a [`SourceReader`] that can be read from directly.
    ///
    /// Returns `None` for a child process that has no stdout to read.
    fn reader(self) -> Option<SourceReader> {
        let source_reader = match self {
            ByteStreamSource::Read(read) => SourceReader::Read(read),
            ByteStreamSource::File(file) => SourceReader::File(file),
            #[cfg(feature = "os")]
            ByteStreamSource::Child(mut child) => match child.stdout.take()? {
                // A plain OS pipe can be handled as a file.
                ChildPipe::Pipe(pipe) => SourceReader::File(convert_file(pipe)),
                // A tee is already a boxed reader.
                ChildPipe::Tee(tee) => SourceReader::Read(tee),
            },
        };
        Some(source_reader)
    }

    /// Returns `true` if the data comes from a child process (an external command).
    #[cfg(feature = "os")]
    pub fn is_external(&self) -> bool {
        matches!(self, Self::Child(_))
    }

    /// Without the `os` feature there are no child processes, so no source is external.
    #[cfg(not(feature = "os"))]
    pub fn is_external(&self) -> bool {
        false
    }
}
impl Debug for ByteStreamSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ByteStreamSource::Read(_) => f.debug_tuple("Read").field(&"..").finish(),
ByteStreamSource::File(file) => f.debug_tuple("File").field(file).finish(),
#[cfg(feature = "os")]
ByteStreamSource::Child(child) => f.debug_tuple("Child").field(child).finish(),
}
}
}
/// A concrete reader extracted from a [`ByteStreamSource`]: either a boxed reader or a file.
enum SourceReader {
    Read(Box<dyn Read + Send + 'static>),
    File(File),
}

impl Read for SourceReader {
    /// Forwards the read to whichever reader this wraps.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match self {
            Self::File(file) => file.read(buf),
            Self::Read(reader) => reader.read(buf),
        }
    }
}

impl Debug for SourceReader {
    /// Formats the variant; boxed readers are opaque and shown as `".."`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::File(file) => f.debug_tuple("File").field(file).finish(),
            Self::Read(_) => f.debug_tuple("Read").field(&"..").finish(),
        }
    }
}
/// The kind of data a [`ByteStream`] is expected to carry.
///
/// This determines how the stream is interpreted when converted, e.g. into a [`Value`] or
/// a [`Type`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum ByteStreamType {
/// Raw binary data.
Binary,
/// Text, expected to be valid UTF-8.
String,
/// Not known in advance; treated as text when it decodes as UTF-8 and as binary otherwise.
#[default]
Unknown,
}
impl ByteStreamType {
pub fn describe(self) -> &'static str {
match self {
ByteStreamType::Binary => "binary (stream)",
ByteStreamType::String => "string (stream)",
ByteStreamType::Unknown => "byte stream",
}
}
pub fn is_binary_coercible(self) -> bool {
matches!(self, ByteStreamType::Binary | ByteStreamType::Unknown)
}
pub fn is_string_coercible(self) -> bool {
matches!(self, ByteStreamType::String | ByteStreamType::Unknown)
}
}
impl From<ByteStreamType> for Type {
    /// Maps a stream type to the matching value type. An unknown stream may turn out to be
    /// either text or binary, so it maps to [`Type::Any`].
    fn from(value: ByteStreamType) -> Self {
        match value {
            ByteStreamType::Binary => Self::Binary,
            ByteStreamType::String => Self::String,
            ByteStreamType::Unknown => Self::Any,
        }
    }
}
/// A stream of raw bytes, tagged with the kind of data it is expected to carry.
#[derive(Debug)]
pub struct ByteStream {
/// Where the bytes come from.
stream: ByteStreamSource,
/// Span attached to values and errors produced from this stream.
span: Span,
/// Signals checked while reading or copying, so the operation can be interrupted.
signals: Signals,
/// Expected kind of data.
type_: ByteStreamType,
/// Total length in bytes, if known ahead of time.
known_size: Option<u64>,
/// Extra spans recorded with `push_caller_span`.
caller_spans: Vec<Span>,
}
impl ByteStream {
/// Creates a [`ByteStream`] from a source, a span, signals and the expected data type.
///
/// The new stream has no known size and no caller spans.
pub fn new(
    stream: ByteStreamSource,
    span: Span,
    signals: Signals,
    type_: ByteStreamType,
) -> Self {
    let known_size = None;
    let caller_spans = Vec::new();
    Self {
        stream,
        span,
        signals,
        type_,
        known_size,
        caller_spans,
    }
}
/// Records `span` as a caller span, unless it equals the stream's own span.
pub fn push_caller_span(&mut self, span: Span) {
    if span == self.span {
        return;
    }
    self.caller_spans.push(span);
}
/// Returns the caller spans recorded so far, in the order they were pushed.
pub fn get_caller_spans(&self) -> &Vec<Span> {
    &self.caller_spans
}
/// Creates a [`ByteStream`] that reads from any [`Read`] implementation.
pub fn read(
    reader: impl Read + Send + 'static,
    span: Span,
    signals: Signals,
    type_: ByteStreamType,
) -> Self {
    let boxed: Box<dyn Read + Send + 'static> = Box::new(reader);
    Self::new(ByteStreamSource::Read(boxed), span, signals, type_)
}
pub fn skip(self, span: Span, n: u64) -> Result<Self, ShellError> {
let known_size = self.known_size.map(|len| len.saturating_sub(n));
if let Some(mut reader) = self.reader() {
io::copy(&mut (&mut reader).take(n), &mut io::sink())
.map_err(|err| IoError::new(err, span, None))?;
Ok(
ByteStream::read(reader, span, Signals::empty(), ByteStreamType::Binary)
.with_known_size(known_size),
)
} else {
Err(ShellError::TypeMismatch {
err_message: "expected readable stream".into(),
span,
})
}
}
/// Limits the stream to at most its first `n` bytes and returns it as a binary stream.
///
/// If the stream's size was known, the returned stream's known size is capped at `n`.
///
/// # Errors
///
/// Returns [`ShellError::TypeMismatch`] if the stream has nothing to read from.
pub fn take(self, span: Span, n: u64) -> Result<Self, ShellError> {
    let known_size = self.known_size.map(|size| size.min(n));
    let Some(reader) = self.reader() else {
        return Err(ShellError::TypeMismatch {
            err_message: "expected readable stream".into(),
            span,
        });
    };
    let limited = reader.take(n);
    Ok(ByteStream::read(limited, span, Signals::empty(), ByteStreamType::Binary)
        .with_known_size(known_size))
}
/// Returns the part of the stream selected by `range`, as a new binary stream.
///
/// If the stream's size is known, `range` is resolved against it with
/// `IntRange::absolute_start`/`absolute_end`, so relative ranges (presumably counted from the
/// end) are supported. If the size is unknown, a relative range is rejected, and the start and
/// distance of an absolute range are used directly.
///
/// `val_span` is attached to the resulting stream and to errors from reading it. `call_span`
/// is used for the error reported when a relative range is applied to a stream of unknown size.
///
/// # Errors
///
/// Returns [`ShellError::RelativeRangeOnInfiniteStream`] for a relative range on a stream of
/// unknown size, or any error from skipping or taking bytes (see `skip` and `take`).
pub fn slice(
self,
val_span: Span,
call_span: Span,
range: IntRange,
) -> Result<Self, ShellError> {
if let Some(len) = self.known_size {
// Resolve the range against the known length, then skip to its start.
let start = range.absolute_start(len);
let stream = self.skip(val_span, start);
match range.absolute_end(len) {
Bound::Unbounded => stream,
// An end before the start selects nothing.
Bound::Included(end) | Bound::Excluded(end) if end < start => {
stream.and_then(|s| s.take(val_span, 0))
}
// Inclusive end: the byte at `end` is part of the slice.
Bound::Included(end) => {
let distance = end - start + 1;
stream.and_then(|s| s.take(val_span, distance.min(len)))
}
Bound::Excluded(end) => {
let distance = end - start;
stream.and_then(|s| s.take(val_span, distance.min(len)))
}
}
} else if range.is_relative() {
// A relative range cannot be resolved without knowing the length.
Err(ShellError::RelativeRangeOnInfiniteStream { span: call_span })
} else {
// Unknown length: skip to the start, then take the range's distance (plus one when the
// end is inclusive).
// NOTE(review): assumes a non-relative range has a non-negative start before the `as u64`
// cast — confirm against `IntRange`.
let start = range.start() as u64;
let stream = self.skip(val_span, start);
match range.distance() {
Bound::Unbounded => stream,
Bound::Included(distance) => stream.and_then(|s| s.take(val_span, distance + 1)),
Bound::Excluded(distance) => stream.and_then(|s| s.take(val_span, distance)),
}
}
}
pub fn read_string(string: String, span: Span, signals: Signals) -> Self {
let len = string.len();
ByteStream::read(
Cursor::new(string.into_bytes()),
span,
signals,
ByteStreamType::String,
)
.with_known_size(Some(len as u64))
}
pub fn read_binary(bytes: Vec<u8>, span: Span, signals: Signals) -> Self {
let len = bytes.len();
ByteStream::read(Cursor::new(bytes), span, signals, ByteStreamType::Binary)
.with_known_size(Some(len as u64))
}
/// Creates a [`ByteStream`] that reads from an open [`File`].
///
/// The stream's type is [`ByteStreamType::Unknown`] and its size is not recorded.
pub fn file(file: File, span: Span, signals: Signals) -> Self {
    let source = ByteStreamSource::File(file);
    Self::new(source, span, signals, ByteStreamType::Unknown)
}
/// Creates a [`ByteStream`] backed by a child process.
///
/// The stream has no signals attached and its type is [`ByteStreamType::Unknown`].
#[cfg(feature = "os")]
pub fn child(child: ChildProcess, span: Span) -> Self {
    let source = ByteStreamSource::Child(Box::new(child));
    Self::new(source, span, Signals::empty(), ByteStreamType::Unknown)
}
/// Creates a [`ByteStream`] that reads from a duplicated handle to this process's standard
/// input (obtained with `os_pipe::dup_stdin`).
///
/// # Errors
///
/// Returns an I/O error if stdin cannot be duplicated.
#[cfg(feature = "os")]
pub fn stdin(span: Span) -> Result<Self, ShellError> {
    let duplicated = os_pipe::dup_stdin().map_err(|err| IoError::new(err, span, None))?;
    let file: File = convert_file(duplicated);
    Ok(Self::new(
        ByteStreamSource::File(file),
        span,
        Signals::empty(),
        ByteStreamType::Unknown,
    ))
}
/// Reading stdin is unsupported without the `os` feature.
///
/// # Errors
///
/// Always returns [`ShellError::DisabledOsSupport`].
#[cfg(not(feature = "os"))]
pub fn stdin(span: Span) -> Result<Self, ShellError> {
    let msg = "Stdin is not supported".to_string();
    Err(ShellError::DisabledOsSupport { msg, span })
}
/// Creates a [`ByteStream`] whose data is produced on demand by `generator`.
///
/// `generator` appends bytes to the buffer it is given. It returns `Ok(true)` to keep going or
/// `Ok(false)` to signal the end of the stream. An `Err` is surfaced as a read error.
pub fn from_fn(
    span: Span,
    signals: Signals,
    type_: ByteStreamType,
    generator: impl FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
) -> Self {
    let reader = ReadGenerator {
        buffer: Cursor::new(Vec::new()),
        generator,
    };
    Self::read(reader, span, signals, type_)
}
pub fn with_type(mut self, type_: ByteStreamType) -> Self {
self.type_ = type_;
self
}
/// Creates a [`ByteStream`] that reads the bytes of every item produced by `iter`, in order.
pub fn from_iter<I>(iter: I, span: Span, signals: Signals, type_: ByteStreamType) -> Self
where
    I: IntoIterator,
    I::IntoIter: Send + 'static,
    I::Item: AsRef<[u8]> + Default + Send + 'static,
{
    // Seed the reader with a default-constructed item; once it is exhausted, the reader moves
    // on to the iterator's items.
    let reader = ReadIterator {
        iter: iter.into_iter(),
        cursor: Some(Cursor::new(I::Item::default())),
    };
    Self::read(reader, span, signals, type_)
}
/// Creates a [`ByteStream`] that reads the bytes of every `Ok` item produced by `iter`.
///
/// An `Err` item is surfaced as a read error that carries the original [`ShellError`].
pub fn from_result_iter<I, T>(
    iter: I,
    span: Span,
    signals: Signals,
    type_: ByteStreamType,
) -> Self
where
    I: IntoIterator<Item = Result<T, ShellError>>,
    I::IntoIter: Send + 'static,
    T: AsRef<[u8]> + Default + Send + 'static,
{
    let reader = ReadResultIterator {
        iter: iter.into_iter(),
        cursor: Some(Cursor::new(T::default())),
    };
    Self::read(reader, span, signals, type_)
}
pub fn with_known_size(mut self, size: Option<u64>) -> Self {
self.known_size = size;
self
}
pub fn source(&self) -> &ByteStreamSource {
&self.stream
}
pub fn source_mut(&mut self) -> &mut ByteStreamSource {
&mut self.stream
}
pub fn span(&self) -> Span {
self.span
}
pub fn with_span(mut self, span: Span) -> Self {
self.span = span;
self
}
pub fn type_(&self) -> ByteStreamType {
self.type_
}
pub fn known_size(&self) -> Option<u64> {
self.known_size
}
pub fn reader(self) -> Option<Reader> {
let reader = self.stream.reader()?;
Some(Reader {
reader: BufReader::new(reader),
span: self.span,
signals: self.signals,
})
}
pub fn lines(self) -> Option<Lines> {
let reader = self.stream.reader()?;
Some(Lines {
reader: BufReader::new(reader),
span: self.span,
signals: self.signals,
})
}
pub fn split(self, delimiter: Vec<u8>) -> Option<SplitRead> {
let reader = self.stream.reader()?;
Some(SplitRead::new(reader, delimiter, self.span, self.signals))
}
pub fn chunks(self) -> Option<Chunks> {
let reader = self.stream.reader()?;
Some(Chunks::new(reader, self.span, self.signals, self.type_))
}
/// Consumes the stream and returns its underlying source.
pub fn into_source(self) -> ByteStreamSource {
self.stream
}
/// Tries to convert the stream into a [`Stdio`], e.g. to hand it to another process.
///
/// Only a file source, or a child process whose stdout is a plain pipe, can be converted.
///
/// # Errors
///
/// Returns the stream unchanged if its source is an arbitrary reader. The same happens for a
/// child process whose stdout is a tee or is missing.
pub fn into_stdio(mut self) -> Result<Stdio, Self> {
match self.stream {
ByteStreamSource::Read(..) => Err(self),
ByteStreamSource::File(file) => Ok(file.into()),
#[cfg(feature = "os")]
ByteStreamSource::Child(child) => {
// Only a plain pipe on stdout can be handed over directly.
if let ChildProcess {
stdout: Some(ChildPipe::Pipe(stdout)),
stderr,
..
} = *child
{
// stderr is expected to be absent here (checked in debug builds only).
debug_assert!(stderr.is_none(), "stderr should not exist");
Ok(stdout.into())
} else {
// The pattern did not match, so `child` is intact; put it back so the stream can be
// returned unchanged.
self.stream = ByteStreamSource::Child(child);
Err(self)
}
}
}
}
/// Extracts the child process backing this stream.
///
/// # Errors
///
/// Returns the stream unchanged if it is not backed by a child process.
#[cfg(feature = "os")]
pub fn into_child(self) -> Result<ChildProcess, Self> {
    match self.stream {
        ByteStreamSource::Child(child) => Ok(*child),
        _ => Err(self),
    }
}
/// Reads the entire stream into memory and returns its bytes.
///
/// For a child process, this delegates to the child's own `into_bytes`.
///
/// # Errors
///
/// Returns any error raised while reading. For reader sources, a [`ShellError`] carried through
/// [`ShellErrorBridge`] is returned as-is. Other I/O errors are wrapped with the stream's span.
pub fn into_bytes(self) -> Result<Vec<u8>, ShellError> {
    let to_io_error = IoError::factory(self.span, None);
    let mut bytes = Vec::new();
    match self.stream {
        ByteStreamSource::Read(mut read) => {
            if let Err(err) = read.read_to_end(&mut bytes) {
                return Err(match ShellErrorBridge::try_from(err) {
                    Ok(ShellErrorBridge(shell_error)) => shell_error,
                    Err(io_error) => ShellError::Io(to_io_error(io_error)),
                });
            }
        }
        ByteStreamSource::File(mut file) => {
            file.read_to_end(&mut bytes).map_err(&to_io_error)?;
        }
        #[cfg(feature = "os")]
        ByteStreamSource::Child(child) => return child.into_bytes(),
    }
    Ok(bytes)
}
/// Reads the entire stream and decodes it as UTF-8.
///
/// For external (child process) streams, a single trailing `"\n"` or `"\r\n"` is removed.
///
/// # Errors
///
/// Returns [`ShellError::TypeMismatch`] if the stream is typed as binary, and
/// [`ShellError::NonUtf8Custom`] if the bytes are not valid UTF-8. Read errors are returned
/// as well.
pub fn into_string(self) -> Result<String, ShellError> {
    let span = self.span;
    if !self.type_.is_string_coercible() {
        return Err(ShellError::TypeMismatch {
            err_message: "expected string, but got binary".into(),
            span,
        });
    }
    let is_external = self.stream.is_external();
    let bytes = self.into_bytes()?;
    let mut text = String::from_utf8(bytes).map_err(|err| ShellError::NonUtf8Custom {
        span,
        msg: err.to_string(),
    })?;
    if is_external {
        trim_end_newline(&mut text);
    }
    Ok(text)
}
/// Reads the entire stream and converts it into a [`Value`] according to its type.
///
/// Binary streams become binary values and string streams become strings (see
/// `into_string`). Streams of unknown type become strings if they decode as UTF-8, with a
/// trailing newline trimmed for external sources, and binary values otherwise.
///
/// # Errors
///
/// Returns any read or decoding error.
pub fn into_value(self) -> Result<Value, ShellError> {
    let span = self.span;
    match self.type_ {
        ByteStreamType::Binary => Ok(Value::binary(self.into_bytes()?, span)),
        ByteStreamType::String => Ok(Value::string(self.into_string()?, span)),
        ByteStreamType::Unknown => {
            let trim = self.stream.is_external();
            let bytes = self.into_bytes()?;
            Ok(match String::from_utf8(bytes) {
                Ok(mut text) => {
                    if trim {
                        trim_end_newline(&mut text);
                    }
                    Value::string(text, span)
                }
                Err(err) => Value::binary(err.into_bytes(), span),
            })
        }
    }
}
/// Consumes the stream and discards its data.
///
/// Reader sources are read to the end, checking the stream's signals. File sources are dropped
/// without being read. For a child process, the child's `wait` is called.
pub fn drain(self) -> Result<(), ShellError> {
    match self.stream {
        ByteStreamSource::Read(read) => {
            copy_with_signals(read, io::sink(), self.span, &self.signals).map(|_| ())
        }
        ByteStreamSource::File(_) => Ok(()),
        #[cfg(feature = "os")]
        ByteStreamSource::Child(child) => child.wait(),
    }
}
/// Writes the entire stream to stdout, or to stderr if `to_stderr` is `true`.
pub fn print(self, to_stderr: bool) -> Result<(), ShellError> {
    if !to_stderr {
        return self.write_to(&mut io::stdout());
    }
    self.write_to(&mut io::stderr())
}
pub fn write_to(self, dest: impl Write) -> Result<(), ShellError> {
let span = self.span;
let signals = &self.signals;
match self.stream {
ByteStreamSource::Read(read) => {
copy_with_signals(read, dest, span, signals)?;
}
ByteStreamSource::File(file) => {
copy_with_signals(file, dest, span, signals)?;
}
#[cfg(feature = "os")]
ByteStreamSource::Child(mut child) => {
debug_assert!(child.stderr.is_none(), "stderr should not exist");
if let Some(stdout) = child.stdout.take() {
match stdout {
ChildPipe::Pipe(pipe) => {
copy_with_signals(pipe, dest, span, signals)?;
}
ChildPipe::Tee(tee) => {
copy_with_signals(tee, dest, span, signals)?;
}
}
}
child.wait()?;
}
}
Ok(())
}
}
impl From<ByteStream> for PipelineData {
fn from(stream: ByteStream) -> Self {
Self::byte_stream(stream, None)
}
}
/// A [`Read`] adapter over an iterator of byte chunks.
///
/// Bytes are read from the current chunk until it is exhausted, and then the next item of
/// `iter` becomes the current chunk. Each `read` call returns data from at most one chunk.
/// Reading returns `Ok(0)` once the iterator is exhausted.
struct ReadIterator<I>
where
    I: Iterator,
    I::Item: AsRef<[u8]>,
{
    /// Chunks not yet started.
    iter: I,
    /// Chunk currently being read; `None` once `iter` has been exhausted.
    cursor: Option<Cursor<I::Item>>,
}

impl<I> Read for ReadIterator<I>
where
    I: Iterator,
    I::Item: AsRef<[u8]>,
{
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // With an empty buffer every chunk would appear exhausted (`cursor.read` returns 0), and
        // the loop below would silently discard the rest of the iterator.
        if buf.is_empty() {
            return Ok(0);
        }
        while let Some(cursor) = self.cursor.as_mut() {
            let read = cursor.read(buf)?;
            if read == 0 {
                // Current chunk is exhausted (or empty): move on to the next one.
                self.cursor = self.iter.next().map(Cursor::new);
            } else {
                return Ok(read);
            }
        }
        Ok(0)
    }
}
/// A [`Read`] adapter over an iterator of fallible byte chunks.
///
/// Bytes are read from the current chunk until it is exhausted, and then the next item of
/// `iter` becomes the current chunk. An `Err` item is reported as an I/O error that carries the
/// original [`ShellError`] (through [`ShellErrorBridge`]). Reading returns `Ok(0)` once the
/// iterator is exhausted.
struct ReadResultIterator<I, T>
where
    I: Iterator<Item = Result<T, ShellError>>,
    T: AsRef<[u8]>,
{
    /// Chunks not yet started.
    iter: I,
    /// Chunk currently being read; `None` once `iter` has been exhausted.
    cursor: Option<Cursor<T>>,
}

impl<I, T> Read for ReadResultIterator<I, T>
where
    I: Iterator<Item = Result<T, ShellError>>,
    T: AsRef<[u8]>,
{
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // With an empty buffer every chunk would appear exhausted (`cursor.read` returns 0), and
        // the loop below would silently discard the rest of the iterator.
        if buf.is_empty() {
            return Ok(0);
        }
        while let Some(cursor) = self.cursor.as_mut() {
            let read = cursor.read(buf)?;
            if read == 0 {
                // Current chunk is exhausted: move on, surfacing an `Err` item as an I/O error.
                self.cursor = self
                    .iter
                    .next()
                    .transpose()
                    .map_err(ShellErrorBridge)?
                    .map(Cursor::new);
            } else {
                return Ok(read);
            }
        }
        Ok(0)
    }
}
pub struct Reader {
reader: BufReader<SourceReader>,
span: Span,
signals: Signals,
}
impl Reader {
pub fn span(&self) -> Span {
self.span
}
}
impl Read for Reader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.signals.check(&self.span).map_err(ShellErrorBridge)?;
self.reader.read(buf)
}
}
impl BufRead for Reader {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
self.reader.fill_buf()
}
fn consume(&mut self, amt: usize) {
self.reader.consume(amt)
}
}
pub struct Lines {
reader: BufReader<SourceReader>,
span: Span,
signals: Signals,
}
impl Lines {
pub fn span(&self) -> Span {
self.span
}
}
impl Iterator for Lines {
type Item = Result<String, ShellError>;
fn next(&mut self) -> Option<Self::Item> {
if self.signals.interrupted() {
None
} else {
let mut buf = Vec::new();
match self.reader.read_until(b'\n', &mut buf) {
Ok(0) => None,
Ok(_) => {
let Ok(mut string) = String::from_utf8(buf) else {
return Some(Err(ShellError::NonUtf8 { span: self.span }));
};
trim_end_newline(&mut string);
Some(Ok(string))
}
Err(err) => Some(Err(IoError::new(err, self.span, None).into())),
}
}
}
}
/// An iterator over a stream's data split on a delimiter.
///
/// Iteration stops early once the stream's signals report an interruption.
pub struct SplitRead {
    internal: SplitReadInner<BufReader<SourceReader>>,
    span: Span,
    signals: Signals,
}

impl SplitRead {
    /// Wraps `reader` in a buffer and splits its data on `delimiter`.
    fn new(
        reader: SourceReader,
        delimiter: impl AsRef<[u8]>,
        span: Span,
        signals: Signals,
    ) -> Self {
        let buffered = BufReader::new(reader);
        let internal = SplitReadInner::new(buffered, delimiter);
        Self {
            internal,
            span,
            signals,
        }
    }

    /// Returns the span of the stream being split.
    pub fn span(&self) -> Span {
        self.span
    }
}

impl Iterator for SplitRead {
    type Item = Result<Vec<u8>, ShellError>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.signals.interrupted() {
            return None;
        }
        let piece = self.internal.next()?;
        Some(piece.map_err(|err| {
            ShellError::Io(IoError::new_internal(
                err,
                "Could not get next value for SplitRead",
            ))
        }))
    }
}
/// An iterator that yields a stream's data as a sequence of [`Value`]s.
///
/// Binary streams yield binary values. String streams yield string values. Streams of unknown
/// type yield strings until invalid UTF-8 is encountered, then switch to binary.
pub struct Chunks {
/// Buffered source of the data.
reader: BufReader<SourceReader>,
/// Number of bytes processed so far; used in error messages.
pos: u64,
/// Set once an error has been yielded; the iterator then ends.
error: bool,
/// Span attached to produced values and errors.
span: Span,
/// Signals checked before each chunk; an interruption ends the iterator.
signals: Signals,
/// How the data is interpreted; switches from `Unknown` to `Binary` on invalid UTF-8.
type_: ByteStreamType,
}
impl Chunks {
    /// Creates a chunk iterator over `reader`, interpreting its data as `type_`.
    fn new(reader: SourceReader, span: Span, signals: Signals, type_: ByteStreamType) -> Self {
        Self {
            reader: BufReader::new(reader),
            pos: 0,
            error: false,
            span,
            signals,
            type_,
        }
    }

    /// Returns the span attached to produced values.
    pub fn span(&self) -> Span {
        self.span
    }

    /// Reads the next chunk of valid UTF-8 text from the reader.
    ///
    /// Returns `Ok(None)` at end of stream. A chunk never ends in the middle of a multi-byte
    /// character. If the buffered data ends with an incomplete character, either the complete
    /// prefix is returned and the remaining bytes stay in the reader, or more data is read until
    /// the character is complete.
    ///
    /// # Errors
    ///
    /// Fails if reading fails, if the data contains an invalid UTF-8 sequence, or if the stream
    /// ends in the middle of a character. The error is returned together with the bytes that
    /// were consumed, so callers can fall back to binary output.
    fn next_string(&mut self) -> Result<Option<String>, (Vec<u8>, ShellError)> {
        let span = self.span;
        let from_io_error = |err: std::io::Error| match ShellErrorBridge::try_from(err) {
            Ok(err) => err.0,
            Err(err) => IoError::new(err, span, None).into(),
        };

        // Get some data from the reader; an empty buffer means end of stream.
        let buf = self
            .reader
            .fill_buf()
            .map_err(from_io_error)
            .map_err(|err| (vec![], err))?;
        if buf.is_empty() {
            return Ok(None);
        }

        let mut buf = buf.to_vec();
        // Invariant: `buf[..consumed]` has already been consumed from the reader, and
        // `buf[consumed..]` is exactly what is still in the reader's internal buffer.
        let mut consumed = 0;
        let mut eof = false;

        // A very short buffer could be the start of a multi-byte character, so top it up once.
        if buf.len() < 4 {
            consumed = buf.len();
            self.reader.consume(consumed);
            match self.reader.fill_buf() {
                Ok(more_bytes) => {
                    eof = more_bytes.is_empty();
                    buf.extend_from_slice(more_bytes);
                }
                Err(err) => return Err((buf, from_io_error(err))),
            }
        }

        loop {
            let err = match String::from_utf8(buf) {
                Ok(string) => {
                    // Everything not yet consumed is part of the returned string.
                    self.reader.consume(string.len() - consumed);
                    self.pos += string.len() as u64;
                    return Ok(Some(string));
                }
                Err(err) => err,
            };
            let utf8_error = err.utf8_error();
            let valid_up_to = utf8_error.valid_up_to();
            let incomplete = utf8_error.error_len().is_none();

            if incomplete && valid_up_to > 0 && valid_up_to >= consumed {
                // Valid text followed by the start of a character whose bytes are all still in
                // the reader: return the text and leave the partial character for next time.
                self.reader.consume(valid_up_to - consumed);
                let mut bytes = err.into_bytes();
                bytes.truncate(valid_up_to);
                bytes.shrink_to_fit();
                let string = String::from_utf8(bytes)
                    .expect("failed to parse utf-8 even after correcting error");
                self.pos += string.len() as u64;
                return Ok(Some(string));
            }

            let mut bytes = err.into_bytes();
            if incomplete && !eof {
                // Part of the incomplete character was already consumed (or there is no complete
                // character before it), so it cannot be left for the next call without losing
                // bytes. Consume what we have and read more.
                self.reader.consume(bytes.len() - consumed);
                consumed = bytes.len();
                match self.reader.fill_buf() {
                    Ok(more_bytes) => {
                        eof = more_bytes.is_empty();
                        bytes.extend_from_slice(more_bytes);
                    }
                    Err(err) => return Err((bytes, from_io_error(err))),
                }
                buf = bytes;
                continue;
            }

            // The data is invalid UTF-8, or the stream ended in the middle of a character.
            let shell_error = ShellError::NonUtf8Custom {
                msg: format!(
                    "invalid utf-8 sequence starting at index {}",
                    self.pos + valid_up_to as u64
                ),
                span: self.span,
            };
            // Consume everything: the bytes are handed back to the caller (e.g. to be emitted as
            // binary), so they must not be read again.
            self.reader.consume(bytes.len() - consumed);
            self.pos += bytes.len() as u64;
            return Err((bytes, shell_error));
        }
    }
}
impl Iterator for Chunks {
type Item = Result<Value, ShellError>;
fn next(&mut self) -> Option<Self::Item> {
if self.error || self.signals.interrupted() {
None
} else {
match self.type_ {
ByteStreamType::Binary => {
let buf = match self.reader.fill_buf() {
Ok(buf) => buf,
Err(err) => {
self.error = true;
return Some(Err(ShellError::Io(IoError::new(err, self.span, None))));
}
};
if !buf.is_empty() {
let len = buf.len();
let value = Value::binary(buf, self.span);
self.reader.consume(len);
self.pos += len as u64;
Some(Ok(value))
} else {
None
}
}
ByteStreamType::String => match self.next_string().transpose()? {
Ok(string) => Some(Ok(Value::string(string, self.span))),
Err((_, err)) => {
self.error = true;
Some(Err(err))
}
},
ByteStreamType::Unknown => {
match self.next_string().transpose()? {
Ok(string) => Some(Ok(Value::string(string, self.span))),
Err((buf, _)) if !buf.is_empty() => {
self.type_ = ByteStreamType::Binary;
Some(Ok(Value::binary(buf, self.span)))
}
Err((_, err)) => {
self.error = true;
Some(Err(err))
}
}
}
}
}
}
}
/// Removes a single trailing line ending (`"\n"` or `"\r\n"`) from `string`, if there is one.
///
/// A lone trailing `"\r"` is left untouched.
fn trim_end_newline(string: &mut String) {
    if let Some(without_lf) = string.strip_suffix('\n') {
        let new_len = without_lf.strip_suffix('\r').unwrap_or(without_lf).len();
        string.truncate(new_len);
    }
}
/// Converts one owned file-descriptor type into another (e.g. an OS pipe into a [`File`]) by
/// transferring ownership through [`OwnedFd`].
#[cfg(unix)]
pub(crate) fn convert_file<T: From<OwnedFd>>(file: impl Into<OwnedFd>) -> T {
    let fd: OwnedFd = file.into();
    T::from(fd)
}
/// Converts one owned handle type into another (e.g. an OS pipe into a [`File`]) by
/// transferring ownership through [`OwnedHandle`].
#[cfg(windows)]
pub(crate) fn convert_file<T: From<OwnedHandle>>(file: impl Into<OwnedHandle>) -> T {
    let handle: OwnedHandle = file.into();
    T::from(handle)
}
/// Size (8 KiB) of the intermediate buffer used by the signal-checking copy loop.
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
/// Copies all data from `reader` into `writer`, then flushes `writer`.
///
/// If `signals` is empty, [`io::copy`] is used directly. Otherwise the copy runs in chunks and
/// checks `signals` before every read, so it can be interrupted. Returns the number of bytes
/// copied.
///
/// # Errors
///
/// Returns the copy error, with a [`ShellError`] carried through [`ShellErrorBridge`] passed
/// through unchanged, or the flush error after a successful copy. If the copy fails, `writer` is
/// still flushed on a best-effort basis before the error is returned.
pub fn copy_with_signals(
    mut reader: impl Read,
    mut writer: impl Write,
    span: Span,
    signals: &Signals,
) -> Result<u64, ShellError> {
    let from_io_error = IoError::factory(span, None);
    let copied = if signals.is_empty() {
        io::copy(&mut reader, &mut writer).map_err(|err| match ShellErrorBridge::try_from(err) {
            Ok(ShellErrorBridge(shell_error)) => shell_error,
            Err(err) => from_io_error(err).into(),
        })
    } else {
        generic_copy(&mut reader, &mut writer, span, signals)
    };
    match copied {
        Ok(n) => {
            writer.flush().map_err(&from_io_error)?;
            Ok(n)
        }
        Err(err) => {
            // Best effort: push out whatever was written before the failure.
            let _ = writer.flush();
            Err(err)
        }
    }
}
/// Copies everything from `reader` into `writer` through a fixed-size buffer, checking
/// `signals` before every read.
///
/// Reads that fail with [`ErrorKind::Interrupted`] are retried, and `writer` is not flushed.
/// Returns the total number of bytes copied. The count is kept as a `u64` so it cannot
/// overflow on targets where `usize` is 32 bits wide.
///
/// # Errors
///
/// Returns the signal-check error if that check fails. A [`ShellError`] raised by the reader
/// through [`ShellErrorBridge`] is returned unchanged, and any other read or write error is
/// wrapped with `span`.
fn generic_copy(
    mut reader: impl Read,
    mut writer: impl Write,
    span: Span,
    signals: &Signals,
) -> Result<u64, ShellError> {
    let from_io_error = IoError::factory(span, None);
    let buf = &mut [0; DEFAULT_BUF_SIZE];
    let mut len: u64 = 0;
    loop {
        signals.check(&span)?;
        let n = match reader.read(buf) {
            Ok(0) => break,
            Ok(n) => n,
            Err(e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => match ShellErrorBridge::try_from(e) {
                Ok(ShellErrorBridge(e)) => return Err(e),
                Err(e) => return Err(from_io_error(e).into()),
            },
        };
        writer.write_all(&buf[..n]).map_err(&from_io_error)?;
        len += n as u64;
    }
    Ok(len)
}
/// A reader that obtains its data from a generator closure.
///
/// Whenever the internal buffer runs dry, it is cleared and `generator` is called to append
/// more bytes. The generator returns `Ok(true)` to continue and `Ok(false)` to stop. An `Err`
/// is reported as an I/O error carrying the [`ShellError`].
struct ReadGenerator<F>
where
    F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
{
    /// Data from the most recent generator call, plus the read position within it.
    buffer: Cursor<Vec<u8>>,
    /// Closure producing more data on demand.
    generator: F,
}

impl<F> BufRead for ReadGenerator<F>
where
    F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
{
    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
        // Ask the generator for data until it produces some or reports that it is done.
        loop {
            if !self.buffer.fill_buf()?.is_empty() {
                break;
            }
            self.buffer.set_position(0);
            self.buffer.get_mut().clear();
            let keep_going = (self.generator)(self.buffer.get_mut()).map_err(ShellErrorBridge)?;
            if !keep_going {
                break;
            }
        }
        self.buffer.fill_buf()
    }

    fn consume(&mut self, amt: usize) {
        self.buffer.consume(amt);
    }
}

impl<F> Read for ReadGenerator<F>
where
    F: FnMut(&mut Vec<u8>) -> Result<bool, ShellError> + Send + 'static,
{
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let available = self.fill_buf()?;
        let n = available.len().min(buf.len());
        buf[..n].copy_from_slice(&available[..n]);
        self.consume(n);
        Ok(n)
    }
}
#[cfg(test)]
mod tests {
use super::*;
// Builds a `Chunks` iterator over an in-memory `ReadIterator` that yields `data` item by item.
// Each `read` returns at most one item, so item boundaries become read boundaries.
fn test_chunks<T>(data: Vec<T>, type_: ByteStreamType) -> Chunks
where
T: AsRef<[u8]> + Default + Send + 'static,
{
let reader = ReadIterator {
iter: data.into_iter(),
cursor: Some(Cursor::new(T::default())),
};
Chunks::new(
SourceReader::Read(Box::new(reader)),
Span::test_data(),
Signals::empty(),
type_,
)
}
// Binary data is passed through unchanged, one value per item.
#[test]
fn chunks_read_binary_passthrough() {
let bins = vec![&[0, 1][..], &[2, 3][..]];
let iter = test_chunks(bins.clone(), ByteStreamType::Binary);
let bins_values: Vec<Value> = bins
.into_iter()
.map(|bin| Value::binary(bin, Span::test_data()))
.collect();
assert_eq!(
bins_values,
iter.collect::<Result<Vec<Value>, _>>().expect("error")
);
}
// Valid UTF-8 items come through as the same strings.
#[test]
fn chunks_read_string_clean() {
let strs = vec!["Nushell", "が好きです"];
let iter = test_chunks(strs.clone(), ByteStreamType::String);
let strs_values: Vec<Value> = strs
.into_iter()
.map(|string| Value::string(string, Span::test_data()))
.collect();
assert_eq!(
strs_values,
iter.collect::<Result<Vec<Value>, _>>().expect("error")
);
}
// Multi-byte characters split across items must be reassembled into the original text.
#[test]
fn chunks_read_string_split_boundary() {
let real = "Nushell最高!";
let chunks = vec![&b"Nushell\xe6"[..], &b"\x9c\x80\xe9"[..], &b"\xab\x98!"[..]];
let iter = test_chunks(chunks.clone(), ByteStreamType::String);
let mut string = String::new();
for value in iter {
let chunk_string = value.expect("error").into_string().expect("not a string");
string.push_str(&chunk_string);
}
assert_eq!(real, string);
}
// A stream ending in a broken sequence yields some valid text and then a `NonUtf8Custom` error.
#[test]
fn chunks_read_string_utf8_error() {
let chunks = vec![&b"Nushell\xe6"[..], &b"\x9c\x80\xe9"[..], &b"\xab"[..]];
let iter = test_chunks(chunks, ByteStreamType::String);
let mut string = String::new();
for value in iter {
match value {
Ok(value) => string.push_str(&value.into_string().expect("not a string")),
Err(err) => {
println!("string so far: {string:?}");
println!("got error: {err:?}");
assert!(!string.is_empty());
assert!(matches!(err, ShellError::NonUtf8Custom { .. }));
return;
}
}
}
panic!("no error");
}
// An `Unknown` stream yields strings until invalid UTF-8 appears, then binary from then on.
#[test]
fn chunks_read_unknown_fallback() {
let chunks = vec![&b"Nushell"[..], &b"\x9c\x80\xe9abcd"[..], &b"efgh"[..]];
let mut iter = test_chunks(chunks, ByteStreamType::Unknown);
let mut get = || iter.next().expect("end of iter").expect("error");
assert_eq!(Value::test_string("Nushell"), get());
assert_eq!(Value::test_binary(b"\x9c\x80\xe9abcd"), get());
assert_eq!(Value::test_binary(b"efgh"), get());
}
}