use std::sync::{mpsc, Arc};
use nu_protocol::{
IntoInterruptiblePipelineData, ListStream, PipelineData, PluginSignature, ShellError, Spanned,
Value,
};
use crate::{
protocol::{
CallInfo, CustomValueOp, PluginCall, PluginCallId, PluginCallResponse, PluginCustomValue,
PluginInput, ProtocolInfo,
},
LabeledError, PluginOutput,
};
use super::{
stream::{StreamManager, StreamManagerHandle},
Interface, InterfaceManager, PipelineDataWriter, PluginRead, PluginWrite,
};
use crate::sequence::Sequence;
#[derive(Debug)]
pub(crate) enum ReceivedPluginCall {
Signature {
engine: EngineInterface,
},
Run {
engine: EngineInterface,
call: CallInfo<PipelineData>,
},
CustomValueOp {
engine: EngineInterface,
custom_value: Spanned<PluginCustomValue>,
op: CustomValueOp,
},
}
#[cfg(test)]
mod tests;
struct EngineInterfaceState {
stream_id_sequence: Sequence,
writer: Box<dyn PluginWrite<PluginOutput>>,
}
impl std::fmt::Debug for EngineInterfaceState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EngineInterfaceState")
.field("stream_id_sequence", &self.stream_id_sequence)
.finish_non_exhaustive()
}
}
#[derive(Debug)]
pub(crate) struct EngineInterfaceManager {
state: Arc<EngineInterfaceState>,
plugin_call_sender: Option<mpsc::Sender<ReceivedPluginCall>>,
plugin_call_receiver: Option<mpsc::Receiver<ReceivedPluginCall>>,
stream_manager: StreamManager,
protocol_info: Option<ProtocolInfo>,
}
impl EngineInterfaceManager {
pub(crate) fn new(writer: impl PluginWrite<PluginOutput> + 'static) -> EngineInterfaceManager {
let (plug_tx, plug_rx) = mpsc::channel();
EngineInterfaceManager {
state: Arc::new(EngineInterfaceState {
stream_id_sequence: Sequence::default(),
writer: Box::new(writer),
}),
plugin_call_sender: Some(plug_tx),
plugin_call_receiver: Some(plug_rx),
stream_manager: StreamManager::new(),
protocol_info: None,
}
}
pub(crate) fn take_plugin_call_receiver(
&mut self,
) -> Option<mpsc::Receiver<ReceivedPluginCall>> {
self.plugin_call_receiver.take()
}
fn interface_for_context(&self, context: PluginCallId) -> EngineInterface {
EngineInterface {
state: self.state.clone(),
stream_manager_handle: self.stream_manager.get_handle(),
context: Some(context),
}
}
fn send_plugin_call(&self, plugin_call: ReceivedPluginCall) -> Result<(), ShellError> {
self.plugin_call_sender
.as_ref()
.ok_or_else(|| ShellError::PluginFailedToDecode {
msg: "Received a plugin call after Goodbye".into(),
})?
.send(plugin_call)
.map_err(|_| ShellError::NushellFailed {
msg: "Received a plugin call, but there's nowhere to send it".into(),
})
}
pub(crate) fn is_finished(&self) -> bool {
Arc::strong_count(&self.state) < 2
}
pub(crate) fn consume_all(
&mut self,
mut reader: impl PluginRead<PluginInput>,
) -> Result<(), ShellError> {
while let Some(msg) = reader.read().transpose() {
if self.is_finished() {
break;
}
if let Err(err) = msg.and_then(|msg| self.consume(msg)) {
let _ = self.stream_manager.broadcast_read_error(err.clone());
return Err(err);
}
}
Ok(())
}
}
impl InterfaceManager for EngineInterfaceManager {
type Interface = EngineInterface;
type Input = PluginInput;
fn get_interface(&self) -> Self::Interface {
EngineInterface {
state: self.state.clone(),
stream_manager_handle: self.stream_manager.get_handle(),
context: None,
}
}
fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> {
log::trace!("from engine: {:?}", input);
match input {
PluginInput::Hello(info) => {
let local_info = ProtocolInfo::default();
if local_info.is_compatible_with(&info)? {
self.protocol_info = Some(info);
Ok(())
} else {
self.protocol_info = None;
Err(ShellError::PluginFailedToLoad {
msg: format!(
"Plugin is compiled for nushell version {}, \
which is not compatible with version {}",
local_info.version, info.version
),
})
}
}
_ if self.protocol_info.is_none() => {
Err(ShellError::PluginFailedToLoad {
msg: "Failed to receive initial Hello message. This engine might be too old"
.into(),
})
}
PluginInput::Stream(message) => self.consume_stream_message(message),
PluginInput::Call(id, call) => match call {
PluginCall::Signature => self.send_plugin_call(ReceivedPluginCall::Signature {
engine: self.interface_for_context(id),
}),
PluginCall::Run(CallInfo {
name,
mut call,
input,
config,
}) => {
let interface = self.interface_for_context(id);
match self.read_pipeline_data(input, None) {
Ok(input) => {
if let Err(err) = deserialize_call_args(&mut call) {
return interface.write_response(Err(err))?.write();
}
self.send_plugin_call(ReceivedPluginCall::Run {
engine: interface,
call: CallInfo {
name,
call,
input,
config,
},
})
}
err @ Err(_) => interface.write_response(err)?.write(),
}
}
PluginCall::CustomValueOp(custom_value, op) => {
self.send_plugin_call(ReceivedPluginCall::CustomValueOp {
engine: self.interface_for_context(id),
custom_value,
op,
})
}
},
PluginInput::Goodbye => {
drop(self.plugin_call_sender.take());
Ok(())
}
}
}
fn stream_manager(&self) -> &StreamManager {
&self.stream_manager
}
fn prepare_pipeline_data(&self, mut data: PipelineData) -> Result<PipelineData, ShellError> {
match data {
PipelineData::Value(ref mut value, _) => {
PluginCustomValue::deserialize_custom_values_in(value)?;
Ok(data)
}
PipelineData::ListStream(ListStream { stream, ctrlc, .. }, meta) => Ok(stream
.map(|mut value| {
let span = value.span();
PluginCustomValue::deserialize_custom_values_in(&mut value)
.map(|()| value)
.unwrap_or_else(|err| Value::error(err, span))
})
.into_pipeline_data_with_metadata(meta, ctrlc)),
PipelineData::Empty | PipelineData::ExternalStream { .. } => Ok(data),
}
}
}
fn deserialize_call_args(call: &mut crate::EvaluatedCall) -> Result<(), ShellError> {
call.positional
.iter_mut()
.try_for_each(PluginCustomValue::deserialize_custom_values_in)?;
call.named
.iter_mut()
.flat_map(|(_, value)| value.as_mut())
.try_for_each(PluginCustomValue::deserialize_custom_values_in)
}
#[derive(Debug, Clone)]
pub struct EngineInterface {
state: Arc<EngineInterfaceState>,
stream_manager_handle: StreamManagerHandle,
context: Option<PluginCallId>,
}
impl EngineInterface {
pub(crate) fn hello(&self) -> Result<(), ShellError> {
self.write(PluginOutput::Hello(ProtocolInfo::default()))?;
self.flush()
}
fn context(&self) -> Result<PluginCallId, ShellError> {
self.context.ok_or_else(|| ShellError::NushellFailed {
msg: "Tried to call an EngineInterface method that requires a call context \
outside of one"
.into(),
})
}
pub(crate) fn write_response(
&self,
result: Result<PipelineData, impl Into<LabeledError>>,
) -> Result<PipelineDataWriter<Self>, ShellError> {
match result {
Ok(data) => {
let (header, writer) = match self.init_write_pipeline_data(data) {
Ok(tup) => tup,
Err(err) => return self.write_response(Err(err)),
};
let response = PluginCallResponse::PipelineData(header);
self.write(PluginOutput::CallResponse(self.context()?, response))?;
self.flush()?;
Ok(writer)
}
Err(err) => {
let response = PluginCallResponse::Error(err.into());
self.write(PluginOutput::CallResponse(self.context()?, response))?;
self.flush()?;
Ok(Default::default())
}
}
}
pub(crate) fn write_signature(
&self,
signature: Vec<PluginSignature>,
) -> Result<(), ShellError> {
let response = PluginCallResponse::Signature(signature);
self.write(PluginOutput::CallResponse(self.context()?, response))?;
self.flush()
}
}
impl Interface for EngineInterface {
type Output = PluginOutput;
fn write(&self, output: PluginOutput) -> Result<(), ShellError> {
log::trace!("to engine: {:?}", output);
self.state.writer.write(&output)
}
fn flush(&self) -> Result<(), ShellError> {
self.state.writer.flush()
}
fn stream_id_sequence(&self) -> &Sequence {
&self.state.stream_id_sequence
}
fn stream_manager_handle(&self) -> &StreamManagerHandle {
&self.stream_manager_handle
}
fn prepare_pipeline_data(&self, mut data: PipelineData) -> Result<PipelineData, ShellError> {
match data {
PipelineData::Value(ref mut value, _) => {
PluginCustomValue::serialize_custom_values_in(value)?;
Ok(data)
}
PipelineData::ListStream(ListStream { stream, ctrlc, .. }, meta) => Ok(stream
.map(|mut value| {
let span = value.span();
PluginCustomValue::serialize_custom_values_in(&mut value)
.map(|_| value)
.unwrap_or_else(|err| Value::error(err, span))
})
.into_pipeline_data_with_metadata(meta, ctrlc)),
PipelineData::Empty | PipelineData::ExternalStream { .. } => Ok(data),
}
}
}