mod declaration;
pub use declaration::PluginDeclaration;
use nu_engine::documentation::get_flags_section;
use std::collections::HashMap;
use std::ffi::OsStr;
use std::sync::{Arc, Mutex};
use crate::plugin::interface::{EngineInterfaceManager, ReceivedPluginCall};
use crate::protocol::{CallInfo, CustomValueOp, LabeledError, PluginInput, PluginOutput};
use crate::EncodingType;
use std::env;
use std::fmt::Write;
use std::io::{BufReader, Read, Write as WriteTrait};
use std::path::Path;
use std::process::{Child, ChildStdout, Command as CommandSys, Stdio};
use nu_protocol::{PipelineData, PluginSignature, ShellError, Value};
mod interface;
pub(crate) use interface::PluginInterface;
mod context;
pub(crate) use context::PluginExecutionCommandContext;
mod identity;
pub(crate) use identity::PluginIdentity;
use self::interface::{InterfaceManager, PluginInterfaceManager};
use super::EvaluatedCall;
/// Capacity, in bytes, of the `BufReader` that `make_plugin_interface` wraps around a
/// plugin's stdout.
pub(crate) const OUTPUT_BUFFER_SIZE: usize = 8192;
/// Encodes values of type `T` to a writer and decodes them from a buffered reader.
///
/// Implementors must be `Clone + Send + Sync`.
#[doc(hidden)]
pub trait Encoder<T>: Clone + Send + Sync {
/// Serialize `data` into `writer`.
///
/// Returns a [`ShellError`] if the value could not be written.
#[doc(hidden)]
fn encode(&self, data: &T, writer: &mut impl std::io::Write) -> Result<(), ShellError>;
/// Deserialize a value of type `T` from `reader`.
///
/// Returns a [`ShellError`] on failure.
///
/// NOTE(review): `Ok(None)` presumably signals that no further message is available
/// (end of stream) — confirm against the implementors.
#[doc(hidden)]
fn decode(&self, reader: &mut impl std::io::BufRead) -> Result<Option<T>, ShellError>;
}
/// An encoder that can handle both directions of plugin communication: it is an
/// [`Encoder`] for [`PluginInput`] as well as for [`PluginOutput`].
pub trait PluginEncoder: Encoder<PluginInput> + Encoder<PluginOutput> {
/// The name of the encoding.
///
/// `serve_plugin` writes this name to stdout, prefixed by its length as a single byte,
/// before any other output.
fn name(&self) -> &str;
}
/// Build (but do not spawn) the process command used to launch the plugin at `path`.
///
/// The invocation depends on `shell` and on the file extension of `path`:
///
/// * explicit `shell`: `<shell> <path> --stdio`
/// * `.cmd` / `.bat`: `cmd /c "<path> --stdio"` (path and flag joined into one argument)
/// * `.sh`: `sh -c "<path> --stdio"` (path and flag joined into one argument)
/// * `.py`: `python <path> --stdio`
/// * anything else, or no extension: `<path> --stdio`
///
/// The returned command has both stdin and stdout configured as pipes.
fn create_command(path: &Path, shell: Option<&Path>) -> CommandSys {
    // Flag that every plugin invocation receives.
    const STDIO_FLAG: &str = "--stdio";

    // `<interpreter> <path> --stdio`, each as a separate argument.
    fn through_interpreter<S: AsRef<OsStr>>(interpreter: S, path: &Path) -> CommandSys {
        let mut command = CommandSys::new(interpreter);
        command.arg(path).arg(STDIO_FLAG);
        command
    }

    // `<shell> <switch> "<path> --stdio"`: the switch expects a single command string,
    // so the flag is appended to the path inside one argument.
    fn through_shell_switch(shell: &str, switch: &str, path: &Path) -> CommandSys {
        let mut command_line = path.as_os_str().to_owned();
        command_line.push(" ");
        command_line.push(STDIO_FLAG);

        let mut command = CommandSys::new(shell);
        command.arg(switch).arg(command_line);
        command
    }

    log::trace!("Starting plugin: {path:?}, shell = {shell:?}");

    let mut command = match shell {
        // An explicitly requested shell always wins over extension detection.
        Some(shell) => through_interpreter(shell, path),
        // A missing or non-UTF-8 extension falls through to direct execution.
        None => match path.extension().and_then(OsStr::to_str) {
            Some("cmd") | Some("bat") => through_shell_switch("cmd", "/c", path),
            Some("sh") => through_shell_switch("sh", "-c", path),
            Some("py") => through_interpreter("python", path),
            _ => {
                let mut command = CommandSys::new(path);
                command.arg(STDIO_FLAG);
                command
            }
        },
    };

    command.stdin(Stdio::piped()).stdout(Stdio::piped());
    command
}
/// Set up communication with an already-spawned plugin process and return the
/// [`PluginInterface`] used to talk to it.
///
/// Steps, in order:
/// 1. Take the child's piped stdin and stdout handles.
/// 2. Read the encoding header from the plugin's stdout (see [`get_plugin_encoding`]).
/// 3. Create a [`PluginInterfaceManager`] that writes to the plugin's stdin, and call
///    `hello()` on its interface.
/// 4. Spawn a "plugin interface reader" thread that consumes the plugin's stdout and,
///    once that ends, waits on the child process.
///
/// # Errors
///
/// Returns [`ShellError::PluginFailedToLoad`] if either pipe is missing, if the encoding
/// header cannot be read or is unsupported, or if the reader thread cannot be spawned.
/// Errors from `hello()` are propagated unchanged.
fn make_plugin_interface(
    mut child: Child,
    identity: Arc<PluginIdentity>,
) -> Result<PluginInterface, ShellError> {
    let stdin = child
        .stdin
        .take()
        .ok_or_else(|| ShellError::PluginFailedToLoad {
            msg: "Plugin missing stdin writer".into(),
        })?;

    // The engine reads from the plugin's stdout, so this handle is a reader.
    let mut stdout = child
        .stdout
        .take()
        .ok_or_else(|| ShellError::PluginFailedToLoad {
            msg: "Plugin missing stdout reader".into(),
        })?;

    // The encoding header must be consumed from the raw handle before it is wrapped in a
    // buffered reader, so the decoder starts right after the header.
    let encoder = get_plugin_encoding(&mut stdout)?;
    let reader = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, stdout);

    let mut manager = PluginInterfaceManager::new(identity, (Mutex::new(stdin), encoder));
    let interface = manager.get_interface();
    interface.hello()?;

    // The reader thread owns the manager and the child for the rest of their lifetimes.
    std::thread::Builder::new()
        .name("plugin interface reader".into())
        .spawn(move || {
            if let Err(err) = manager.consume_all((reader, encoder)) {
                log::warn!("Error in PluginInterfaceManager: {err}");
            }
            // Drop the manager first, then wait on the child so it does not linger
            // un-waited after the stream ends.
            drop(manager);
            let _ = child.wait();
        })
        .map_err(|err| ShellError::PluginFailedToLoad {
            msg: format!("Failed to spawn thread for plugin: {err}"),
        })?;

    Ok(interface)
}
/// Spawn the plugin at `path` (optionally through `shell`) with the environment
/// `current_envs` and ask it for the signatures of the commands it provides.
///
/// # Errors
///
/// Propagates any [`ShellError`] from spawning the plugin or from the signature request.
#[doc(hidden)]
pub fn get_signature(
    path: &Path,
    shell: Option<&Path>,
    current_envs: &HashMap<String, String>,
) -> Result<Vec<PluginSignature>, ShellError> {
    let owned_shell = shell.map(Path::to_path_buf);
    let identity = Arc::new(PluginIdentity::new(path, owned_shell));
    let interface = identity.spawn(current_envs)?;
    interface.get_signature()
}
/// A plugin whose commands take a single [`Value`] as input and produce a single
/// [`Value`] as output.
///
/// Every `Plugin` is automatically a [`StreamingPlugin`] through a blanket impl that
/// collects the incoming [`PipelineData`] into a value before calling [`Plugin::run`].
pub trait Plugin {
/// The signatures of the commands this plugin provides.
///
/// `serve_plugin` sends the result back when it receives a signature call, and
/// `print_help` uses it to render the help text.
fn signature(&self) -> Vec<PluginSignature>;
/// Run a plugin command.
///
/// * `name` — taken from the received call info; presumably the name of the command
///   being invoked (confirm against the protocol definition).
/// * `config` — optional configuration value delivered with the call.
/// * `call` — the evaluated call (arguments and call-site span).
/// * `input` — the input, already collected into a single value.
///
/// Returns the output value, or a [`LabeledError`] to report back to the engine.
fn run(
&mut self,
name: &str,
config: &Option<Value>,
call: &EvaluatedCall,
input: &Value,
) -> Result<Value, LabeledError>;
}
/// A plugin whose commands receive their input as [`PipelineData`] and return
/// [`PipelineData`], rather than single values.
///
/// This is the trait that `serve_plugin` drives. Types implementing [`Plugin`] get an
/// implementation of this trait automatically.
pub trait StreamingPlugin {
/// The signatures of the commands this plugin provides.
///
/// `serve_plugin` sends the result back when it receives a signature call, and
/// `print_help` uses it to render the help text.
fn signature(&self) -> Vec<PluginSignature>;
/// Run a plugin command.
///
/// * `name` — taken from the received call info; presumably the name of the command
///   being invoked (confirm against the protocol definition).
/// * `config` — optional configuration value delivered with the call.
/// * `call` — the evaluated call (arguments and call-site span).
/// * `input` — the pipeline input, passed by value.
///
/// Returns the pipeline output, or a [`LabeledError`] to report back to the engine.
fn run(
&mut self,
name: &str,
config: &Option<Value>,
call: &EvaluatedCall,
input: PipelineData,
) -> Result<PipelineData, LabeledError>;
}
/// Adapter that lets every value-based [`Plugin`] be served as a [`StreamingPlugin`].
impl<T: Plugin> StreamingPlugin for T {
    /// Forwards to [`Plugin::signature`].
    fn signature(&self) -> Vec<PluginSignature> {
        <T as Plugin>::signature(self)
    }

    /// Collects `input` into a single [`Value`], runs [`Plugin::run`] on it and wraps the
    /// resulting value back into [`PipelineData`].
    fn run(
        &mut self,
        name: &str,
        config: &Option<Value>,
        call: &EvaluatedCall,
        input: PipelineData,
    ) -> Result<PipelineData, LabeledError> {
        // Use the input's own span when it has one, otherwise the span of the call head.
        let collect_span = match input.span() {
            Some(span) => span,
            None => call.head,
        };
        let collected = input.into_value(collect_span);

        let output = <T as Plugin>::run(self, name, config, call, &collected)?;
        Ok(PipelineData::Value(output, None))
    }
}
pub fn serve_plugin(plugin: &mut impl StreamingPlugin, encoder: impl PluginEncoder + 'static) {
let mut args = env::args().skip(1);
let number_of_args = args.len();
let first_arg = args.next();
if number_of_args == 0
|| first_arg
.as_ref()
.is_some_and(|arg| arg == "-h" || arg == "--help")
{
print_help(plugin, encoder);
std::process::exit(0)
}
if number_of_args > 1 || !first_arg.is_some_and(|arg| arg == "--stdio") {
eprintln!(
"{}: This plugin must be run from within Nushell.",
env::current_exe()
.map(|path| path.display().to_string())
.unwrap_or_else(|_| "plugin".into())
);
eprintln!(
"If you are running from Nushell, this plugin may be incompatible with the \
version of nushell you are using."
);
std::process::exit(1)
}
let mut stdout = std::io::stdout();
{
let encoding = encoder.name();
let length = encoding.len() as u8;
let mut encoding_content: Vec<u8> = encoding.as_bytes().to_vec();
encoding_content.insert(0, length);
stdout
.write_all(&encoding_content)
.expect("Failed to tell nushell my encoding");
stdout
.flush()
.expect("Failed to tell nushell my encoding when flushing stdout");
}
let mut manager = EngineInterfaceManager::new((stdout, encoder.clone()));
let call_receiver = manager
.take_plugin_call_receiver()
.expect("take_plugin_call_receiver returned None");
let interface = manager.get_interface();
let exe = std::env::current_exe().ok();
let plugin_name: String = exe
.as_ref()
.and_then(|path| path.file_stem())
.map(|stem| stem.to_string_lossy().into_owned())
.map(|stem| {
stem.strip_prefix("nu_plugin_")
.map(|s| s.to_owned())
.unwrap_or(stem)
})
.unwrap_or_else(|| "(unknown)".into());
macro_rules! try_or_report {
($interface:expr, $expr:expr) => (match $expr {
Ok(val) => val,
Err(ShellError::IOError { .. }) => std::process::exit(1),
Err(err) => {
let _ = $interface.write_response(Err(err.clone())).unwrap_or_else(|_| {
panic!("Plugin `{plugin_name}`: {}", err)
});
std::process::exit(1)
}
})
}
try_or_report!(interface, interface.hello());
let plugin_name_clone = plugin_name.clone();
std::thread::Builder::new()
.name("engine interface reader".into())
.spawn(move || {
if let Err(err) = manager.consume_all((std::io::stdin().lock(), encoder)) {
eprintln!("Plugin `{plugin_name_clone}` read error: {err}");
std::process::exit(1);
}
})
.unwrap_or_else(|err| {
eprintln!("Plugin `{plugin_name}` failed to launch: {err}");
std::process::exit(1);
});
for plugin_call in call_receiver {
match plugin_call {
ReceivedPluginCall::Signature { engine } => {
try_or_report!(engine, engine.write_signature(plugin.signature()));
}
ReceivedPluginCall::Run {
engine,
call:
CallInfo {
name,
config,
call,
input,
},
} => {
let result = plugin.run(&name, &config, &call, input);
let write_result = engine
.write_response(result)
.and_then(|writer| writer.write_background());
try_or_report!(engine, write_result);
}
ReceivedPluginCall::CustomValueOp {
engine,
custom_value,
op,
} => {
let local_value = try_or_report!(
engine,
custom_value
.item
.deserialize_to_custom_value(custom_value.span)
);
match op {
CustomValueOp::ToBaseValue => {
let result = local_value
.to_base_value(custom_value.span)
.map(|value| PipelineData::Value(value, None));
let write_result = engine
.write_response(result)
.and_then(|writer| writer.write_background());
try_or_report!(engine, write_result);
}
}
}
}
}
drop(interface);
}
/// Print a human-readable description of the plugin to stdout: a banner, the encoder
/// name, and for every command its name, usage, optional extra usage, flags and
/// positional parameters.
fn print_help(plugin: &mut impl StreamingPlugin, encoder: impl PluginEncoder) {
    println!("Nushell Plugin");
    println!("Encoder: {}", encoder.name());

    let mut help = String::new();

    for signature in plugin.signature().iter() {
        let sig = &signature.sig;

        // Render one command; the first formatting error aborts the rest of that command.
        let outcome = (|| -> std::fmt::Result {
            write!(help, "\nCommand: {}", sig.name)?;
            writeln!(help, "\nUsage:\n > {}", sig.usage)?;

            if !sig.extra_usage.is_empty() {
                writeln!(help, "\nExtra usage:\n > {}", sig.extra_usage)?;
            }

            let flags = get_flags_section(None, sig, |v| format!("{:#?}", v));
            write!(help, "{flags}")?;

            writeln!(help, "\nParameters:")?;

            for positional in sig.required_positional.iter() {
                writeln!(
                    help,
                    " {} <{}>: {}",
                    positional.name, positional.shape, positional.desc
                )?;
            }

            for positional in sig.optional_positional.iter() {
                writeln!(
                    help,
                    " (optional) {} <{}>: {}",
                    positional.name, positional.shape, positional.desc
                )?;
            }

            if let Some(rest_positional) = &sig.rest_positional {
                writeln!(
                    help,
                    " ...{} <{}>: {}",
                    rest_positional.name, rest_positional.shape, rest_positional.desc
                )?;
            }

            writeln!(help, "======================")
        })();

        if outcome.is_err() {
            println!("{outcome:?}")
        }
    }

    println!("{help}")
}
pub fn get_plugin_encoding(child_stdout: &mut ChildStdout) -> Result<EncodingType, ShellError> {
let mut length_buf = [0u8; 1];
child_stdout
.read_exact(&mut length_buf)
.map_err(|e| ShellError::PluginFailedToLoad {
msg: format!("unable to get encoding from plugin: {e}"),
})?;
let mut buf = vec![0u8; length_buf[0] as usize];
child_stdout
.read_exact(&mut buf)
.map_err(|e| ShellError::PluginFailedToLoad {
msg: format!("unable to get encoding from plugin: {e}"),
})?;
EncodingType::try_from_bytes(&buf).ok_or_else(|| {
let encoding_for_debug = String::from_utf8_lossy(&buf);
ShellError::PluginFailedToLoad {
msg: format!("get unsupported plugin encoding: {encoding_for_debug}"),
}
})
}