use crate::{
ast::{Call, PathMember},
engine::{EngineState, Stack},
ByteStream, ByteStreamType, Config, ListStream, OutDest, PipelineMetadata, Range, ShellError,
Signals, Span, Type, Value,
};
use nu_utils::{stderr_write_all_and_flush, stdout_write_all_and_flush};
use std::io::Write;
/// Characters treated as line endings. `flat_map` and `filter` strip any trailing run of these
/// from UTF-8 byte-stream text before handing the text to the caller's closure.
const LINE_ENDING_PATTERN: &[char] = &['\r', '\n'];
/// Data carried by a pipeline, in one of four shapes.
///
/// Every variant except [`PipelineData::Empty`] carries an optional [`PipelineMetadata`]
/// alongside its payload.
#[derive(Debug)]
pub enum PipelineData {
/// No data and no metadata.
Empty,
/// A single [`Value`] with optional metadata.
Value(Value, Option<PipelineMetadata>),
/// A [`ListStream`] with optional metadata.
ListStream(ListStream, Option<PipelineMetadata>),
/// A [`ByteStream`] with optional metadata.
ByteStream(ByteStream, Option<PipelineMetadata>),
}
impl PipelineData {
pub fn empty() -> PipelineData {
PipelineData::Empty
}
pub fn metadata(&self) -> Option<PipelineMetadata> {
match self {
PipelineData::Empty => None,
PipelineData::Value(_, meta)
| PipelineData::ListStream(_, meta)
| PipelineData::ByteStream(_, meta) => meta.clone(),
}
}
pub fn set_metadata(mut self, metadata: Option<PipelineMetadata>) -> Self {
match &mut self {
PipelineData::Empty => {}
PipelineData::Value(_, meta)
| PipelineData::ListStream(_, meta)
| PipelineData::ByteStream(_, meta) => *meta = metadata,
}
self
}
pub fn is_nothing(&self) -> bool {
matches!(self, PipelineData::Value(Value::Nothing { .. }, ..))
|| matches!(self, PipelineData::Empty)
}
pub fn span(&self) -> Option<Span> {
match self {
PipelineData::Empty => None,
PipelineData::Value(value, ..) => Some(value.span()),
PipelineData::ListStream(stream, ..) => Some(stream.span()),
PipelineData::ByteStream(stream, ..) => Some(stream.span()),
}
}
pub fn with_span(self, span: Span) -> Self {
match self {
PipelineData::Empty => PipelineData::Value(Value::nothing(span), None),
PipelineData::Value(value, metadata) => {
PipelineData::Value(value.with_span(span), metadata)
}
PipelineData::ListStream(stream, metadata) => {
PipelineData::ListStream(stream.with_span(span), metadata)
}
PipelineData::ByteStream(stream, metadata) => {
PipelineData::ByteStream(stream.with_span(span), metadata)
}
}
}
pub fn get_type(&self) -> Type {
match self {
PipelineData::Empty => Type::Nothing,
PipelineData::Value(value, _) => value.get_type(),
PipelineData::ListStream(_, _) => Type::ListStream,
PipelineData::ByteStream(stream, _) => stream.type_().into(),
}
}
pub fn into_value(self, span: Span) -> Result<Value, ShellError> {
match self {
PipelineData::Empty => Ok(Value::nothing(span)),
PipelineData::Value(value, ..) => Ok(value.with_span(span)),
PipelineData::ListStream(stream, ..) => Ok(stream.into_value()),
PipelineData::ByteStream(stream, ..) => stream.into_value(),
}
}
pub fn try_into_stream(self, engine_state: &EngineState) -> Result<PipelineData, PipelineData> {
let span = self.span().unwrap_or(Span::unknown());
match self {
PipelineData::ListStream(..) | PipelineData::ByteStream(..) => Ok(self),
PipelineData::Value(Value::List { .. } | Value::Range { .. }, ref metadata) => {
let metadata = metadata.clone();
Ok(PipelineData::ListStream(
ListStream::new(self.into_iter(), span, engine_state.signals().clone()),
metadata,
))
}
PipelineData::Value(Value::String { val, .. }, metadata) => {
Ok(PipelineData::ByteStream(
ByteStream::read_string(val, span, engine_state.signals().clone()),
metadata,
))
}
PipelineData::Value(Value::Binary { val, .. }, metadata) => {
Ok(PipelineData::ByteStream(
ByteStream::read_binary(val, span, engine_state.signals().clone()),
metadata,
))
}
_ => Err(self),
}
}
/// Routes this pipeline data according to the stdout destination reported by `stack`.
///
/// * `OutDest::Pipe` / `OutDest::Capture`: the data is returned unchanged.
/// * Byte streams (for every other destination): delegated to
///   `ByteStream::write_to_out_dests` with the stack's stdout and stderr destinations.
/// * `OutDest::Null`: values are discarded and list streams are drained.
/// * `OutDest::File`: values are converted with `value_to_bytes` and written to the file;
///   each list-stream item is additionally followed by a `\n`. The file is flushed afterwards.
/// * `OutDest::Inherit`: values and list streams are printed through [`PipelineData::print`]
///   (to stdout, with trailing newlines).
///
/// Returns [`PipelineData::Empty`] in every case where the data was consumed.
///
/// # Errors
///
/// Propagates errors from stream draining/writing, value-to-bytes conversion, file I/O and
/// printing.
pub fn write_to_out_dests(
self,
engine_state: &EngineState,
stack: &mut Stack,
) -> Result<PipelineData, ShellError> {
match (self, stack.stdout()) {
// Checked first, so it applies to every variant (byte streams included).
(data, OutDest::Pipe | OutDest::Capture) => return Ok(data),
(PipelineData::ByteStream(stream, ..), stdout) => {
stream.write_to_out_dests(stdout, stack.stderr())?;
}
(PipelineData::Empty, ..) => {}
(PipelineData::Value(..), OutDest::Null) => {}
(PipelineData::ListStream(stream, ..), OutDest::Null) => {
// The stream is still consumed even though its items are thrown away.
stream.drain()?;
}
(PipelineData::Value(value, ..), OutDest::File(file)) => {
let bytes = value_to_bytes(value)?;
let mut file = file.as_ref();
file.write_all(&bytes)?;
file.flush()?;
}
(PipelineData::ListStream(stream, ..), OutDest::File(file)) => {
let mut file = file.as_ref();
for value in stream {
let bytes = value_to_bytes(value)?;
file.write_all(&bytes)?;
// One item per line.
file.write_all(b"\n")?;
}
file.flush()?;
}
(data @ (PipelineData::Value(..) | PipelineData::ListStream(..)), OutDest::Inherit) => {
data.print(engine_state, stack, false, false)?;
}
}
Ok(PipelineData::Empty)
}
pub fn drain(self) -> Result<(), ShellError> {
match self {
PipelineData::Empty => Ok(()),
PipelineData::Value(Value::Error { error, .. }, ..) => Err(*error),
PipelineData::Value(..) => Ok(()),
PipelineData::ListStream(stream, ..) => stream.drain(),
PipelineData::ByteStream(stream, ..) => stream.drain(),
}
}
pub fn into_iter_strict(self, span: Span) -> Result<PipelineIterator, ShellError> {
Ok(PipelineIterator(match self {
PipelineData::Value(value, ..) => {
let val_span = value.span();
match value {
Value::List { vals, .. } => PipelineIteratorInner::ListStream(
ListStream::new(vals.into_iter(), val_span, Signals::empty()).into_iter(),
),
Value::Binary { val, .. } => PipelineIteratorInner::ListStream(
ListStream::new(
val.into_iter().map(move |x| Value::int(x as i64, val_span)),
val_span,
Signals::empty(),
)
.into_iter(),
),
Value::Range { val, .. } => PipelineIteratorInner::ListStream(
ListStream::new(
val.into_range_iter(val_span, Signals::empty()),
val_span,
Signals::empty(),
)
.into_iter(),
),
Value::Error { error, .. } => return Err(*error),
other => {
return Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "list, binary, range, or byte stream".into(),
wrong_type: other.get_type().to_string(),
dst_span: span,
src_span: val_span,
})
}
}
}
PipelineData::ListStream(stream, ..) => {
PipelineIteratorInner::ListStream(stream.into_iter())
}
PipelineData::Empty => {
return Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "list, binary, range, or byte stream".into(),
wrong_type: "null".into(),
dst_span: span,
src_span: span,
})
}
PipelineData::ByteStream(stream, ..) => {
if let Some(chunks) = stream.chunks() {
PipelineIteratorInner::ByteStream(chunks)
} else {
PipelineIteratorInner::Empty
}
}
}))
}
pub fn collect_string(self, separator: &str, config: &Config) -> Result<String, ShellError> {
match self {
PipelineData::Empty => Ok(String::new()),
PipelineData::Value(value, ..) => Ok(value.to_expanded_string(separator, config)),
PipelineData::ListStream(stream, ..) => Ok(stream.into_string(separator, config)),
PipelineData::ByteStream(stream, ..) => stream.into_string(),
}
}
pub fn collect_string_strict(
self,
span: Span,
) -> Result<(String, Span, Option<PipelineMetadata>), ShellError> {
match self {
PipelineData::Empty => Ok((String::new(), span, None)),
PipelineData::Value(Value::String { val, .. }, metadata) => Ok((val, span, metadata)),
PipelineData::Value(val, ..) => Err(ShellError::TypeMismatch {
err_message: "string".into(),
span: val.span(),
}),
PipelineData::ListStream(..) => Err(ShellError::TypeMismatch {
err_message: "string".into(),
span,
}),
PipelineData::ByteStream(stream, metadata) => {
let span = stream.span();
Ok((stream.into_string()?, span, metadata))
}
}
}
pub fn follow_cell_path(
self,
cell_path: &[PathMember],
head: Span,
insensitive: bool,
) -> Result<Value, ShellError> {
match self {
PipelineData::ListStream(stream, ..) => Value::list(stream.into_iter().collect(), head)
.follow_cell_path(cell_path, insensitive),
PipelineData::Value(v, ..) => v.follow_cell_path(cell_path, insensitive),
PipelineData::Empty => Err(ShellError::IncompatiblePathAccess {
type_name: "empty pipeline".to_string(),
span: head,
}),
PipelineData::ByteStream(stream, ..) => Err(ShellError::IncompatiblePathAccess {
type_name: stream.type_().describe().to_owned(),
span: stream.span(),
}),
}
}
pub fn map<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
where
Self: Sized,
F: FnMut(Value) -> Value + 'static + Send,
{
match self {
PipelineData::Value(value, metadata) => {
let span = value.span();
let pipeline = match value {
Value::List { vals, .. } => vals
.into_iter()
.map(f)
.into_pipeline_data(span, signals.clone()),
Value::Range { val, .. } => val
.into_range_iter(span, Signals::empty())
.map(f)
.into_pipeline_data(span, signals.clone()),
value => match f(value) {
Value::Error { error, .. } => return Err(*error),
v => v.into_pipeline_data(),
},
};
Ok(pipeline.set_metadata(metadata))
}
PipelineData::Empty => Ok(PipelineData::Empty),
PipelineData::ListStream(stream, metadata) => {
Ok(PipelineData::ListStream(stream.map(f), metadata))
}
PipelineData::ByteStream(stream, metadata) => {
Ok(f(stream.into_value()?).into_pipeline_data_with_metadata(metadata))
}
}
}
pub fn flat_map<U, F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
where
Self: Sized,
U: IntoIterator<Item = Value> + 'static,
<U as IntoIterator>::IntoIter: 'static + Send,
F: FnMut(Value) -> U + 'static + Send,
{
match self {
PipelineData::Empty => Ok(PipelineData::Empty),
PipelineData::Value(value, metadata) => {
let span = value.span();
let pipeline = match value {
Value::List { vals, .. } => vals
.into_iter()
.flat_map(f)
.into_pipeline_data(span, signals.clone()),
Value::Range { val, .. } => val
.into_range_iter(span, Signals::empty())
.flat_map(f)
.into_pipeline_data(span, signals.clone()),
value => f(value)
.into_iter()
.into_pipeline_data(span, signals.clone()),
};
Ok(pipeline.set_metadata(metadata))
}
PipelineData::ListStream(stream, metadata) => Ok(PipelineData::ListStream(
stream.modify(|iter| iter.flat_map(f)),
metadata,
)),
PipelineData::ByteStream(stream, metadata) => {
let span = stream.span();
let iter = match String::from_utf8(stream.into_bytes()?) {
Ok(mut str) => {
str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
f(Value::string(str, span))
}
Err(err) => f(Value::binary(err.into_bytes(), span)),
};
Ok(iter.into_iter().into_pipeline_data_with_metadata(
span,
signals.clone(),
metadata,
))
}
}
}
pub fn filter<F>(self, mut f: F, signals: &Signals) -> Result<PipelineData, ShellError>
where
Self: Sized,
F: FnMut(&Value) -> bool + 'static + Send,
{
match self {
PipelineData::Empty => Ok(PipelineData::Empty),
PipelineData::Value(value, metadata) => {
let span = value.span();
let pipeline = match value {
Value::List { vals, .. } => vals
.into_iter()
.filter(f)
.into_pipeline_data(span, signals.clone()),
Value::Range { val, .. } => val
.into_range_iter(span, Signals::empty())
.filter(f)
.into_pipeline_data(span, signals.clone()),
value => {
if f(&value) {
value.into_pipeline_data()
} else {
Value::nothing(span).into_pipeline_data()
}
}
};
Ok(pipeline.set_metadata(metadata))
}
PipelineData::ListStream(stream, metadata) => Ok(PipelineData::ListStream(
stream.modify(|iter| iter.filter(f)),
metadata,
)),
PipelineData::ByteStream(stream, metadata) => {
let span = stream.span();
let value = match String::from_utf8(stream.into_bytes()?) {
Ok(mut str) => {
str.truncate(str.trim_end_matches(LINE_ENDING_PATTERN).len());
Value::string(str, span)
}
Err(err) => Value::binary(err.into_bytes(), span),
};
let value = if f(&value) {
value
} else {
Value::nothing(span)
};
Ok(value.into_pipeline_data_with_metadata(metadata))
}
}
}
pub fn try_expand_range(self) -> Result<PipelineData, ShellError> {
match self {
PipelineData::Value(v, metadata) => {
let span = v.span();
match v {
Value::Range { val, .. } => {
match *val {
Range::IntRange(range) => {
if range.is_unbounded() {
return Err(ShellError::GenericError {
error: "Cannot create range".into(),
msg: "Unbounded ranges are not allowed when converting to this format".into(),
span: Some(span),
help: Some("Consider using ranges with valid start and end point.".into()),
inner: vec![],
});
}
}
Range::FloatRange(range) => {
if range.is_unbounded() {
return Err(ShellError::GenericError {
error: "Cannot create range".into(),
msg: "Unbounded ranges are not allowed when converting to this format".into(),
span: Some(span),
help: Some("Consider using ranges with valid start and end point.".into()),
inner: vec![],
});
}
}
}
let range_values: Vec<Value> =
val.into_range_iter(span, Signals::empty()).collect();
Ok(PipelineData::Value(Value::list(range_values, span), None))
}
x => Ok(PipelineData::Value(x, metadata)),
}
}
_ => Ok(self),
}
}
pub fn print(
self,
engine_state: &EngineState,
stack: &mut Stack,
no_newline: bool,
to_stderr: bool,
) -> Result<(), ShellError> {
match self {
PipelineData::ByteStream(stream, ..) if stream.type_() != ByteStreamType::Binary => {
stream.print(to_stderr)
}
_ => {
if let Some(decl_id) = engine_state.table_decl_id {
let command = engine_state.get_decl(decl_id);
if command.block_id().is_some() {
self.write_all_and_flush(engine_state, no_newline, to_stderr)
} else {
let call = Call::new(Span::new(0, 0));
let table = command.run(engine_state, stack, &(&call).into(), self)?;
table.write_all_and_flush(engine_state, no_newline, to_stderr)
}
} else {
self.write_all_and_flush(engine_state, no_newline, to_stderr)
}
}
}
}
pub fn print_raw(
self,
engine_state: &EngineState,
no_newline: bool,
to_stderr: bool,
) -> Result<(), ShellError> {
if let PipelineData::Value(Value::Binary { val: bytes, .. }, _) = self {
if to_stderr {
stderr_write_all_and_flush(bytes)?;
} else {
stdout_write_all_and_flush(bytes)?;
}
Ok(())
} else {
self.write_all_and_flush(engine_state, no_newline, to_stderr)
}
}
fn write_all_and_flush(
self,
engine_state: &EngineState,
no_newline: bool,
to_stderr: bool,
) -> Result<(), ShellError> {
if let PipelineData::ByteStream(stream, ..) = self {
stream.print(to_stderr)
} else {
let config = engine_state.get_config();
for item in self {
let mut out = if let Value::Error { error, .. } = item {
return Err(*error);
} else {
item.to_expanded_string("\n", config)
};
if !no_newline {
out.push('\n');
}
if to_stderr {
stderr_write_all_and_flush(out)?
} else {
stdout_write_all_and_flush(out)?
}
}
Ok(())
}
}
pub fn unsupported_input_error(
self,
expected_type: impl Into<String>,
span: Span,
) -> ShellError {
match self {
PipelineData::Empty => ShellError::PipelineEmpty { dst_span: span },
PipelineData::Value(value, ..) => ShellError::OnlySupportsThisInputType {
exp_input_type: expected_type.into(),
wrong_type: value.get_type().get_non_specified_string(),
dst_span: span,
src_span: value.span(),
},
PipelineData::ListStream(stream, ..) => ShellError::OnlySupportsThisInputType {
exp_input_type: expected_type.into(),
wrong_type: "list (stream)".into(),
dst_span: span,
src_span: stream.span(),
},
PipelineData::ByteStream(stream, ..) => ShellError::OnlySupportsThisInputType {
exp_input_type: expected_type.into(),
wrong_type: stream.type_().describe().into(),
dst_span: span,
src_span: stream.span(),
},
}
}
}
/// The concrete source behind a [`PipelineIterator`].
enum PipelineIteratorInner {
/// Yields nothing.
Empty,
/// A single value; the iterator yields it once unless it is `Value::Nothing`.
Value(Value),
/// Items pulled from a list stream's iterator.
ListStream(crate::list_stream::IntoIter),
/// Chunks pulled from a byte stream.
ByteStream(crate::byte_stream::Chunks),
}
/// Iterator over the [`Value`]s of a [`PipelineData`], produced by its `IntoIterator`
/// implementation and by [`PipelineData::into_iter_strict`].
pub struct PipelineIterator(PipelineIteratorInner);
/// Lenient iteration over pipeline data.
///
/// List and range values iterate their elements, any other value becomes a single-item
/// source, list streams iterate their items, and byte streams iterate their chunks (or
/// nothing when `chunks()` returns `None`). `Empty` yields nothing.
impl IntoIterator for PipelineData {
    type Item = Value;
    type IntoIter = PipelineIterator;

    fn into_iter(self) -> Self::IntoIter {
        let inner = match self {
            Self::Empty => PipelineIteratorInner::Empty,
            Self::ListStream(stream, ..) => {
                PipelineIteratorInner::ListStream(stream.into_iter())
            }
            Self::ByteStream(stream, ..) => match stream.chunks() {
                Some(chunks) => PipelineIteratorInner::ByteStream(chunks),
                None => PipelineIteratorInner::Empty,
            },
            Self::Value(value, ..) => {
                let span = value.span();
                match value {
                    Value::List { vals, .. } => {
                        let items = ListStream::new(vals.into_iter(), span, Signals::empty());
                        PipelineIteratorInner::ListStream(items.into_iter())
                    }
                    Value::Range { val, .. } => {
                        let items = ListStream::new(
                            val.into_range_iter(span, Signals::empty()),
                            span,
                            Signals::empty(),
                        );
                        PipelineIteratorInner::ListStream(items.into_iter())
                    }
                    single => PipelineIteratorInner::Value(single),
                }
            }
        };
        PipelineIterator(inner)
    }
}
impl Iterator for PipelineIterator {
type Item = Value;
fn next(&mut self) -> Option<Self::Item> {
match &mut self.0 {
PipelineIteratorInner::Empty => None,
PipelineIteratorInner::Value(Value::Nothing { .. }, ..) => None,
PipelineIteratorInner::Value(v, ..) => Some(std::mem::take(v)),
PipelineIteratorInner::ListStream(stream, ..) => stream.next(),
PipelineIteratorInner::ByteStream(stream) => stream.next().map(|x| match x {
Ok(x) => x,
Err(err) => Value::error(
err,
Span::unknown(), ),
}),
}
}
}
/// Conversion of a single item into [`PipelineData`].
pub trait IntoPipelineData {
/// Converts `self` into [`PipelineData`].
fn into_pipeline_data(self) -> PipelineData;
/// Converts `self` into [`PipelineData`], attaching the given (optional) metadata.
fn into_pipeline_data_with_metadata(
self,
metadata: impl Into<Option<PipelineMetadata>>,
) -> PipelineData;
}
/// Anything convertible into a [`Value`] becomes a [`PipelineData::Value`].
impl<V> IntoPipelineData for V
where
    V: Into<Value>,
{
    /// Wraps the converted value with no metadata.
    fn into_pipeline_data(self) -> PipelineData {
        let value: Value = self.into();
        PipelineData::Value(value, None)
    }

    /// Wraps the converted value together with `metadata`.
    fn into_pipeline_data_with_metadata(
        self,
        metadata: impl Into<Option<PipelineMetadata>>,
    ) -> PipelineData {
        let value: Value = self.into();
        let metadata: Option<PipelineMetadata> = metadata.into();
        PipelineData::Value(value, metadata)
    }
}
/// Conversion of a sequence of items into [`PipelineData`], given a span and [`Signals`].
pub trait IntoInterruptiblePipelineData {
/// Converts `self` into [`PipelineData`] using `span` and `signals`.
fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData;
/// Converts `self` into [`PipelineData`] using `span` and `signals`, attaching the given
/// (optional) metadata.
fn into_pipeline_data_with_metadata(
self,
span: Span,
signals: Signals,
metadata: impl Into<Option<PipelineMetadata>>,
) -> PipelineData;
}
/// Any sendable `'static` iterable whose items convert into [`Value`] can be wrapped in a
/// [`ListStream`].
impl<I> IntoInterruptiblePipelineData for I
where
    I: IntoIterator + Send + 'static,
    I::IntoIter: Send + 'static,
    <I::IntoIter as Iterator>::Item: Into<Value>,
{
    /// Builds a [`ListStream`] from the items and converts it into [`PipelineData`].
    fn into_pipeline_data(self, span: Span, signals: Signals) -> PipelineData {
        let values = self.into_iter().map(Into::<Value>::into);
        let stream = ListStream::new(values, span, signals);
        stream.into()
    }

    /// Builds a [`ListStream`] from the items and pairs it with `metadata`.
    fn into_pipeline_data_with_metadata(
        self,
        span: Span,
        signals: Signals,
        metadata: impl Into<Option<PipelineMetadata>>,
    ) -> PipelineData {
        let values = self.into_iter().map(Into::<Value>::into);
        let stream = ListStream::new(values, span, signals);
        PipelineData::ListStream(stream, metadata.into())
    }
}
fn value_to_bytes(value: Value) -> Result<Vec<u8>, ShellError> {
let bytes = match value {
Value::String { val, .. } => val.into_bytes(),
Value::Binary { val, .. } => val,
Value::List { vals, .. } => {
let val = vals
.into_iter()
.map(Value::coerce_into_string)
.collect::<Result<Vec<String>, ShellError>>()?
.join("\n")
+ "\n";
val.into_bytes()
}
Value::Error { error, .. } => return Err(*error),
value => value.coerce_into_string()?.into_bytes(),
};
Ok(bytes)
}