use std::{
cmp::Ordering,
collections::HashMap,
env,
ffi::OsString,
ops::Deref,
panic::AssertUnwindSafe,
path::Path,
sync::mpsc::{self, TrySendError},
thread,
};
use nu_engine::documentation::{FormatterValue, HelpStyle, get_flags_section};
use nu_plugin_core::{
ClientCommunicationIo, CommunicationMode, InterfaceManager, PluginEncoder, PluginRead,
PluginWrite,
};
use nu_plugin_protocol::{
CallInfo, CustomValueOp, GetCompletionInfo, PluginCustomValue, PluginInput, PluginOutput,
};
use nu_protocol::{
CustomValue, IntoSpanned, LabeledError, PipelineData, PluginMetadata, ShellError, Span,
Spanned, Value, ast::Operator, casing::Casing,
};
use thiserror::Error;
use self::{command::render_examples, interface::ReceivedPluginCall};
mod command;
mod interface;
pub use command::{PluginCommand, SimplePluginCommand, create_plugin_signature};
pub use interface::{EngineInterface, EngineInterfaceManager};
#[allow(dead_code)]
pub(crate) const OUTPUT_BUFFER_SIZE: usize = 16384;
pub trait Plugin: Sync {
fn version(&self) -> String;
fn commands(&self) -> Vec<Box<dyn PluginCommand<Plugin = Self>>>;
fn custom_value_to_base_value(
&self,
engine: &EngineInterface,
custom_value: Spanned<Box<dyn CustomValue>>,
) -> Result<Value, LabeledError> {
let _ = engine;
custom_value
.item
.to_base_value(custom_value.span)
.map_err(LabeledError::from)
}
fn custom_value_follow_path_int(
&self,
engine: &EngineInterface,
custom_value: Spanned<Box<dyn CustomValue>>,
index: Spanned<usize>,
optional: bool,
) -> Result<Value, LabeledError> {
let _ = engine;
custom_value
.item
.follow_path_int(custom_value.span, index.item, index.span, optional)
.map_err(LabeledError::from)
}
fn custom_value_follow_path_string(
&self,
engine: &EngineInterface,
custom_value: Spanned<Box<dyn CustomValue>>,
column_name: Spanned<String>,
optional: bool,
casing: Casing,
) -> Result<Value, LabeledError> {
let _ = engine;
custom_value
.item
.follow_path_string(
custom_value.span,
column_name.item,
column_name.span,
optional,
casing,
)
.map_err(LabeledError::from)
}
fn custom_value_partial_cmp(
&self,
engine: &EngineInterface,
custom_value: Box<dyn CustomValue>,
other_value: Value,
) -> Result<Option<Ordering>, LabeledError> {
let _ = engine;
Ok(custom_value.partial_cmp(&other_value))
}
fn custom_value_operation(
&self,
engine: &EngineInterface,
left: Spanned<Box<dyn CustomValue>>,
operator: Spanned<Operator>,
right: Value,
) -> Result<Value, LabeledError> {
let _ = engine;
left.item
.operation(left.span, operator.item, operator.span, &right)
.map_err(LabeledError::from)
}
fn custom_value_save(
&self,
engine: &EngineInterface,
value: Spanned<Box<dyn CustomValue>>,
path: Spanned<&Path>,
save_call_span: Span,
) -> Result<(), LabeledError> {
let _ = engine;
value
.item
.save(path, value.span, save_call_span)
.map_err(LabeledError::from)
}
fn custom_value_dropped(
&self,
engine: &EngineInterface,
custom_value: Box<dyn CustomValue>,
) -> Result<(), LabeledError> {
let _ = (engine, custom_value);
Ok(())
}
}
pub fn serve_plugin(plugin: &impl Plugin, encoder: impl PluginEncoder + 'static) {
let args: Vec<OsString> = env::args_os().skip(1).collect();
let exe = std::env::current_exe().ok();
let plugin_name: String = exe
.as_ref()
.and_then(|path| path.file_stem())
.map(|stem| stem.to_string_lossy().into_owned())
.map(|stem| {
stem.strip_prefix("nu_plugin_")
.map(|s| s.to_owned())
.unwrap_or(stem)
})
.unwrap_or_else(|| "(unknown)".into());
if args.is_empty() || args[0] == "-h" || args[0] == "--help" {
print_help(plugin, encoder);
std::process::exit(0)
}
let mode = if args[0] == "--stdio" && args.len() == 1 {
CommunicationMode::Stdio
} else if args[0] == "--local-socket" && args.len() == 2 {
#[cfg(feature = "local-socket")]
{
CommunicationMode::LocalSocket((&args[1]).into())
}
#[cfg(not(feature = "local-socket"))]
{
eprintln!("{plugin_name}: local socket mode is not supported");
std::process::exit(1);
}
} else {
eprintln!(
"{}: This plugin must be run from within Nushell. See `plugin add --help` for details \
on how to use plugins.",
env::current_exe()
.map(|path| path.display().to_string())
.unwrap_or_else(|_| "plugin".into())
);
eprintln!(
"If you are running from Nushell, this plugin may be incompatible with the \
version of nushell you are using."
);
std::process::exit(1)
};
let encoder_clone = encoder.clone();
let result = match mode.connect_as_client() {
Ok(ClientCommunicationIo::Stdio(stdin, mut stdout)) => {
tell_nushell_encoding(&mut stdout, &encoder).expect("failed to tell nushell encoding");
serve_plugin_io(
plugin,
&plugin_name,
move || (stdin.lock(), encoder_clone),
move || (stdout, encoder),
)
}
#[cfg(feature = "local-socket")]
Ok(ClientCommunicationIo::LocalSocket {
read_in,
mut write_out,
}) => {
use std::io::{BufReader, BufWriter};
use std::sync::Mutex;
tell_nushell_encoding(&mut write_out, &encoder)
.expect("failed to tell nushell encoding");
let read = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, read_in);
let write = Mutex::new(BufWriter::with_capacity(OUTPUT_BUFFER_SIZE, write_out));
serve_plugin_io(
plugin,
&plugin_name,
move || (read, encoder_clone),
move || (write, encoder),
)
}
Err(err) => {
eprintln!("{plugin_name}: failed to connect: {err:?}");
std::process::exit(1);
}
};
match result {
Ok(()) => (),
Err(ServePluginError::UnreportedError(err)) => {
eprintln!("Plugin `{plugin_name}` error: {err}");
std::process::exit(1);
}
Err(_) => std::process::exit(1),
}
}
fn tell_nushell_encoding(
writer: &mut impl std::io::Write,
encoder: &impl PluginEncoder,
) -> Result<(), std::io::Error> {
let encoding = encoder.name();
let length = encoding.len() as u8;
let mut encoding_content: Vec<u8> = encoding.as_bytes().to_vec();
encoding_content.insert(0, length);
writer.write_all(&encoding_content)?;
writer.flush()
}
#[derive(Debug, Error)]
pub enum ServePluginError {
#[error("{0}")]
UnreportedError(#[source] ShellError),
#[error("{0}")]
ReportedError(#[source] ShellError),
#[error("{0}")]
Incompatible(#[source] ShellError),
#[error("{0}")]
IOError(#[source] ShellError),
#[error("{0}")]
ThreadSpawnError(#[source] std::io::Error),
#[error("a panic occurred in a plugin thread")]
Panicked,
}
impl From<ShellError> for ServePluginError {
fn from(error: ShellError) -> Self {
match error {
ShellError::Io(_) => ServePluginError::IOError(error),
ShellError::PluginFailedToLoad { .. } => ServePluginError::Incompatible(error),
_ => ServePluginError::UnreportedError(error),
}
}
}
trait TryToReport {
type T;
fn try_to_report(self, engine: &EngineInterface) -> Result<Self::T, ServePluginError>;
}
impl<T, E> TryToReport for Result<T, E>
where
E: Into<ServePluginError>,
{
type T = T;
fn try_to_report(self, engine: &EngineInterface) -> Result<T, ServePluginError> {
self.map_err(|e| match e.into() {
ServePluginError::UnreportedError(err) => {
if engine.write_response(Err(err.clone())).is_ok() {
ServePluginError::ReportedError(err)
} else {
ServePluginError::UnreportedError(err)
}
}
other => other,
})
}
}
#[doc(hidden)]
pub fn serve_plugin_io<I, O>(
plugin: &impl Plugin,
plugin_name: &str,
input: impl FnOnce() -> I + Send + 'static,
output: impl FnOnce() -> O + Send + 'static,
) -> Result<(), ServePluginError>
where
I: PluginRead<PluginInput> + 'static,
O: PluginWrite<PluginOutput> + 'static,
{
let (error_tx, error_rx) = mpsc::channel();
let mut commands: HashMap<String, _> = HashMap::new();
for command in plugin.commands() {
if let Some(previous) = commands.insert(command.name().into(), command) {
eprintln!(
"Plugin `{plugin_name}` warning: command `{}` shadowed by another command with the \
same name. Check your commands' `name()` methods",
previous.name()
);
}
}
let mut manager = EngineInterfaceManager::new(output());
let call_receiver = manager
.take_plugin_call_receiver()
.expect("take_plugin_call_receiver returned None");
let interface = manager.get_interface();
interface.hello()?;
{
let error_tx = error_tx.clone();
std::thread::Builder::new()
.name("engine interface reader".into())
.spawn(move || {
if let Err(err) = manager.consume_all(input()) {
let _ = error_tx.send(ServePluginError::from(err));
}
})
.map_err(ServePluginError::ThreadSpawnError)?;
}
thread::scope(|scope| {
let run = |engine, call_info| {
let unwind_result = std::panic::catch_unwind(AssertUnwindSafe(|| {
let CallInfo { name, call, input } = call_info;
let result = if let Some(command) = commands.get(&name) {
command.run(plugin, &engine, &call, input)
} else {
Err(
LabeledError::new(format!("Plugin command not found: `{name}`"))
.with_label(
format!("plugin `{plugin_name}` doesn't have this command"),
call.head,
),
)
};
let write_result = engine
.write_response(result)
.and_then(|writer| writer.write())
.try_to_report(&engine);
if let Err(err) = write_result {
let _ = error_tx.send(err);
}
}));
if unwind_result.is_err() {
std::process::exit(1);
}
};
let get_dynamic_completion = |engine, get_dynamic_completion_info| {
let unwind_result = std::panic::catch_unwind(AssertUnwindSafe(|| {
let GetCompletionInfo {
name,
arg_type,
call,
} = get_dynamic_completion_info;
let items = if let Some(command) = commands.get(&name) {
let arg_type = arg_type.into();
command.get_dynamic_completion(
plugin,
&engine,
call,
arg_type,
#[expect(deprecated, reason = "internal usage")]
nu_protocol::engine::ExperimentalMarker,
)
} else {
None
};
let write_result = engine.write_completion_items(items).try_to_report(&engine);
if let Err(err) = write_result {
let _ = error_tx.send(err);
}
}));
if unwind_result.is_err() {
std::process::exit(1);
}
};
let (run_tx, run_rx) = mpsc::sync_channel(0);
thread::Builder::new()
.name("plugin runner (primary)".into())
.spawn_scoped(scope, move || {
for (engine, call) in run_rx {
run(engine, call);
}
})
.map_err(ServePluginError::ThreadSpawnError)?;
for plugin_call in call_receiver {
if let Ok(error) = error_rx.try_recv() {
return Err(error);
}
match plugin_call {
ReceivedPluginCall::Metadata { engine } => {
engine
.write_metadata(PluginMetadata::new().with_version(plugin.version()))
.try_to_report(&engine)?;
}
ReceivedPluginCall::Signature { engine } => {
let sigs = commands
.values()
.map(|command| create_plugin_signature(command.deref()))
.map(|mut sig| {
render_examples(plugin, &engine, &mut sig.examples)?;
Ok(sig)
})
.collect::<Result<Vec<_>, ShellError>>()
.try_to_report(&engine)?;
engine.write_signature(sigs).try_to_report(&engine)?;
}
ReceivedPluginCall::Run { engine, call } => {
match run_tx.try_send((engine, call)) {
Ok(()) => (),
Err(TrySendError::Full((engine, call)))
| Err(TrySendError::Disconnected((engine, call))) => {
thread::Builder::new()
.name("plugin runner (secondary)".into())
.spawn_scoped(scope, move || run(engine, call))
.map_err(ServePluginError::ThreadSpawnError)?;
}
}
}
ReceivedPluginCall::CustomValueOp {
engine,
custom_value,
op,
} => {
custom_value_op(plugin, &engine, custom_value, op).try_to_report(&engine)?;
}
ReceivedPluginCall::GetCompletion { engine, info } => {
get_dynamic_completion(engine, info)
}
}
}
Ok::<_, ServePluginError>(())
})?;
drop(interface);
if let Ok(err) = error_rx.try_recv() {
Err(err)
} else {
Ok(())
}
}
fn custom_value_op(
plugin: &impl Plugin,
engine: &EngineInterface,
custom_value: Spanned<PluginCustomValue>,
op: CustomValueOp,
) -> Result<(), ShellError> {
let local_value = custom_value
.item
.deserialize_to_custom_value(custom_value.span)?
.into_spanned(custom_value.span);
match op {
CustomValueOp::ToBaseValue => {
let result = plugin
.custom_value_to_base_value(engine, local_value)
.map(|value| PipelineData::value(value, None));
engine
.write_response(result)
.and_then(|writer| writer.write())
}
CustomValueOp::FollowPathInt { index, optional } => {
let result = plugin
.custom_value_follow_path_int(engine, local_value, index, optional)
.map(|value| PipelineData::value(value, None));
engine
.write_response(result)
.and_then(|writer| writer.write())
}
CustomValueOp::FollowPathString {
column_name,
optional,
casing,
} => {
let result = plugin
.custom_value_follow_path_string(engine, local_value, column_name, optional, casing)
.map(|value| PipelineData::value(value, None));
engine
.write_response(result)
.and_then(|writer| writer.write())
}
CustomValueOp::PartialCmp(mut other_value) => {
PluginCustomValue::deserialize_custom_values_in(&mut other_value)?;
match plugin.custom_value_partial_cmp(engine, local_value.item, other_value) {
Ok(ordering) => engine.write_ordering(ordering),
Err(err) => engine
.write_response(Err(err))
.and_then(|writer| writer.write()),
}
}
CustomValueOp::Operation(operator, mut right) => {
PluginCustomValue::deserialize_custom_values_in(&mut right)?;
let result = plugin
.custom_value_operation(engine, local_value, operator, right)
.map(|value| PipelineData::value(value, None));
engine
.write_response(result)
.and_then(|writer| writer.write())
}
CustomValueOp::Save {
path,
save_call_span,
} => {
let path = Spanned {
item: path.item.as_path(),
span: path.span,
};
let result = plugin.custom_value_save(engine, local_value, path, save_call_span);
engine.write_ok(result)
}
CustomValueOp::Dropped => {
let result = plugin
.custom_value_dropped(engine, local_value.item)
.map(|_| PipelineData::empty());
engine
.write_response(result)
.and_then(|writer| writer.write())
}
}
}
fn print_help(plugin: &impl Plugin, encoder: impl PluginEncoder) {
use std::fmt::Write;
println!("Nushell Plugin");
println!("Encoder: {}", encoder.name());
println!("Version: {}", plugin.version());
let exe = std::env::current_exe().ok();
let plugin_name: String = exe
.as_ref()
.map(|stem| stem.to_string_lossy().into_owned())
.unwrap_or_else(|| "(unknown)".into());
println!("Plugin file path: {plugin_name}");
let mut help = String::new();
let help_style = HelpStyle::default();
plugin.commands().into_iter().for_each(|command| {
let signature = command.signature();
let res = write!(help, "\nCommand: {}", command.name())
.and_then(|_| writeln!(help, "\nDescription:\n > {}", command.description()))
.and_then(|_| {
if !command.extra_description().is_empty() {
writeln!(
help,
"\nExtra description:\n > {}",
command.extra_description()
)
} else {
Ok(())
}
})
.and_then(|_| {
let flags = get_flags_section(&signature, &help_style, |v| match v {
FormatterValue::DefaultValue(value) => format!("{value:#?}"),
FormatterValue::CodeString(text) => text.to_string(),
});
write!(help, "{flags}")
})
.and_then(|_| writeln!(help, "\nParameters:"))
.and_then(|_| {
signature
.required_positional
.iter()
.try_for_each(|positional| {
writeln!(
help,
" {} <{}>: {}",
positional.name, positional.shape, positional.desc
)
})
})
.and_then(|_| {
signature
.optional_positional
.iter()
.try_for_each(|positional| {
writeln!(
help,
" (optional) {} <{}>: {}",
positional.name, positional.shape, positional.desc
)
})
})
.and_then(|_| {
if let Some(rest_positional) = &signature.rest_positional {
writeln!(
help,
" ...{} <{}>: {}",
rest_positional.name, rest_positional.shape, rest_positional.desc
)
} else {
Ok(())
}
})
.and_then(|_| writeln!(help, "======================"));
if res.is_err() {
println!("{res:?}")
}
});
println!("{help}")
}