1use std::{
2 cmp::Ordering,
3 collections::HashMap,
4 env,
5 ffi::OsString,
6 ops::Deref,
7 panic::AssertUnwindSafe,
8 path::Path,
9 sync::mpsc::{self, TrySendError},
10 thread,
11};
12
13use nu_engine::documentation::{FormatterValue, HelpStyle, get_flags_section};
14use nu_plugin_core::{
15 ClientCommunicationIo, CommunicationMode, InterfaceManager, PluginEncoder, PluginRead,
16 PluginWrite,
17};
18use nu_plugin_protocol::{
19 CallInfo, CustomValueOp, GetCompletionInfo, PluginCustomValue, PluginInput, PluginOutput,
20};
21use nu_protocol::{
22 CustomValue, IntoSpanned, LabeledError, PipelineData, PluginMetadata, ShellError, Span,
23 Spanned, Value, ast::Operator, casing::Casing,
24};
25use thiserror::Error;
26
27use self::{command::render_examples, interface::ReceivedPluginCall};
28
29mod command;
30mod interface;
31
32pub use command::{PluginCommand, SimplePluginCommand, create_plugin_signature};
33pub use interface::{EngineInterface, EngineInterfaceManager};
34
35#[allow(dead_code)]
39pub(crate) const OUTPUT_BUFFER_SIZE: usize = 16384;
40
41pub trait Plugin: Sync {
100 fn version(&self) -> String;
116
117 fn commands(&self) -> Vec<Box<dyn PluginCommand<Plugin = Self>>>;
125
126 fn custom_value_to_base_value(
131 &self,
132 engine: &EngineInterface,
133 custom_value: Spanned<Box<dyn CustomValue>>,
134 ) -> Result<Value, LabeledError> {
135 let _ = engine;
136 custom_value
137 .item
138 .to_base_value(custom_value.span)
139 .map_err(LabeledError::from)
140 }
141
142 fn custom_value_follow_path_int(
147 &self,
148 engine: &EngineInterface,
149 custom_value: Spanned<Box<dyn CustomValue>>,
150 index: Spanned<usize>,
151 optional: bool,
152 ) -> Result<Value, LabeledError> {
153 let _ = engine;
154 custom_value
155 .item
156 .follow_path_int(custom_value.span, index.item, index.span, optional)
157 .map_err(LabeledError::from)
158 }
159
160 fn custom_value_follow_path_string(
165 &self,
166 engine: &EngineInterface,
167 custom_value: Spanned<Box<dyn CustomValue>>,
168 column_name: Spanned<String>,
169 optional: bool,
170 casing: Casing,
171 ) -> Result<Value, LabeledError> {
172 let _ = engine;
173 custom_value
174 .item
175 .follow_path_string(
176 custom_value.span,
177 column_name.item,
178 column_name.span,
179 optional,
180 casing,
181 )
182 .map_err(LabeledError::from)
183 }
184
185 fn custom_value_partial_cmp(
194 &self,
195 engine: &EngineInterface,
196 custom_value: Box<dyn CustomValue>,
197 other_value: Value,
198 ) -> Result<Option<Ordering>, LabeledError> {
199 let _ = engine;
200 Ok(custom_value.partial_cmp(&other_value))
201 }
202
203 fn custom_value_operation(
208 &self,
209 engine: &EngineInterface,
210 left: Spanned<Box<dyn CustomValue>>,
211 operator: Spanned<Operator>,
212 right: Value,
213 ) -> Result<Value, LabeledError> {
214 let _ = engine;
215 left.item
216 .operation(left.span, operator.item, operator.span, &right)
217 .map_err(LabeledError::from)
218 }
219
220 fn custom_value_save(
225 &self,
226 engine: &EngineInterface,
227 value: Spanned<Box<dyn CustomValue>>,
228 path: Spanned<&Path>,
229 save_call_span: Span,
230 ) -> Result<(), LabeledError> {
231 let _ = engine;
232 value
233 .item
234 .save(path, value.span, save_call_span)
235 .map_err(LabeledError::from)
236 }
237
238 fn custom_value_dropped(
251 &self,
252 engine: &EngineInterface,
253 custom_value: Box<dyn CustomValue>,
254 ) -> Result<(), LabeledError> {
255 let _ = (engine, custom_value);
256 Ok(())
257 }
258}
259
260pub fn serve_plugin(plugin: &impl Plugin, encoder: impl PluginEncoder + 'static) {
279 let args: Vec<OsString> = env::args_os().skip(1).collect();
280
281 let exe = std::env::current_exe().ok();
283
284 let plugin_name: String = exe
285 .as_ref()
286 .and_then(|path| path.file_stem())
287 .map(|stem| stem.to_string_lossy().into_owned())
288 .map(|stem| {
289 stem.strip_prefix("nu_plugin_")
290 .map(|s| s.to_owned())
291 .unwrap_or(stem)
292 })
293 .unwrap_or_else(|| "(unknown)".into());
294
295 if args.is_empty() || args[0] == "-h" || args[0] == "--help" {
296 print_help(plugin, encoder);
297 std::process::exit(0)
298 }
299
300 let mode = if args[0] == "--stdio" && args.len() == 1 {
302 CommunicationMode::Stdio
304 } else if args[0] == "--local-socket" && args.len() == 2 {
305 #[cfg(feature = "local-socket")]
306 {
307 CommunicationMode::LocalSocket((&args[1]).into())
308 }
309 #[cfg(not(feature = "local-socket"))]
310 {
311 eprintln!("{plugin_name}: local socket mode is not supported");
312 std::process::exit(1);
313 }
314 } else {
315 eprintln!(
316 "{}: This plugin must be run from within Nushell. See `plugin add --help` for details \
317 on how to use plugins.",
318 env::current_exe()
319 .map(|path| path.display().to_string())
320 .unwrap_or_else(|_| "plugin".into())
321 );
322 eprintln!(
323 "If you are running from Nushell, this plugin may be incompatible with the \
324 version of nushell you are using."
325 );
326 std::process::exit(1)
327 };
328
329 let encoder_clone = encoder.clone();
330
331 let result = match mode.connect_as_client() {
332 Ok(ClientCommunicationIo::Stdio(stdin, mut stdout)) => {
333 tell_nushell_encoding(&mut stdout, &encoder).expect("failed to tell nushell encoding");
334 serve_plugin_io(
335 plugin,
336 &plugin_name,
337 move || (stdin.lock(), encoder_clone),
338 move || (stdout, encoder),
339 )
340 }
341 #[cfg(feature = "local-socket")]
342 Ok(ClientCommunicationIo::LocalSocket {
343 read_in,
344 mut write_out,
345 }) => {
346 use std::io::{BufReader, BufWriter};
347 use std::sync::Mutex;
348
349 tell_nushell_encoding(&mut write_out, &encoder)
350 .expect("failed to tell nushell encoding");
351
352 let read = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, read_in);
353 let write = Mutex::new(BufWriter::with_capacity(OUTPUT_BUFFER_SIZE, write_out));
354 serve_plugin_io(
355 plugin,
356 &plugin_name,
357 move || (read, encoder_clone),
358 move || (write, encoder),
359 )
360 }
361 Err(err) => {
362 eprintln!("{plugin_name}: failed to connect: {err:?}");
363 std::process::exit(1);
364 }
365 };
366
367 match result {
368 Ok(()) => (),
369 Err(ServePluginError::UnreportedError(err)) => {
371 eprintln!("Plugin `{plugin_name}` error: {err}");
372 std::process::exit(1);
373 }
374 Err(_) => std::process::exit(1),
375 }
376}
377
378fn tell_nushell_encoding(
379 writer: &mut impl std::io::Write,
380 encoder: &impl PluginEncoder,
381) -> Result<(), std::io::Error> {
382 let encoding = encoder.name();
387 let length = encoding.len() as u8;
388 let mut encoding_content: Vec<u8> = encoding.as_bytes().to_vec();
389 encoding_content.insert(0, length);
390 writer.write_all(&encoding_content)?;
391 writer.flush()
392}
393
394#[derive(Debug, Error)]
396pub enum ServePluginError {
397 #[error("{0}")]
399 UnreportedError(#[source] ShellError),
400 #[error("{0}")]
402 ReportedError(#[source] ShellError),
403 #[error("{0}")]
405 Incompatible(#[source] ShellError),
406 #[error("{0}")]
408 IOError(#[source] ShellError),
409 #[error("{0}")]
411 ThreadSpawnError(#[source] std::io::Error),
412 #[error("a panic occurred in a plugin thread")]
414 Panicked,
415}
416
417impl From<ShellError> for ServePluginError {
418 fn from(error: ShellError) -> Self {
419 match error {
420 ShellError::Io(_) => ServePluginError::IOError(error),
421 ShellError::PluginFailedToLoad { .. } => ServePluginError::Incompatible(error),
422 _ => ServePluginError::UnreportedError(error),
423 }
424 }
425}
426
427trait TryToReport {
429 type T;
430 fn try_to_report(self, engine: &EngineInterface) -> Result<Self::T, ServePluginError>;
431}
432
433impl<T, E> TryToReport for Result<T, E>
434where
435 E: Into<ServePluginError>,
436{
437 type T = T;
438 fn try_to_report(self, engine: &EngineInterface) -> Result<T, ServePluginError> {
439 self.map_err(|e| match e.into() {
440 ServePluginError::UnreportedError(err) => {
441 if engine.write_response(Err(err.clone())).is_ok() {
442 ServePluginError::ReportedError(err)
443 } else {
444 ServePluginError::UnreportedError(err)
445 }
446 }
447 other => other,
448 })
449 }
450}
451
452#[doc(hidden)]
459pub fn serve_plugin_io<I, O>(
460 plugin: &impl Plugin,
461 plugin_name: &str,
462 input: impl FnOnce() -> I + Send + 'static,
463 output: impl FnOnce() -> O + Send + 'static,
464) -> Result<(), ServePluginError>
465where
466 I: PluginRead<PluginInput> + 'static,
467 O: PluginWrite<PluginOutput> + 'static,
468{
469 let (error_tx, error_rx) = mpsc::channel();
470
471 let mut commands: HashMap<String, _> = HashMap::new();
473
474 for command in plugin.commands() {
475 if let Some(previous) = commands.insert(command.name().into(), command) {
476 eprintln!(
477 "Plugin `{plugin_name}` warning: command `{}` shadowed by another command with the \
478 same name. Check your commands' `name()` methods",
479 previous.name()
480 );
481 }
482 }
483
484 let mut manager = EngineInterfaceManager::new(output());
485 let call_receiver = manager
486 .take_plugin_call_receiver()
487 .expect("take_plugin_call_receiver returned None");
489
490 let interface = manager.get_interface();
492
493 interface.hello()?;
495
496 {
497 let error_tx = error_tx.clone();
499 std::thread::Builder::new()
500 .name("engine interface reader".into())
501 .spawn(move || {
502 if let Err(err) = manager.consume_all(input()) {
504 let _ = error_tx.send(ServePluginError::from(err));
505 }
506 })
507 .map_err(ServePluginError::ThreadSpawnError)?;
508 }
509
510 thread::scope(|scope| {
512 let run = |engine, call_info| {
513 let unwind_result = std::panic::catch_unwind(AssertUnwindSafe(|| {
516 let CallInfo { name, call, input } = call_info;
517 let result = if let Some(command) = commands.get(&name) {
518 command.run(plugin, &engine, &call, input)
519 } else {
520 Err(
521 LabeledError::new(format!("Plugin command not found: `{name}`"))
522 .with_label(
523 format!("plugin `{plugin_name}` doesn't have this command"),
524 call.head,
525 ),
526 )
527 };
528 let write_result = engine
529 .write_response(result)
530 .and_then(|writer| writer.write())
531 .try_to_report(&engine);
532 if let Err(err) = write_result {
533 let _ = error_tx.send(err);
534 }
535 }));
536 if unwind_result.is_err() {
537 std::process::exit(1);
539 }
540 };
541
542 let get_dynamic_completion = |engine, get_dynamic_completion_info| {
543 let unwind_result = std::panic::catch_unwind(AssertUnwindSafe(|| {
546 let GetCompletionInfo {
547 name,
548 arg_type,
549 call,
550 } = get_dynamic_completion_info;
551 let items = if let Some(command) = commands.get(&name) {
552 let arg_type = arg_type.into();
553 command.get_dynamic_completion(
554 plugin,
555 &engine,
556 call,
557 arg_type,
558 #[expect(deprecated, reason = "internal usage")]
559 nu_protocol::engine::ExperimentalMarker,
560 )
561 } else {
562 None
563 };
564 let write_result = engine.write_completion_items(items).try_to_report(&engine);
565 if let Err(err) = write_result {
566 let _ = error_tx.send(err);
567 }
568 }));
569 if unwind_result.is_err() {
570 std::process::exit(1);
572 }
573 };
574
575 let (run_tx, run_rx) = mpsc::sync_channel(0);
577 thread::Builder::new()
578 .name("plugin runner (primary)".into())
579 .spawn_scoped(scope, move || {
580 for (engine, call) in run_rx {
581 run(engine, call);
582 }
583 })
584 .map_err(ServePluginError::ThreadSpawnError)?;
585
586 for plugin_call in call_receiver {
587 if let Ok(error) = error_rx.try_recv() {
589 return Err(error);
590 }
591
592 match plugin_call {
593 ReceivedPluginCall::Metadata { engine } => {
595 engine
596 .write_metadata(PluginMetadata::new().with_version(plugin.version()))
597 .try_to_report(&engine)?;
598 }
599 ReceivedPluginCall::Signature { engine } => {
601 let sigs = commands
602 .values()
603 .map(|command| create_plugin_signature(command.deref()))
604 .map(|mut sig| {
605 render_examples(plugin, &engine, &mut sig.examples)?;
606 Ok(sig)
607 })
608 .collect::<Result<Vec<_>, ShellError>>()
609 .try_to_report(&engine)?;
610 engine.write_signature(sigs).try_to_report(&engine)?;
611 }
612 ReceivedPluginCall::Run { engine, call } => {
614 match run_tx.try_send((engine, call)) {
616 Ok(()) => (),
617 Err(TrySendError::Full((engine, call)))
619 | Err(TrySendError::Disconnected((engine, call))) => {
620 thread::Builder::new()
621 .name("plugin runner (secondary)".into())
622 .spawn_scoped(scope, move || run(engine, call))
623 .map_err(ServePluginError::ThreadSpawnError)?;
624 }
625 }
626 }
627 ReceivedPluginCall::CustomValueOp {
629 engine,
630 custom_value,
631 op,
632 } => {
633 custom_value_op(plugin, &engine, custom_value, op).try_to_report(&engine)?;
634 }
635 ReceivedPluginCall::GetCompletion { engine, info } => {
636 get_dynamic_completion(engine, info)
637 }
638 }
639 }
640
641 Ok::<_, ServePluginError>(())
642 })?;
643
644 drop(interface);
646
647 if let Ok(err) = error_rx.try_recv() {
649 Err(err)
650 } else {
651 Ok(())
652 }
653}
654
655fn custom_value_op(
656 plugin: &impl Plugin,
657 engine: &EngineInterface,
658 custom_value: Spanned<PluginCustomValue>,
659 op: CustomValueOp,
660) -> Result<(), ShellError> {
661 let local_value = custom_value
662 .item
663 .deserialize_to_custom_value(custom_value.span)?
664 .into_spanned(custom_value.span);
665 match op {
666 CustomValueOp::ToBaseValue => {
667 let result = plugin
668 .custom_value_to_base_value(engine, local_value)
669 .map(|value| PipelineData::value(value, None));
670 engine
671 .write_response(result)
672 .and_then(|writer| writer.write())
673 }
674 CustomValueOp::FollowPathInt { index, optional } => {
675 let result = plugin
676 .custom_value_follow_path_int(engine, local_value, index, optional)
677 .map(|value| PipelineData::value(value, None));
678 engine
679 .write_response(result)
680 .and_then(|writer| writer.write())
681 }
682 CustomValueOp::FollowPathString {
683 column_name,
684 optional,
685 casing,
686 } => {
687 let result = plugin
688 .custom_value_follow_path_string(engine, local_value, column_name, optional, casing)
689 .map(|value| PipelineData::value(value, None));
690 engine
691 .write_response(result)
692 .and_then(|writer| writer.write())
693 }
694 CustomValueOp::PartialCmp(mut other_value) => {
695 PluginCustomValue::deserialize_custom_values_in(&mut other_value)?;
696 match plugin.custom_value_partial_cmp(engine, local_value.item, other_value) {
697 Ok(ordering) => engine.write_ordering(ordering),
698 Err(err) => engine
699 .write_response(Err(err))
700 .and_then(|writer| writer.write()),
701 }
702 }
703 CustomValueOp::Operation(operator, mut right) => {
704 PluginCustomValue::deserialize_custom_values_in(&mut right)?;
705 let result = plugin
706 .custom_value_operation(engine, local_value, operator, right)
707 .map(|value| PipelineData::value(value, None));
708 engine
709 .write_response(result)
710 .and_then(|writer| writer.write())
711 }
712 CustomValueOp::Save {
713 path,
714 save_call_span,
715 } => {
716 let path = Spanned {
717 item: path.item.as_path(),
718 span: path.span,
719 };
720 let result = plugin.custom_value_save(engine, local_value, path, save_call_span);
721 engine.write_ok(result)
722 }
723 CustomValueOp::Dropped => {
724 let result = plugin
725 .custom_value_dropped(engine, local_value.item)
726 .map(|_| PipelineData::empty());
727 engine
728 .write_response(result)
729 .and_then(|writer| writer.write())
730 }
731 }
732}
733
734fn print_help(plugin: &impl Plugin, encoder: impl PluginEncoder) {
735 use std::fmt::Write;
736
737 println!("Nushell Plugin");
738 println!("Encoder: {}", encoder.name());
739 println!("Version: {}", plugin.version());
740
741 let exe = std::env::current_exe().ok();
743 let plugin_name: String = exe
744 .as_ref()
745 .map(|stem| stem.to_string_lossy().into_owned())
746 .unwrap_or_else(|| "(unknown)".into());
747 println!("Plugin file path: {plugin_name}");
748
749 let mut help = String::new();
750 let help_style = HelpStyle::default();
751
752 plugin.commands().into_iter().for_each(|command| {
753 let signature = command.signature();
754 let res = write!(help, "\nCommand: {}", command.name())
755 .and_then(|_| writeln!(help, "\nDescription:\n > {}", command.description()))
756 .and_then(|_| {
757 if !command.extra_description().is_empty() {
758 writeln!(
759 help,
760 "\nExtra description:\n > {}",
761 command.extra_description()
762 )
763 } else {
764 Ok(())
765 }
766 })
767 .and_then(|_| {
768 let flags = get_flags_section(&signature, &help_style, |v| match v {
769 FormatterValue::DefaultValue(value) => format!("{value:#?}"),
770 FormatterValue::CodeString(text) => text.to_string(),
771 });
772 write!(help, "{flags}")
773 })
774 .and_then(|_| writeln!(help, "\nParameters:"))
775 .and_then(|_| {
776 signature
777 .required_positional
778 .iter()
779 .try_for_each(|positional| {
780 writeln!(
781 help,
782 " {} <{}>: {}",
783 positional.name, positional.shape, positional.desc
784 )
785 })
786 })
787 .and_then(|_| {
788 signature
789 .optional_positional
790 .iter()
791 .try_for_each(|positional| {
792 writeln!(
793 help,
794 " (optional) {} <{}>: {}",
795 positional.name, positional.shape, positional.desc
796 )
797 })
798 })
799 .and_then(|_| {
800 if let Some(rest_positional) = &signature.rest_positional {
801 writeln!(
802 help,
803 " ...{} <{}>: {}",
804 rest_positional.name, rest_positional.shape, rest_positional.desc
805 )
806 } else {
807 Ok(())
808 }
809 })
810 .and_then(|_| writeln!(help, "======================"));
811
812 if res.is_err() {
813 println!("{res:?}")
814 }
815 });
816
817 println!("{help}")
818}