1use std::{
2 cmp::Ordering,
3 collections::HashMap,
4 env,
5 ffi::OsString,
6 ops::Deref,
7 panic::AssertUnwindSafe,
8 sync::mpsc::{self, TrySendError},
9 thread,
10};
11
12use nu_engine::documentation::{get_flags_section, HelpStyle};
13use nu_plugin_core::{
14 ClientCommunicationIo, CommunicationMode, InterfaceManager, PluginEncoder, PluginRead,
15 PluginWrite,
16};
17use nu_plugin_protocol::{CallInfo, CustomValueOp, PluginCustomValue, PluginInput, PluginOutput};
18use nu_protocol::{
19 ast::Operator, CustomValue, IntoSpanned, LabeledError, PipelineData, PluginMetadata,
20 ShellError, Spanned, Value,
21};
22use thiserror::Error;
23
24use self::{command::render_examples, interface::ReceivedPluginCall};
25
26mod command;
27mod interface;
28
29pub use command::{create_plugin_signature, PluginCommand, SimplePluginCommand};
30pub use interface::{EngineInterface, EngineInterfaceManager};
31
/// Capacity, in bytes, given to the buffered reader and writer that wrap the local socket
/// streams in `serve_plugin`.
// The only use visible in this file is behind the `local-socket` feature, so the constant would
// otherwise trigger a dead-code warning when that feature is disabled.
#[allow(dead_code)]
pub(crate) const OUTPUT_BUFFER_SIZE: usize = 16384;
37
38pub trait Plugin: Sync {
97 fn version(&self) -> String;
113
114 fn commands(&self) -> Vec<Box<dyn PluginCommand<Plugin = Self>>>;
122
123 fn custom_value_to_base_value(
128 &self,
129 engine: &EngineInterface,
130 custom_value: Spanned<Box<dyn CustomValue>>,
131 ) -> Result<Value, LabeledError> {
132 let _ = engine;
133 custom_value
134 .item
135 .to_base_value(custom_value.span)
136 .map_err(LabeledError::from)
137 }
138
139 fn custom_value_follow_path_int(
144 &self,
145 engine: &EngineInterface,
146 custom_value: Spanned<Box<dyn CustomValue>>,
147 index: Spanned<usize>,
148 ) -> Result<Value, LabeledError> {
149 let _ = engine;
150 custom_value
151 .item
152 .follow_path_int(custom_value.span, index.item, index.span)
153 .map_err(LabeledError::from)
154 }
155
156 fn custom_value_follow_path_string(
161 &self,
162 engine: &EngineInterface,
163 custom_value: Spanned<Box<dyn CustomValue>>,
164 column_name: Spanned<String>,
165 ) -> Result<Value, LabeledError> {
166 let _ = engine;
167 custom_value
168 .item
169 .follow_path_string(custom_value.span, column_name.item, column_name.span)
170 .map_err(LabeledError::from)
171 }
172
173 fn custom_value_partial_cmp(
182 &self,
183 engine: &EngineInterface,
184 custom_value: Box<dyn CustomValue>,
185 other_value: Value,
186 ) -> Result<Option<Ordering>, LabeledError> {
187 let _ = engine;
188 Ok(custom_value.partial_cmp(&other_value))
189 }
190
191 fn custom_value_operation(
196 &self,
197 engine: &EngineInterface,
198 left: Spanned<Box<dyn CustomValue>>,
199 operator: Spanned<Operator>,
200 right: Value,
201 ) -> Result<Value, LabeledError> {
202 let _ = engine;
203 left.item
204 .operation(left.span, operator.item, operator.span, &right)
205 .map_err(LabeledError::from)
206 }
207
208 fn custom_value_dropped(
221 &self,
222 engine: &EngineInterface,
223 custom_value: Box<dyn CustomValue>,
224 ) -> Result<(), LabeledError> {
225 let _ = (engine, custom_value);
226 Ok(())
227 }
228}
229
230pub fn serve_plugin(plugin: &impl Plugin, encoder: impl PluginEncoder + 'static) {
249 let args: Vec<OsString> = env::args_os().skip(1).collect();
250
251 let exe = std::env::current_exe().ok();
253
254 let plugin_name: String = exe
255 .as_ref()
256 .and_then(|path| path.file_stem())
257 .map(|stem| stem.to_string_lossy().into_owned())
258 .map(|stem| {
259 stem.strip_prefix("nu_plugin_")
260 .map(|s| s.to_owned())
261 .unwrap_or(stem)
262 })
263 .unwrap_or_else(|| "(unknown)".into());
264
265 if args.is_empty() || args[0] == "-h" || args[0] == "--help" {
266 print_help(plugin, encoder);
267 std::process::exit(0)
268 }
269
270 let mode = if args[0] == "--stdio" && args.len() == 1 {
272 CommunicationMode::Stdio
274 } else if args[0] == "--local-socket" && args.len() == 2 {
275 #[cfg(feature = "local-socket")]
276 {
277 CommunicationMode::LocalSocket((&args[1]).into())
278 }
279 #[cfg(not(feature = "local-socket"))]
280 {
281 eprintln!("{plugin_name}: local socket mode is not supported");
282 std::process::exit(1);
283 }
284 } else {
285 eprintln!(
286 "{}: This plugin must be run from within Nushell. See `plugin add --help` for details \
287 on how to use plugins.",
288 env::current_exe()
289 .map(|path| path.display().to_string())
290 .unwrap_or_else(|_| "plugin".into())
291 );
292 eprintln!(
293 "If you are running from Nushell, this plugin may be incompatible with the \
294 version of nushell you are using."
295 );
296 std::process::exit(1)
297 };
298
299 let encoder_clone = encoder.clone();
300
301 let result = match mode.connect_as_client() {
302 Ok(ClientCommunicationIo::Stdio(stdin, mut stdout)) => {
303 tell_nushell_encoding(&mut stdout, &encoder).expect("failed to tell nushell encoding");
304 serve_plugin_io(
305 plugin,
306 &plugin_name,
307 move || (stdin.lock(), encoder_clone),
308 move || (stdout, encoder),
309 )
310 }
311 #[cfg(feature = "local-socket")]
312 Ok(ClientCommunicationIo::LocalSocket {
313 read_in,
314 mut write_out,
315 }) => {
316 use std::io::{BufReader, BufWriter};
317 use std::sync::Mutex;
318
319 tell_nushell_encoding(&mut write_out, &encoder)
320 .expect("failed to tell nushell encoding");
321
322 let read = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, read_in);
323 let write = Mutex::new(BufWriter::with_capacity(OUTPUT_BUFFER_SIZE, write_out));
324 serve_plugin_io(
325 plugin,
326 &plugin_name,
327 move || (read, encoder_clone),
328 move || (write, encoder),
329 )
330 }
331 Err(err) => {
332 eprintln!("{plugin_name}: failed to connect: {err:?}");
333 std::process::exit(1);
334 }
335 };
336
337 match result {
338 Ok(()) => (),
339 Err(ServePluginError::UnreportedError(err)) => {
341 eprintln!("Plugin `{plugin_name}` error: {err}");
342 std::process::exit(1);
343 }
344 Err(_) => std::process::exit(1),
345 }
346}
347
348fn tell_nushell_encoding(
349 writer: &mut impl std::io::Write,
350 encoder: &impl PluginEncoder,
351) -> Result<(), std::io::Error> {
352 let encoding = encoder.name();
357 let length = encoding.len() as u8;
358 let mut encoding_content: Vec<u8> = encoding.as_bytes().to_vec();
359 encoding_content.insert(0, length);
360 writer.write_all(&encoding_content)?;
361 writer.flush()
362}
363
/// An error that stopped a plugin from being served.
///
/// Every variant that wraps another error displays as that inner error.
#[derive(Debug, Error)]
pub enum ServePluginError {
    /// An error that has not been delivered to the engine. This is the fallback conversion for
    /// a [`ShellError`], and `serve_plugin` prints it to stderr before exiting.
    #[error("{0}")]
    UnreportedError(#[source] ShellError),
    /// An error that was already written to the engine as a response, so it does not need to be
    /// printed again.
    #[error("{0}")]
    ReportedError(#[source] ShellError),
    /// Produced from [`ShellError::PluginFailedToLoad`].
    #[error("{0}")]
    Incompatible(#[source] ShellError),
    /// Produced from [`ShellError::Io`].
    #[error("{0}")]
    IOError(#[source] ShellError),
    /// A thread needed for serving the plugin could not be spawned.
    #[error("{0}")]
    ThreadSpawnError(#[source] std::io::Error),
    /// A panic occurred in a plugin thread.
    #[error("a panic occurred in a plugin thread")]
    Panicked,
}
386
387impl From<ShellError> for ServePluginError {
388 fn from(error: ShellError) -> Self {
389 match error {
390 ShellError::Io(_) => ServePluginError::IOError(error),
391 ShellError::PluginFailedToLoad { .. } => ServePluginError::Incompatible(error),
392 _ => ServePluginError::UnreportedError(error),
393 }
394 }
395}
396
/// Extension for results whose error should be forwarded to the engine when possible.
trait TryToReport {
    /// The success type, passed through unchanged.
    type T;
    /// Convert the error side into a [`ServePluginError`], using `engine` to try to report it.
    /// The implementation for `Result` in this file turns an `UnreportedError` into a
    /// `ReportedError` when writing it to the engine succeeds.
    fn try_to_report(self, engine: &EngineInterface) -> Result<Self::T, ServePluginError>;
}
402
403impl<T, E> TryToReport for Result<T, E>
404where
405 E: Into<ServePluginError>,
406{
407 type T = T;
408 fn try_to_report(self, engine: &EngineInterface) -> Result<T, ServePluginError> {
409 self.map_err(|e| match e.into() {
410 ServePluginError::UnreportedError(err) => {
411 if engine.write_response(Err(err.clone())).is_ok() {
412 ServePluginError::ReportedError(err)
413 } else {
414 ServePluginError::UnreportedError(err)
415 }
416 }
417 other => other,
418 })
419 }
420}
421
/// Serve `plugin` using the reader produced by `input` and the writer produced by `output`.
///
/// `output` is called immediately on the current thread; `input` is called on a dedicated
/// reader thread. Plugin calls are then handled until the call receiver is exhausted or an
/// error is encountered.
///
/// `plugin_name` is only used in diagnostic messages.
///
/// # Errors
///
/// Returns the first error received from the reader thread or a runner thread, a
/// [`ServePluginError::ThreadSpawnError`] if a thread cannot be spawned, or the error produced
/// while handling a metadata, signature or custom value call.
///
/// # Panics
///
/// Panics if the manager has no plugin call receiver to take.
#[doc(hidden)]
pub fn serve_plugin_io<I, O>(
    plugin: &impl Plugin,
    plugin_name: &str,
    input: impl FnOnce() -> I + Send + 'static,
    output: impl FnOnce() -> O + Send + 'static,
) -> Result<(), ServePluginError>
where
    I: PluginRead<PluginInput> + 'static,
    O: PluginWrite<PluginOutput> + 'static,
{
    // Errors raised on the reader thread or on runner threads are sent here and polled below.
    let (error_tx, error_rx) = mpsc::channel();

    // Index the commands by name so that a `Run` call can look its command up directly.
    let mut commands: HashMap<String, _> = HashMap::new();

    for command in plugin.commands() {
        // `insert` returns the entry that was replaced, if any: two commands share a name.
        if let Some(previous) = commands.insert(command.name().into(), command) {
            eprintln!(
                "Plugin `{plugin_name}` warning: command `{}` shadowed by another command with the \
                same name. Check your commands' `name()` methods",
                previous.name()
            );
        }
    }

    let mut manager = EngineInterfaceManager::new(output());
    let call_receiver = manager
        .take_plugin_call_receiver()
        .expect("take_plugin_call_receiver returned None");

    // This interface is kept until serving is finished and explicitly dropped at the end.
    let interface = manager.get_interface();

    // NOTE(review): presumably the initial handshake with the engine - confirm in `interface`.
    interface.hello()?;

    {
        // Feed the manager from the input on its own thread; the manager is moved into it.
        let error_tx = error_tx.clone();
        std::thread::Builder::new()
            .name("engine interface reader".into())
            .spawn(move || {
                if let Err(err) = manager.consume_all(input()) {
                    // Ignore a send failure: nobody is left to receive the error.
                    let _ = error_tx.send(ServePluginError::from(err));
                }
            })
            .map_err(ServePluginError::ThreadSpawnError)?;
    }

    // Scoped threads let the runners borrow `plugin`, `commands` and `error_tx`.
    thread::scope(|scope| {
        // Runs a single command call and writes its response. The closure is used both by the
        // primary runner and by each secondary runner.
        let run = |engine, call_info| {
            // `AssertUnwindSafe` is acceptable because nothing is used after a caught panic:
            // the process exits immediately below.
            let unwind_result = std::panic::catch_unwind(AssertUnwindSafe(|| {
                let CallInfo { name, call, input } = call_info;
                let result = if let Some(command) = commands.get(&name) {
                    command.run(plugin, &engine, &call, input)
                } else {
                    Err(
                        LabeledError::new(format!("Plugin command not found: `{name}`"))
                            .with_label(
                                format!("plugin `{plugin_name}` doesn't have this command"),
                                call.head,
                            ),
                    )
                };
                let write_result = engine
                    .write_response(result)
                    .and_then(|writer| writer.write())
                    .try_to_report(&engine);
                if let Err(err) = write_result {
                    // Hand the failure to the main loop, which checks for pending errors.
                    let _ = error_tx.send(err);
                }
            }));
            if unwind_result.is_err() {
                // A command panicked: terminate the whole plugin process.
                std::process::exit(1);
            }
        };

        // One reusable runner thread. The channel has zero capacity, so `try_send` below only
        // succeeds while this thread is waiting for its next call.
        let (run_tx, run_rx) = mpsc::sync_channel(0);
        thread::Builder::new()
            .name("plugin runner (primary)".into())
            .spawn_scoped(scope, move || {
                for (engine, call) in run_rx {
                    run(engine, call);
                }
            })
            .map_err(ServePluginError::ThreadSpawnError)?;

        for plugin_call in call_receiver {
            // Stop serving as soon as any thread has reported an error.
            if let Ok(error) = error_rx.try_recv() {
                return Err(error);
            }

            match plugin_call {
                // Reply with the plugin metadata, which carries the plugin version.
                ReceivedPluginCall::Metadata { engine } => {
                    engine
                        .write_metadata(PluginMetadata::new().with_version(plugin.version()))
                        .try_to_report(&engine)?;
                }
                // Reply with the signature of every command, with its examples rendered.
                ReceivedPluginCall::Signature { engine } => {
                    let sigs = commands
                        .values()
                        .map(|command| create_plugin_signature(command.deref()))
                        .map(|mut sig| {
                            render_examples(plugin, &engine, &mut sig.examples)?;
                            Ok(sig)
                        })
                        .collect::<Result<Vec<_>, ShellError>>()
                        .try_to_report(&engine)?;
                    engine.write_signature(sigs).try_to_report(&engine)?;
                }
                // Run the command on a runner thread rather than on this loop's thread.
                ReceivedPluginCall::Run { engine, call } => {
                    match run_tx.try_send((engine, call)) {
                        Ok(()) => (),
                        // The primary runner is busy or gone: take the call back out of the
                        // error and run it on a freshly spawned thread.
                        Err(TrySendError::Full((engine, call)))
                        | Err(TrySendError::Disconnected((engine, call))) => {
                            thread::Builder::new()
                                .name("plugin runner (secondary)".into())
                                .spawn_scoped(scope, move || run(engine, call))
                                .map_err(ServePluginError::ThreadSpawnError)?;
                        }
                    }
                }
                // Custom value operations are handled inline on this thread.
                ReceivedPluginCall::CustomValueOp {
                    engine,
                    custom_value,
                    op,
                } => {
                    custom_value_op(plugin, &engine, custom_value, op).try_to_report(&engine)?;
                }
            }
        }

        Ok::<_, ServePluginError>(())
    })?;

    // NOTE(review): presumably releasing the interface lets the manager shut down - confirm.
    drop(interface);

    // Surface an error that arrived after the last poll inside the loop.
    if let Ok(err) = error_rx.try_recv() {
        Err(err)
    } else {
        Ok(())
    }
}
588
589fn custom_value_op(
590 plugin: &impl Plugin,
591 engine: &EngineInterface,
592 custom_value: Spanned<PluginCustomValue>,
593 op: CustomValueOp,
594) -> Result<(), ShellError> {
595 let local_value = custom_value
596 .item
597 .deserialize_to_custom_value(custom_value.span)?
598 .into_spanned(custom_value.span);
599 match op {
600 CustomValueOp::ToBaseValue => {
601 let result = plugin
602 .custom_value_to_base_value(engine, local_value)
603 .map(|value| PipelineData::Value(value, None));
604 engine
605 .write_response(result)
606 .and_then(|writer| writer.write())
607 }
608 CustomValueOp::FollowPathInt(index) => {
609 let result = plugin
610 .custom_value_follow_path_int(engine, local_value, index)
611 .map(|value| PipelineData::Value(value, None));
612 engine
613 .write_response(result)
614 .and_then(|writer| writer.write())
615 }
616 CustomValueOp::FollowPathString(column_name) => {
617 let result = plugin
618 .custom_value_follow_path_string(engine, local_value, column_name)
619 .map(|value| PipelineData::Value(value, None));
620 engine
621 .write_response(result)
622 .and_then(|writer| writer.write())
623 }
624 CustomValueOp::PartialCmp(mut other_value) => {
625 PluginCustomValue::deserialize_custom_values_in(&mut other_value)?;
626 match plugin.custom_value_partial_cmp(engine, local_value.item, other_value) {
627 Ok(ordering) => engine.write_ordering(ordering),
628 Err(err) => engine
629 .write_response(Err(err))
630 .and_then(|writer| writer.write()),
631 }
632 }
633 CustomValueOp::Operation(operator, mut right) => {
634 PluginCustomValue::deserialize_custom_values_in(&mut right)?;
635 let result = plugin
636 .custom_value_operation(engine, local_value, operator, right)
637 .map(|value| PipelineData::Value(value, None));
638 engine
639 .write_response(result)
640 .and_then(|writer| writer.write())
641 }
642 CustomValueOp::Dropped => {
643 let result = plugin
644 .custom_value_dropped(engine, local_value.item)
645 .map(|_| PipelineData::Empty);
646 engine
647 .write_response(result)
648 .and_then(|writer| writer.write())
649 }
650 }
651}
652
653fn print_help(plugin: &impl Plugin, encoder: impl PluginEncoder) {
654 use std::fmt::Write;
655
656 println!("Nushell Plugin");
657 println!("Encoder: {}", encoder.name());
658 println!("Version: {}", plugin.version());
659
660 let exe = std::env::current_exe().ok();
662 let plugin_name: String = exe
663 .as_ref()
664 .map(|stem| stem.to_string_lossy().into_owned())
665 .unwrap_or_else(|| "(unknown)".into());
666 println!("Plugin file path: {}", plugin_name);
667
668 let mut help = String::new();
669 let help_style = HelpStyle::default();
670
671 plugin.commands().into_iter().for_each(|command| {
672 let signature = command.signature();
673 let res = write!(help, "\nCommand: {}", command.name())
674 .and_then(|_| writeln!(help, "\nDescription:\n > {}", command.description()))
675 .and_then(|_| {
676 if !command.extra_description().is_empty() {
677 writeln!(
678 help,
679 "\nExtra description:\n > {}",
680 command.extra_description()
681 )
682 } else {
683 Ok(())
684 }
685 })
686 .and_then(|_| {
687 let flags = get_flags_section(&signature, &help_style, |v| format!("{:#?}", v));
688 write!(help, "{flags}")
689 })
690 .and_then(|_| writeln!(help, "\nParameters:"))
691 .and_then(|_| {
692 signature
693 .required_positional
694 .iter()
695 .try_for_each(|positional| {
696 writeln!(
697 help,
698 " {} <{}>: {}",
699 positional.name, positional.shape, positional.desc
700 )
701 })
702 })
703 .and_then(|_| {
704 signature
705 .optional_positional
706 .iter()
707 .try_for_each(|positional| {
708 writeln!(
709 help,
710 " (optional) {} <{}>: {}",
711 positional.name, positional.shape, positional.desc
712 )
713 })
714 })
715 .and_then(|_| {
716 if let Some(rest_positional) = &signature.rest_positional {
717 writeln!(
718 help,
719 " ...{} <{}>: {}",
720 rest_positional.name, rest_positional.shape, rest_positional.desc
721 )
722 } else {
723 Ok(())
724 }
725 })
726 .and_then(|_| writeln!(help, "======================"));
727
728 if res.is_err() {
729 println!("{res:?}")
730 }
731 });
732
733 println!("{help}")
734}