use std::{
io::Write,
sync::{
atomic::{AtomicBool, Ordering::Relaxed},
Arc, Mutex,
},
thread,
};
use nu_protocol::{ListStream, PipelineData, RawStream, ShellError};
use crate::{
plugin::Encoder,
protocol::{
ExternalStreamInfo, ListStreamInfo, PipelineDataHeader, RawStreamInfo, StreamMessage,
},
sequence::Sequence,
};
mod stream;
mod engine;
pub(crate) use engine::{EngineInterfaceManager, ReceivedPluginCall};
mod plugin;
pub(crate) use plugin::{PluginInterface, PluginInterfaceManager};
use self::stream::{StreamManager, StreamManagerHandle, StreamWriter, WriteStreamMessage};
#[cfg(test)]
mod test_util;
#[cfg(test)]
mod tests;
/// High-pressure mark handed to `write_stream` when `Interface::init_write_pipeline_data`
/// creates a writer for a list stream (this includes the exit-code stream of an external stream).
///
/// NOTE(review): presumably the number of unacknowledged messages that may be in flight before
/// the writer applies backpressure — confirm against `StreamWriter`.
const LIST_STREAM_HIGH_PRESSURE: i32 = 100;
/// High-pressure mark handed to `write_stream` when `Interface::init_write_pipeline_data`
/// creates a writer for a raw (stdout / stderr) stream.
const RAW_STREAM_HIGH_PRESSURE: i32 = 50;
/// A source of already-decoded plugin messages of type `T`.
///
/// Implemented in this file for `(reader, encoder)` pairs and for mutable references to any
/// other `PluginRead`.
pub(crate) trait PluginRead<T> {
/// Reads the next message from the underlying source.
///
/// # Errors
///
/// Returns a `ShellError` if reading or decoding fails.
///
/// NOTE(review): `Ok(None)` presumably signals end of input — confirm against
/// `Encoder::decode`.
fn read(&mut self) -> Result<Option<T>, ShellError>;
}
/// Any buffered reader paired with an [`Encoder`] acts as a [`PluginRead`] source.
impl<R, E, T> PluginRead<T> for (R, E)
where
    R: std::io::BufRead,
    E: Encoder<T>,
{
    /// Has the encoder (second tuple element) decode the next message from the reader
    /// (first tuple element).
    fn read(&mut self) -> Result<Option<T>, ShellError> {
        let (reader, encoder) = self;
        encoder.decode(reader)
    }
}
/// A mutable reference to a [`PluginRead`] is itself a [`PluginRead`], so a reader can be lent
/// out without giving up ownership.
impl<R, T> PluginRead<T> for &mut R
where
    R: PluginRead<T>,
{
    /// Forwards to the referenced reader.
    fn read(&mut self) -> Result<Option<T>, ShellError> {
        // Name the inner implementation explicitly so the call cannot resolve back to this impl.
        <R as PluginRead<T>>::read(&mut **self)
    }
}
/// A sink for plugin messages of type `T` that can be shared between threads (`Send + Sync`).
///
/// Both methods take `&self`; the implementations in this file obtain exclusive access to the
/// underlying writer by locking it (stdout's own lock, or a `Mutex`).
pub(crate) trait PluginWrite<T>: Send + Sync {
/// Encodes `data` and writes it to the underlying writer.
///
/// # Errors
///
/// Returns a `ShellError` if the message cannot be written.
fn write(&self, data: &T) -> Result<(), ShellError>;
/// Flushes the underlying writer.
///
/// # Errors
///
/// Returns a `ShellError` if flushing fails.
fn flush(&self) -> Result<(), ShellError>;
}
/// Writes messages to the process's standard output using the paired [`Encoder`].
impl<E, T> PluginWrite<T> for (std::io::Stdout, E)
where
    E: Encoder<T>,
{
    /// Encodes `data` into stdout while holding the stdout lock for the whole encode call.
    fn write(&self, data: &T) -> Result<(), ShellError> {
        let (stdout, encoder) = self;
        let mut guard = stdout.lock();
        encoder.encode(data, &mut guard)
    }

    /// Flushes stdout, reporting an I/O failure as `ShellError::IOError`.
    fn flush(&self) -> Result<(), ShellError> {
        let (stdout, _) = self;
        match stdout.lock().flush() {
            Ok(()) => Ok(()),
            Err(err) => Err(ShellError::IOError {
                msg: err.to_string(),
            }),
        }
    }
}
/// Writes messages to any `Write + Send` value guarded by a [`Mutex`], using the paired
/// [`Encoder`].
impl<W, E, T> PluginWrite<T> for (Mutex<W>, E)
where
    W: std::io::Write + Send,
    E: Encoder<T>,
{
    /// Locks the writer and encodes `data` into it.
    ///
    /// A poisoned mutex is reported as `ShellError::NushellFailed`.
    fn write(&self, data: &T) -> Result<(), ShellError> {
        let (writer, encoder) = self;
        match writer.lock() {
            Ok(mut guard) => encoder.encode(data, &mut *guard),
            Err(_) => Err(ShellError::NushellFailed {
                msg: "writer mutex poisoned".into(),
            }),
        }
    }

    /// Locks the writer and flushes it.
    ///
    /// A poisoned mutex is reported as `ShellError::NushellFailed`; an I/O failure as
    /// `ShellError::IOError`.
    fn flush(&self) -> Result<(), ShellError> {
        let (writer, _) = self;
        let mut guard = match writer.lock() {
            Ok(guard) => guard,
            Err(_) => {
                return Err(ShellError::NushellFailed {
                    msg: "writer mutex poisoned".into(),
                })
            }
        };
        guard.flush().map_err(|err| ShellError::IOError {
            msg: err.to_string(),
        })
    }
}
/// A shared reference to a [`PluginWrite`] is itself a [`PluginWrite`].
impl<W, T> PluginWrite<T> for &W
where
    W: PluginWrite<T>,
{
    /// Forwards to the referenced writer.
    fn write(&self, data: &T) -> Result<(), ShellError> {
        <W as PluginWrite<T>>::write(*self, data)
    }

    /// Forwards to the referenced writer.
    fn flush(&self) -> Result<(), ShellError> {
        <W as PluginWrite<T>>::flush(*self)
    }
}
pub(crate) trait InterfaceManager {
type Interface: Interface + 'static;
type Input;
fn get_interface(&self) -> Self::Interface;
fn consume(&mut self, input: Self::Input) -> Result<(), ShellError>;
fn stream_manager(&self) -> &StreamManager;
fn prepare_pipeline_data(&self, data: PipelineData) -> Result<PipelineData, ShellError>;
fn consume_stream_message(&mut self, message: StreamMessage) -> Result<(), ShellError> {
self.stream_manager().handle_message(message)
}
fn read_pipeline_data(
&self,
header: PipelineDataHeader,
ctrlc: Option<&Arc<AtomicBool>>,
) -> Result<PipelineData, ShellError> {
self.prepare_pipeline_data(match header {
PipelineDataHeader::Empty => PipelineData::Empty,
PipelineDataHeader::Value(value) => PipelineData::Value(value, None),
PipelineDataHeader::ListStream(info) => {
let handle = self.stream_manager().get_handle();
let reader = handle.read_stream(info.id, self.get_interface())?;
PipelineData::ListStream(ListStream::from_stream(reader, ctrlc.cloned()), None)
}
PipelineDataHeader::ExternalStream(info) => {
let handle = self.stream_manager().get_handle();
let span = info.span;
let new_raw_stream = |raw_info: RawStreamInfo| {
let reader = handle.read_stream(raw_info.id, self.get_interface())?;
let mut stream =
RawStream::new(Box::new(reader), ctrlc.cloned(), span, raw_info.known_size);
stream.is_binary = raw_info.is_binary;
Ok::<_, ShellError>(stream)
};
PipelineData::ExternalStream {
stdout: info.stdout.map(new_raw_stream).transpose()?,
stderr: info.stderr.map(new_raw_stream).transpose()?,
exit_code: info
.exit_code
.map(|list_info| {
handle
.read_stream(list_info.id, self.get_interface())
.map(|reader| ListStream::from_stream(reader, ctrlc.cloned()))
})
.transpose()?,
span: info.span,
metadata: None,
trim_end_newline: info.trim_end_newline,
}
}
})
}
}
/// Sending side of a plugin communication channel.
///
/// An interface is cheap enough to clone that a copy is handed to every stream writer it
/// creates (see `init_write_pipeline_data`).
pub(crate) trait Interface: Clone + Send {
    /// Message type written by this interface; any [`StreamMessage`] can be converted into it.
    type Output: From<StreamMessage>;

    /// Writes one output message.
    fn write(&self, output: Self::Output) -> Result<(), ShellError>;

    /// Flushes any output written so far.
    fn flush(&self) -> Result<(), ShellError>;

    /// Sequence from which new stream ids are drawn.
    fn stream_id_sequence(&self) -> &Sequence;

    /// Handle used to register new stream writers.
    fn stream_manager_handle(&self) -> &StreamManagerHandle;

    /// Hook applied to pipeline data before `init_write_pipeline_data` inspects it.
    fn prepare_pipeline_data(&self, data: PipelineData) -> Result<PipelineData, ShellError>;

    /// Prepares `data` for sending.
    ///
    /// Returns the header describing the data together with a [`PipelineDataWriter`] that owns
    /// whatever streams still have to be written. Nothing is written by this method itself.
    /// For every stream present a fresh id is taken from the id sequence and a writer is
    /// registered with the stream manager handle; external streams allocate in the order
    /// stdout, stderr, exit_code.
    ///
    /// # Errors
    ///
    /// Fails if `prepare_pipeline_data` fails, if no stream id can be obtained, or if a stream
    /// writer cannot be created.
    fn init_write_pipeline_data(
        &self,
        data: PipelineData,
    ) -> Result<(PipelineDataHeader, PipelineDataWriter<Self>), ShellError> {
        // Takes the next stream id and registers a writer for it.
        let alloc_stream = |high_pressure_mark: i32| -> Result<_, ShellError> {
            let id = self.stream_id_sequence().next()?;
            let writer =
                self.stream_manager_handle()
                    .write_stream(id, self.clone(), high_pressure_mark)?;
            Ok((id, writer))
        };

        match self.prepare_pipeline_data(data)? {
            PipelineData::Empty => Ok((PipelineDataHeader::Empty, PipelineDataWriter::None)),
            PipelineData::Value(value, _) => {
                Ok((PipelineDataHeader::Value(value), PipelineDataWriter::None))
            }
            PipelineData::ListStream(stream, _) => {
                let (id, writer) = alloc_stream(LIST_STREAM_HIGH_PRESSURE)?;
                let header = PipelineDataHeader::ListStream(ListStreamInfo { id });
                Ok((header, PipelineDataWriter::ListStream(writer, stream)))
            }
            PipelineData::ExternalStream {
                stdout,
                stderr,
                exit_code,
                span,
                metadata: _,
                trim_end_newline,
            } => {
                // Allocate ids only for the streams that are actually present.
                let stdout_alloc = if stdout.is_some() {
                    Some(alloc_stream(RAW_STREAM_HIGH_PRESSURE)?)
                } else {
                    None
                };
                let stderr_alloc = if stderr.is_some() {
                    Some(alloc_stream(RAW_STREAM_HIGH_PRESSURE)?)
                } else {
                    None
                };
                let exit_code_alloc = if exit_code.is_some() {
                    Some(alloc_stream(LIST_STREAM_HIGH_PRESSURE)?)
                } else {
                    None
                };

                // Describe each present stream in the header.
                let stdout_info = match (&stdout, &stdout_alloc) {
                    (Some(raw), Some((id, _))) => Some(RawStreamInfo::new(*id, raw)),
                    _ => None,
                };
                let stderr_info = match (&stderr, &stderr_alloc) {
                    (Some(raw), Some((id, _))) => Some(RawStreamInfo::new(*id, raw)),
                    _ => None,
                };
                let exit_code_info = exit_code_alloc
                    .as_ref()
                    .map(|(id, _)| ListStreamInfo { id: *id });

                let header = PipelineDataHeader::ExternalStream(ExternalStreamInfo {
                    span,
                    stdout: stdout_info,
                    stderr: stderr_info,
                    exit_code: exit_code_info,
                    trim_end_newline,
                });

                // Pair every writer with the stream it will drain.
                let writer = PipelineDataWriter::ExternalStream {
                    stdout: stdout_alloc
                        .zip(stdout)
                        .map(|((_, writer), raw)| (writer, raw)),
                    stderr: stderr_alloc
                        .zip(stderr)
                        .map(|((_, writer), raw)| (writer, raw)),
                    exit_code: exit_code_alloc
                        .zip(exit_code)
                        .map(|((_, writer), list)| (writer, list)),
                };
                Ok((header, writer))
            }
        }
    }
}
/// Every [`Interface`] can write stream messages: they are converted into the interface's
/// `Output` type and sent through the interface's own `write`.
impl<T> WriteStreamMessage for T
where
    T: Interface,
{
    /// Converts `msg` into the interface's output type and writes it.
    fn write_stream_message(&mut self, msg: StreamMessage) -> Result<(), ShellError> {
        let output = <T::Output as From<StreamMessage>>::from(msg);
        <T as Interface>::write(&*self, output)
    }

    /// Flushes through the interface; the call is qualified because both traits define `flush`.
    fn flush(&mut self) -> Result<(), ShellError> {
        <T as Interface>::flush(&*self)
    }
}
/// The streams of a `PipelineData` that still have to be written, each paired with the
/// `StreamWriter` it should be written through.
///
/// Produced by `Interface::init_write_pipeline_data` and consumed by `write` or
/// `write_background`. Marked `#[must_use]`: a value that is dropped without being written
/// never sends its stream contents.
#[derive(Default)]
#[must_use]
pub(crate) enum PipelineDataWriter<W: WriteStreamMessage> {
/// Nothing to write (the default); used for empty and plain-value pipeline data.
#[default]
None,
/// A list stream together with its writer.
ListStream(StreamWriter<W>, ListStream),
/// The streams of an external stream; each one is present only if the source data had it.
ExternalStream {
/// Raw stdout stream and its writer.
stdout: Option<(StreamWriter<W>, RawStream)>,
/// Raw stderr stream and its writer.
stderr: Option<(StreamWriter<W>, RawStream)>,
/// Exit-code list stream and its writer.
exit_code: Option<(StreamWriter<W>, ListStream)>,
},
}
impl<W> PipelineDataWriter<W>
where
    W: WriteStreamMessage + Send + 'static,
{
    /// Writes every contained stream to completion on the calling thread, blocking until done.
    ///
    /// For an external stream, stderr and exit_code are each drained on their own scoped
    /// thread while stdout is drained on the calling thread, so a stdout-only stream spawns no
    /// threads at all.
    ///
    /// # Errors
    ///
    /// Returns the first error hit while spawning a thread or writing a stream, or a
    /// `ShellError::NushellFailed` if one of the helper threads panicked.
    pub(crate) fn write(self) -> Result<(), ShellError> {
        match self {
            PipelineDataWriter::None => Ok(()),
            PipelineDataWriter::ListStream(mut list_writer, values) => {
                list_writer.write_all(values).map(|_| ())
            }
            PipelineDataWriter::ExternalStream {
                stdout,
                stderr,
                exit_code,
            } => thread::scope(|scope| {
                // Start the helper threads first so they run concurrently with stdout.
                let stderr_handle = match stderr {
                    Some((mut sink, raw)) => {
                        let handle = thread::Builder::new()
                            .name("plugin stderr writer".into())
                            .spawn_scoped(scope, move || sink.write_all(raw_stream_iter(raw)))?;
                        Some(handle)
                    }
                    None => None,
                };
                let exit_code_handle = match exit_code {
                    Some((mut sink, codes)) => {
                        let handle = thread::Builder::new()
                            .name("plugin exit_code writer".into())
                            .spawn_scoped(scope, move || sink.write_all(codes))?;
                        Some(handle)
                    }
                    None => None,
                };

                // stdout is written right here. On an early return the scope still joins any
                // helper threads that were spawned.
                if let Some((mut sink, raw)) = stdout {
                    sink.write_all(raw_stream_iter(raw))?;
                }

                let panic_error = |thread_name: &str| ShellError::NushellFailed {
                    msg: format!("{thread_name} thread panicked in PipelineDataWriter::write"),
                };

                if let Some(handle) = stderr_handle {
                    handle
                        .join()
                        .unwrap_or_else(|_| Err(panic_error("stderr")))?;
                }
                if let Some(handle) = exit_code_handle {
                    handle
                        .join()
                        .unwrap_or_else(|_| Err(panic_error("exit_code")))?;
                }
                Ok(())
            }),
        }
    }

    /// Writes the contained streams on a newly spawned background thread.
    ///
    /// Returns `Ok(None)` without spawning anything when there is nothing to write. Otherwise
    /// returns the join handle of the thread, whose result is the result of [`write`]; a
    /// failure is additionally logged as a warning from that thread.
    ///
    /// # Errors
    ///
    /// Fails if the background thread cannot be spawned.
    ///
    /// [`write`]: Self::write
    pub(crate) fn write_background(
        self,
    ) -> Result<Option<thread::JoinHandle<Result<(), ShellError>>>, ShellError> {
        if matches!(self, PipelineDataWriter::None) {
            return Ok(None);
        }

        let handle = thread::Builder::new()
            .name("plugin stream background writer".into())
            .spawn(move || {
                let outcome = self.write();
                if let Err(err) = &outcome {
                    log::warn!("Error while writing pipeline in background: {err}");
                }
                outcome
            })?;
        Ok(Some(handle))
    }
}
fn raw_stream_iter(stream: RawStream) -> impl Iterator<Item = Result<Vec<u8>, ShellError>> {
let ctrlc = stream.ctrlc;
stream
.stream
.take_while(move |_| ctrlc.as_ref().map(|b| !b.load(Relaxed)).unwrap_or(true))
}