1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990
//! Interface used by the plugin to communicate with the engine.
use nu_plugin_core::{
util::{Sequence, Waitable, WaitableMut},
Interface, InterfaceManager, PipelineDataWriter, PluginRead, PluginWrite, StreamManager,
StreamManagerHandle,
};
use nu_plugin_protocol::{
CallInfo, CustomValueOp, EngineCall, EngineCallId, EngineCallResponse, Ordering, PluginCall,
PluginCallId, PluginCallResponse, PluginCustomValue, PluginInput, PluginOption, PluginOutput,
ProtocolInfo,
};
use nu_protocol::{
engine::Closure, Config, IntoInterruptiblePipelineData, LabeledError, ListStream, PipelineData,
PluginSignature, ShellError, Span, Spanned, Value,
};
use std::{
collections::{btree_map, BTreeMap, HashMap},
sync::{mpsc, Arc},
};
/// Plugin calls that are received by the [`EngineInterfaceManager`] for handling.
///
/// With each call, an [`EngineInterface`] is included that can be provided to the plugin code
/// and should be used to send the response. The interface sent includes the [`PluginCallId`] for
/// sending associated messages with the correct context.
///
/// This is not a public API.
#[derive(Debug)]
#[doc(hidden)]
pub enum ReceivedPluginCall {
Signature {
engine: EngineInterface,
},
Run {
engine: EngineInterface,
call: CallInfo<PipelineData>,
},
CustomValueOp {
engine: EngineInterface,
custom_value: Spanned<PluginCustomValue>,
op: CustomValueOp,
},
}
#[cfg(test)]
mod tests;
/// Internal shared state between the manager and each interface.
struct EngineInterfaceState {
/// Protocol version info, set after `Hello` received
protocol_info: Waitable<Arc<ProtocolInfo>>,
/// Sequence for generating engine call ids
engine_call_id_sequence: Sequence,
/// Sequence for generating stream ids
stream_id_sequence: Sequence,
/// Sender to subscribe to an engine call response
engine_call_subscription_sender:
mpsc::Sender<(EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>)>,
/// The synchronized output writer
writer: Box<dyn PluginWrite<PluginOutput>>,
}
impl std::fmt::Debug for EngineInterfaceState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EngineInterfaceState")
.field("protocol_info", &self.protocol_info)
.field("engine_call_id_sequence", &self.engine_call_id_sequence)
.field("stream_id_sequence", &self.stream_id_sequence)
.field(
"engine_call_subscription_sender",
&self.engine_call_subscription_sender,
)
.finish_non_exhaustive()
}
}
/// Manages reading and dispatching messages for [`EngineInterface`]s.
///
/// This is not a public API.
#[derive(Debug)]
#[doc(hidden)]
pub struct EngineInterfaceManager {
/// Shared state
state: Arc<EngineInterfaceState>,
/// The writer for protocol info
protocol_info_mut: WaitableMut<Arc<ProtocolInfo>>,
/// Channel to send received PluginCalls to. This is removed after `Goodbye` is received.
plugin_call_sender: Option<mpsc::Sender<ReceivedPluginCall>>,
/// Receiver for PluginCalls. This is usually taken after initialization
plugin_call_receiver: Option<mpsc::Receiver<ReceivedPluginCall>>,
/// Subscriptions for engine call responses
engine_call_subscriptions:
BTreeMap<EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>>,
/// Receiver for engine call subscriptions
engine_call_subscription_receiver:
mpsc::Receiver<(EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>)>,
/// Manages stream messages and state
stream_manager: StreamManager,
}
impl EngineInterfaceManager {
pub(crate) fn new(writer: impl PluginWrite<PluginOutput> + 'static) -> EngineInterfaceManager {
let (plug_tx, plug_rx) = mpsc::channel();
let (subscription_tx, subscription_rx) = mpsc::channel();
let protocol_info_mut = WaitableMut::new();
EngineInterfaceManager {
state: Arc::new(EngineInterfaceState {
protocol_info: protocol_info_mut.reader(),
engine_call_id_sequence: Sequence::default(),
stream_id_sequence: Sequence::default(),
engine_call_subscription_sender: subscription_tx,
writer: Box::new(writer),
}),
protocol_info_mut,
plugin_call_sender: Some(plug_tx),
plugin_call_receiver: Some(plug_rx),
engine_call_subscriptions: BTreeMap::new(),
engine_call_subscription_receiver: subscription_rx,
stream_manager: StreamManager::new(),
}
}
/// Get the receiving end of the plugin call channel. Plugin calls that need to be handled
/// will be sent here.
pub(crate) fn take_plugin_call_receiver(
&mut self,
) -> Option<mpsc::Receiver<ReceivedPluginCall>> {
self.plugin_call_receiver.take()
}
/// Create an [`EngineInterface`] associated with the given call id.
fn interface_for_context(&self, context: PluginCallId) -> EngineInterface {
EngineInterface {
state: self.state.clone(),
stream_manager_handle: self.stream_manager.get_handle(),
context: Some(context),
}
}
/// Send a [`ReceivedPluginCall`] to the channel
fn send_plugin_call(&self, plugin_call: ReceivedPluginCall) -> Result<(), ShellError> {
self.plugin_call_sender
.as_ref()
.ok_or_else(|| ShellError::PluginFailedToDecode {
msg: "Received a plugin call after Goodbye".into(),
})?
.send(plugin_call)
.map_err(|_| ShellError::NushellFailed {
msg: "Received a plugin call, but there's nowhere to send it".into(),
})
}
/// Flush any remaining subscriptions in the receiver into the map
fn receive_engine_call_subscriptions(&mut self) {
for (id, subscription) in self.engine_call_subscription_receiver.try_iter() {
if let btree_map::Entry::Vacant(e) = self.engine_call_subscriptions.entry(id) {
e.insert(subscription);
} else {
log::warn!("Duplicate engine call ID ignored: {id}")
}
}
}
/// Send a [`EngineCallResponse`] to the appropriate sender
fn send_engine_call_response(
&mut self,
id: EngineCallId,
response: EngineCallResponse<PipelineData>,
) -> Result<(), ShellError> {
// Ensure all of the subscriptions have been flushed out of the receiver
self.receive_engine_call_subscriptions();
// Remove the sender - there is only one response per engine call
if let Some(sender) = self.engine_call_subscriptions.remove(&id) {
if sender.send(response).is_err() {
log::warn!("Received an engine call response for id={id}, but the caller hung up");
}
Ok(())
} else {
Err(ShellError::PluginFailedToDecode {
msg: format!("Unknown engine call ID: {id}"),
})
}
}
/// True if there are no other copies of the state (which would mean there are no interfaces
/// and no stream readers/writers)
pub(crate) fn is_finished(&self) -> bool {
Arc::strong_count(&self.state) < 2
}
/// Loop on input from the given reader as long as `is_finished()` is false
///
/// Any errors will be propagated to all read streams automatically.
pub(crate) fn consume_all(
&mut self,
mut reader: impl PluginRead<PluginInput>,
) -> Result<(), ShellError> {
while let Some(msg) = reader.read().transpose() {
if self.is_finished() {
break;
}
if let Err(err) = msg.and_then(|msg| self.consume(msg)) {
// Error to streams
let _ = self.stream_manager.broadcast_read_error(err.clone());
// Error to engine call waiters
self.receive_engine_call_subscriptions();
for sender in std::mem::take(&mut self.engine_call_subscriptions).into_values() {
let _ = sender.send(EngineCallResponse::Error(err.clone()));
}
return Err(err);
}
}
Ok(())
}
}
impl InterfaceManager for EngineInterfaceManager {
type Interface = EngineInterface;
type Input = PluginInput;
fn get_interface(&self) -> Self::Interface {
EngineInterface {
state: self.state.clone(),
stream_manager_handle: self.stream_manager.get_handle(),
context: None,
}
}
fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> {
log::trace!("from engine: {:?}", input);
match input {
PluginInput::Hello(info) => {
let info = Arc::new(info);
self.protocol_info_mut.set(info.clone())?;
let local_info = ProtocolInfo::default();
if local_info.is_compatible_with(&info)? {
Ok(())
} else {
Err(ShellError::PluginFailedToLoad {
msg: format!(
"Plugin is compiled for nushell version {}, \
which is not compatible with version {}",
local_info.version, info.version
),
})
}
}
_ if !self.state.protocol_info.is_set() => {
// Must send protocol info first
Err(ShellError::PluginFailedToLoad {
msg: "Failed to receive initial Hello message. This engine might be too old"
.into(),
})
}
// Stream messages
PluginInput::Data(..)
| PluginInput::End(..)
| PluginInput::Drop(..)
| PluginInput::Ack(..) => {
self.consume_stream_message(input.try_into().map_err(|msg| {
ShellError::NushellFailed {
msg: format!("Failed to convert message {msg:?} to StreamMessage"),
}
})?)
}
PluginInput::Call(id, call) => {
let interface = self.interface_for_context(id);
// Read streams in the input
let call = match call.map_data(|input| self.read_pipeline_data(input, None)) {
Ok(call) => call,
Err(err) => {
// If there's an error with initialization of the input stream, just send
// the error response rather than failing here
return interface.write_response(Err(err))?.write();
}
};
match call {
// We just let the receiver handle it rather than trying to store signature here
// or something
PluginCall::Signature => {
self.send_plugin_call(ReceivedPluginCall::Signature { engine: interface })
}
// Parse custom values and send a ReceivedPluginCall
PluginCall::Run(mut call_info) => {
// Deserialize custom values in the arguments
if let Err(err) = deserialize_call_args(&mut call_info.call) {
return interface.write_response(Err(err))?.write();
}
// Send the plugin call to the receiver
self.send_plugin_call(ReceivedPluginCall::Run {
engine: interface,
call: call_info,
})
}
// Send request with the custom value
PluginCall::CustomValueOp(custom_value, op) => {
self.send_plugin_call(ReceivedPluginCall::CustomValueOp {
engine: interface,
custom_value,
op,
})
}
}
}
PluginInput::Goodbye => {
// Remove the plugin call sender so it hangs up
drop(self.plugin_call_sender.take());
Ok(())
}
PluginInput::EngineCallResponse(id, response) => {
let response = response
.map_data(|header| self.read_pipeline_data(header, None))
.unwrap_or_else(|err| {
// If there's an error with initializing this stream, change it to an engine
// call error response, but send it anyway
EngineCallResponse::Error(err)
});
self.send_engine_call_response(id, response)
}
}
}
fn stream_manager(&self) -> &StreamManager {
&self.stream_manager
}
fn prepare_pipeline_data(&self, mut data: PipelineData) -> Result<PipelineData, ShellError> {
// Deserialize custom values in the pipeline data
match data {
PipelineData::Value(ref mut value, _) => {
PluginCustomValue::deserialize_custom_values_in(value)?;
Ok(data)
}
PipelineData::ListStream(ListStream { stream, ctrlc, .. }, meta) => Ok(stream
.map(|mut value| {
let span = value.span();
PluginCustomValue::deserialize_custom_values_in(&mut value)
.map(|()| value)
.unwrap_or_else(|err| Value::error(err, span))
})
.into_pipeline_data_with_metadata(meta, ctrlc)),
PipelineData::Empty | PipelineData::ExternalStream { .. } => Ok(data),
}
}
}
/// Deserialize custom values in call arguments
fn deserialize_call_args(call: &mut crate::EvaluatedCall) -> Result<(), ShellError> {
call.positional
.iter_mut()
.try_for_each(PluginCustomValue::deserialize_custom_values_in)?;
call.named
.iter_mut()
.flat_map(|(_, value)| value.as_mut())
.try_for_each(PluginCustomValue::deserialize_custom_values_in)
}
/// A reference through which the nushell engine can be interacted with during execution.
#[derive(Debug, Clone)]
pub struct EngineInterface {
/// Shared state with the manager
state: Arc<EngineInterfaceState>,
/// Handle to stream manager
stream_manager_handle: StreamManagerHandle,
/// The plugin call this interface belongs to.
context: Option<PluginCallId>,
}
impl EngineInterface {
/// Write the protocol info. This should be done after initialization
pub(crate) fn hello(&self) -> Result<(), ShellError> {
self.write(PluginOutput::Hello(ProtocolInfo::default()))?;
self.flush()
}
fn context(&self) -> Result<PluginCallId, ShellError> {
self.context.ok_or_else(|| ShellError::NushellFailed {
msg: "Tried to call an EngineInterface method that requires a call context \
outside of one"
.into(),
})
}
/// Write a call response of either [`PipelineData`] or an error. Returns the stream writer
/// to finish writing the stream
pub(crate) fn write_response(
&self,
result: Result<PipelineData, impl Into<LabeledError>>,
) -> Result<PipelineDataWriter<Self>, ShellError> {
match result {
Ok(data) => {
let (header, writer) = match self.init_write_pipeline_data(data, &()) {
Ok(tup) => tup,
// If we get an error while trying to construct the pipeline data, send that
// instead
Err(err) => return self.write_response(Err(err)),
};
// Write pipeline data header response, and the full stream
let response = PluginCallResponse::PipelineData(header);
self.write(PluginOutput::CallResponse(self.context()?, response))?;
self.flush()?;
Ok(writer)
}
Err(err) => {
let response = PluginCallResponse::Error(err.into());
self.write(PluginOutput::CallResponse(self.context()?, response))?;
self.flush()?;
Ok(Default::default())
}
}
}
/// Write a call response of plugin signatures.
///
/// Any custom values in the examples will be rendered using `to_base_value()`.
pub(crate) fn write_signature(
&self,
signature: Vec<PluginSignature>,
) -> Result<(), ShellError> {
let response = PluginCallResponse::Signature(signature);
self.write(PluginOutput::CallResponse(self.context()?, response))?;
self.flush()
}
/// Write an engine call message. Returns the writer for the stream, and the receiver for
/// the response to the engine call.
fn write_engine_call(
&self,
call: EngineCall<PipelineData>,
) -> Result<
(
PipelineDataWriter<Self>,
mpsc::Receiver<EngineCallResponse<PipelineData>>,
),
ShellError,
> {
let context = self.context()?;
let id = self.state.engine_call_id_sequence.next()?;
let (tx, rx) = mpsc::channel();
// Convert the call into one with a header and handle the stream, if necessary
let mut writer = None;
let call = call.map_data(|input| {
let (input_header, input_writer) = self.init_write_pipeline_data(input, &())?;
writer = Some(input_writer);
Ok(input_header)
})?;
// Register the channel
self.state
.engine_call_subscription_sender
.send((id, tx))
.map_err(|_| ShellError::NushellFailed {
msg: "EngineInterfaceManager hung up and is no longer accepting engine calls"
.into(),
})?;
// Write request
self.write(PluginOutput::EngineCall { context, id, call })?;
self.flush()?;
Ok((writer.unwrap_or_default(), rx))
}
/// Perform an engine call. Input and output streams are handled.
fn engine_call(
&self,
call: EngineCall<PipelineData>,
) -> Result<EngineCallResponse<PipelineData>, ShellError> {
let (writer, rx) = self.write_engine_call(call)?;
// Finish writing stream in the background
writer.write_background()?;
// Wait on receiver to get the response
rx.recv().map_err(|_| ShellError::NushellFailed {
msg: "Failed to get response to engine call because the channel was closed".into(),
})
}
/// Returns `true` if the plugin is communicating on stdio. When this is the case, stdin and
/// stdout should not be used by the plugin for other purposes.
///
/// If the plugin can not be used without access to stdio, an error should be presented to the
/// user instead.
pub fn is_using_stdio(&self) -> bool {
self.state.writer.is_stdout()
}
/// Get the full shell configuration from the engine. As this is quite a large object, it is
/// provided on request only.
///
/// # Example
///
/// Format a value in the user's preferred way:
///
/// ```rust,no_run
/// # use nu_protocol::{Value, ShellError};
/// # use nu_plugin::EngineInterface;
/// # fn example(engine: &EngineInterface, value: &Value) -> Result<(), ShellError> {
/// let config = engine.get_config()?;
/// eprintln!("{}", value.to_expanded_string(", ", &config));
/// # Ok(())
/// # }
/// ```
pub fn get_config(&self) -> Result<Box<Config>, ShellError> {
match self.engine_call(EngineCall::GetConfig)? {
EngineCallResponse::Config(config) => Ok(config),
EngineCallResponse::Error(err) => Err(err),
_ => Err(ShellError::PluginFailedToDecode {
msg: "Received unexpected response for EngineCall::GetConfig".into(),
}),
}
}
/// Do an engine call returning an `Option<Value>` as either `PipelineData::Empty` or
/// `PipelineData::Value`
fn engine_call_option_value(
&self,
engine_call: EngineCall<PipelineData>,
) -> Result<Option<Value>, ShellError> {
let name = engine_call.name();
match self.engine_call(engine_call)? {
EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(None),
EngineCallResponse::PipelineData(PipelineData::Value(value, _)) => Ok(Some(value)),
EngineCallResponse::Error(err) => Err(err),
_ => Err(ShellError::PluginFailedToDecode {
msg: format!("Received unexpected response for EngineCall::{name}"),
}),
}
}
/// Get the plugin-specific configuration from the engine. This lives in
/// `$env.config.plugins.NAME` for a plugin named `NAME`. If the config is set to a closure,
/// it is automatically evaluated each time.
///
/// # Example
///
/// Print this plugin's config:
///
/// ```rust,no_run
/// # use nu_protocol::{Value, ShellError};
/// # use nu_plugin::EngineInterface;
/// # fn example(engine: &EngineInterface, value: &Value) -> Result<(), ShellError> {
/// let config = engine.get_plugin_config()?;
/// eprintln!("{:?}", config);
/// # Ok(())
/// # }
/// ```
pub fn get_plugin_config(&self) -> Result<Option<Value>, ShellError> {
self.engine_call_option_value(EngineCall::GetPluginConfig)
}
/// Get an environment variable from the engine.
///
/// Returns `Some(value)` if present, and `None` if not found.
///
/// # Example
///
/// Get `$env.PATH`:
///
/// ```rust,no_run
/// # use nu_protocol::{Value, ShellError};
/// # use nu_plugin::EngineInterface;
/// # fn example(engine: &EngineInterface) -> Result<Option<Value>, ShellError> {
/// engine.get_env_var("PATH") // => Ok(Some(Value::List([...])))
/// # }
/// ```
pub fn get_env_var(&self, name: impl Into<String>) -> Result<Option<Value>, ShellError> {
self.engine_call_option_value(EngineCall::GetEnvVar(name.into()))
}
/// Get the current working directory from the engine. The result is always an absolute path.
///
/// # Example
/// ```rust,no_run
/// # use nu_protocol::{Value, ShellError};
/// # use nu_plugin::EngineInterface;
/// # fn example(engine: &EngineInterface) -> Result<String, ShellError> {
/// engine.get_current_dir() // => "/home/user"
/// # }
/// ```
pub fn get_current_dir(&self) -> Result<String, ShellError> {
match self.engine_call(EngineCall::GetCurrentDir)? {
// Always a string, and the span doesn't matter.
EngineCallResponse::PipelineData(PipelineData::Value(Value::String { val, .. }, _)) => {
Ok(val)
}
EngineCallResponse::Error(err) => Err(err),
_ => Err(ShellError::PluginFailedToDecode {
msg: "Received unexpected response for EngineCall::GetCurrentDir".into(),
}),
}
}
/// Get all environment variables from the engine.
///
/// Since this is quite a large map that has to be sent, prefer to use [`.get_env_var()`] if
/// the variables needed are known ahead of time and there are only a small number needed.
///
/// # Example
/// ```rust,no_run
/// # use nu_protocol::{Value, ShellError};
/// # use nu_plugin::EngineInterface;
/// # use std::collections::HashMap;
/// # fn example(engine: &EngineInterface) -> Result<HashMap<String, Value>, ShellError> {
/// engine.get_env_vars() // => Ok({"PATH": Value::List([...]), ...})
/// # }
/// ```
pub fn get_env_vars(&self) -> Result<HashMap<String, Value>, ShellError> {
match self.engine_call(EngineCall::GetEnvVars)? {
EngineCallResponse::ValueMap(map) => Ok(map),
EngineCallResponse::Error(err) => Err(err),
_ => Err(ShellError::PluginFailedToDecode {
msg: "Received unexpected response type for EngineCall::GetEnvVars".into(),
}),
}
}
/// Set an environment variable in the caller's scope.
///
/// If called after the plugin response has already been sent (i.e. during a stream), this will
/// only affect the environment for engine calls related to this plugin call, and will not be
/// propagated to the environment of the caller.
///
/// # Example
/// ```rust,no_run
/// # use nu_protocol::{Value, ShellError};
/// # use nu_plugin::EngineInterface;
/// # fn example(engine: &EngineInterface) -> Result<(), ShellError> {
/// engine.add_env_var("FOO", Value::test_string("bar"))
/// # }
/// ```
pub fn add_env_var(&self, name: impl Into<String>, value: Value) -> Result<(), ShellError> {
match self.engine_call(EngineCall::AddEnvVar(name.into(), value))? {
EngineCallResponse::PipelineData(_) => Ok(()),
EngineCallResponse::Error(err) => Err(err),
_ => Err(ShellError::PluginFailedToDecode {
msg: "Received unexpected response type for EngineCall::AddEnvVar".into(),
}),
}
}
/// Get the help string for the current command.
///
/// This returns the same string as passing `--help` would, and can be used for the top-level
/// command in a command group that doesn't do anything on its own (e.g. `query`).
///
/// # Example
/// ```rust,no_run
/// # use nu_protocol::{Value, ShellError};
/// # use nu_plugin::EngineInterface;
/// # fn example(engine: &EngineInterface) -> Result<(), ShellError> {
/// eprintln!("{}", engine.get_help()?);
/// # Ok(())
/// # }
/// ```
pub fn get_help(&self) -> Result<String, ShellError> {
match self.engine_call(EngineCall::GetHelp)? {
EngineCallResponse::PipelineData(PipelineData::Value(Value::String { val, .. }, _)) => {
Ok(val)
}
_ => Err(ShellError::PluginFailedToDecode {
msg: "Received unexpected response type for EngineCall::GetHelp".into(),
}),
}
}
/// Returns a guard that will keep the plugin in the foreground as long as the guard is alive.
///
/// Moving the plugin to the foreground is necessary for plugins that need to receive input and
/// signals directly from the terminal.
///
/// The exact implementation is operating system-specific. On Unix, this ensures that the
/// plugin process becomes part of the process group controlling the terminal.
pub fn enter_foreground(&self) -> Result<ForegroundGuard, ShellError> {
match self.engine_call(EngineCall::EnterForeground)? {
EngineCallResponse::Error(error) => Err(error),
EngineCallResponse::PipelineData(PipelineData::Value(
Value::Int { val: pgrp, .. },
_,
)) => {
set_pgrp_from_enter_foreground(pgrp)?;
Ok(ForegroundGuard(Some(self.clone())))
}
EngineCallResponse::PipelineData(PipelineData::Empty) => {
Ok(ForegroundGuard(Some(self.clone())))
}
_ => Err(ShellError::PluginFailedToDecode {
msg: "Received unexpected response type for EngineCall::SetForeground".into(),
}),
}
}
/// Internal: for exiting the foreground after `enter_foreground()`. Called from the guard.
fn leave_foreground(&self) -> Result<(), ShellError> {
match self.engine_call(EngineCall::LeaveForeground)? {
EngineCallResponse::Error(error) => Err(error),
EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(()),
_ => Err(ShellError::PluginFailedToDecode {
msg: "Received unexpected response type for EngineCall::LeaveForeground".into(),
}),
}
}
/// Get the contents of a [`Span`] from the engine.
///
/// This method returns `Vec<u8>` as it's possible for the matched span to not be a valid UTF-8
/// string, perhaps because it sliced through the middle of a UTF-8 byte sequence, as the
/// offsets are byte-indexed. Use [`String::from_utf8_lossy()`] for display if necessary.
pub fn get_span_contents(&self, span: Span) -> Result<Vec<u8>, ShellError> {
match self.engine_call(EngineCall::GetSpanContents(span))? {
EngineCallResponse::PipelineData(PipelineData::Value(Value::Binary { val, .. }, _)) => {
Ok(val)
}
_ => Err(ShellError::PluginFailedToDecode {
msg: "Received unexpected response type for EngineCall::GetSpanContents".into(),
}),
}
}
/// Ask the engine to evaluate a closure. Input to the closure is passed as a stream, and the
/// output is available as a stream.
///
/// Set `redirect_stdout` to `true` to capture the standard output stream of an external
/// command, if the closure results in an external command.
///
/// Set `redirect_stderr` to `true` to capture the standard error stream of an external command,
/// if the closure results in an external command.
///
/// # Example
///
/// Invoked as:
///
/// ```nushell
/// my_command { seq 1 $in | each { |n| $"Hello, ($n)" } }
/// ```
///
/// ```rust,no_run
/// # use nu_protocol::{Value, ShellError, PipelineData};
/// # use nu_plugin::{EngineInterface, EvaluatedCall};
/// # fn example(engine: &EngineInterface, call: &EvaluatedCall) -> Result<(), ShellError> {
/// let closure = call.req(0)?;
/// let input = PipelineData::Value(Value::int(4, call.head), None);
/// let output = engine.eval_closure_with_stream(
/// &closure,
/// vec![],
/// input,
/// true,
/// false,
/// )?;
/// for value in output {
/// eprintln!("Closure says: {}", value.as_str()?);
/// }
/// # Ok(())
/// # }
/// ```
///
/// Output:
///
/// ```text
/// Closure says: Hello, 1
/// Closure says: Hello, 2
/// Closure says: Hello, 3
/// Closure says: Hello, 4
/// ```
pub fn eval_closure_with_stream(
&self,
closure: &Spanned<Closure>,
mut positional: Vec<Value>,
input: PipelineData,
redirect_stdout: bool,
redirect_stderr: bool,
) -> Result<PipelineData, ShellError> {
// Ensure closure args have custom values serialized
positional
.iter_mut()
.try_for_each(PluginCustomValue::serialize_custom_values_in)?;
let call = EngineCall::EvalClosure {
closure: closure.clone(),
positional,
input,
redirect_stdout,
redirect_stderr,
};
match self.engine_call(call)? {
EngineCallResponse::Error(error) => Err(error),
EngineCallResponse::PipelineData(data) => Ok(data),
_ => Err(ShellError::PluginFailedToDecode {
msg: "Received unexpected response type for EngineCall::EvalClosure".into(),
}),
}
}
/// Ask the engine to evaluate a closure. Input is optionally passed as a [`Value`], and output
/// of the closure is collected to a [`Value`] even if it is a stream.
///
/// If the closure results in an external command, the return value will be a collected string
/// or binary value of the standard output stream of that command, similar to calling
/// [`eval_closure_with_stream()`](Self::eval_closure_with_stream) with `redirect_stdout` =
/// `true` and `redirect_stderr` = `false`.
///
/// Use [`eval_closure_with_stream()`](Self::eval_closure_with_stream) if more control over the
/// input and output is desired.
///
/// # Example
///
/// Invoked as:
///
/// ```nushell
/// my_command { |number| $number + 1}
/// ```
///
/// ```rust,no_run
/// # use nu_protocol::{Value, ShellError};
/// # use nu_plugin::{EngineInterface, EvaluatedCall};
/// # fn example(engine: &EngineInterface, call: &EvaluatedCall) -> Result<(), ShellError> {
/// let closure = call.req(0)?;
/// for n in 0..4 {
/// let result = engine.eval_closure(&closure, vec![Value::int(n, call.head)], None)?;
/// eprintln!("{} => {}", n, result.as_int()?);
/// }
/// # Ok(())
/// # }
/// ```
///
/// Output:
///
/// ```text
/// 0 => 1
/// 1 => 2
/// 2 => 3
/// 3 => 4
/// ```
pub fn eval_closure(
&self,
closure: &Spanned<Closure>,
positional: Vec<Value>,
input: Option<Value>,
) -> Result<Value, ShellError> {
let input = input.map_or_else(|| PipelineData::Empty, |v| PipelineData::Value(v, None));
let output = self.eval_closure_with_stream(closure, positional, input, true, false)?;
// Unwrap an error value
match output.into_value(closure.span) {
Value::Error { error, .. } => Err(*error),
value => Ok(value),
}
}
/// Tell the engine whether to disable garbage collection for this plugin.
///
/// The garbage collector is enabled by default, but plugins can turn it off (ideally
/// temporarily) as necessary to implement functionality that requires the plugin to stay
/// running for longer than the engine can automatically determine.
///
/// The user can still stop the plugin if they want to with the `plugin stop` command.
pub fn set_gc_disabled(&self, disabled: bool) -> Result<(), ShellError> {
self.write(PluginOutput::Option(PluginOption::GcDisabled(disabled)))?;
self.flush()
}
/// Write a call response of [`Ordering`], for `partial_cmp`.
pub(crate) fn write_ordering(
&self,
ordering: Option<impl Into<Ordering>>,
) -> Result<(), ShellError> {
let response = PluginCallResponse::Ordering(ordering.map(|o| o.into()));
self.write(PluginOutput::CallResponse(self.context()?, response))?;
self.flush()
}
}
impl Interface for EngineInterface {
type Output = PluginOutput;
type DataContext = ();
fn write(&self, output: PluginOutput) -> Result<(), ShellError> {
log::trace!("to engine: {:?}", output);
self.state.writer.write(&output)
}
fn flush(&self) -> Result<(), ShellError> {
self.state.writer.flush()
}
fn stream_id_sequence(&self) -> &Sequence {
&self.state.stream_id_sequence
}
fn stream_manager_handle(&self) -> &StreamManagerHandle {
&self.stream_manager_handle
}
fn prepare_pipeline_data(
&self,
mut data: PipelineData,
_context: &(),
) -> Result<PipelineData, ShellError> {
// Serialize custom values in the pipeline data
match data {
PipelineData::Value(ref mut value, _) => {
PluginCustomValue::serialize_custom_values_in(value)?;
Ok(data)
}
PipelineData::ListStream(ListStream { stream, ctrlc, .. }, meta) => Ok(stream
.map(|mut value| {
let span = value.span();
PluginCustomValue::serialize_custom_values_in(&mut value)
.map(|_| value)
.unwrap_or_else(|err| Value::error(err, span))
})
.into_pipeline_data_with_metadata(meta, ctrlc)),
PipelineData::Empty | PipelineData::ExternalStream { .. } => Ok(data),
}
}
}
/// Keeps the plugin in the foreground as long as it is alive.
///
/// Use [`.leave()`] to leave the foreground without ignoring the error.
pub struct ForegroundGuard(Option<EngineInterface>);
impl ForegroundGuard {
// Should be called only once
fn leave_internal(&mut self) -> Result<(), ShellError> {
if let Some(interface) = self.0.take() {
// On Unix, we need to put ourselves back in our own process group
#[cfg(unix)]
{
use nix::unistd::{setpgid, Pid};
// This should always succeed, frankly, but handle the error just in case
setpgid(Pid::from_raw(0), Pid::from_raw(0)).map_err(|err| ShellError::IOError {
msg: err.to_string(),
})?;
}
interface.leave_foreground()?;
}
Ok(())
}
/// Leave the foreground. In contrast to dropping the guard, this preserves the error (if any).
pub fn leave(mut self) -> Result<(), ShellError> {
let result = self.leave_internal();
std::mem::forget(self);
result
}
}
impl Drop for ForegroundGuard {
fn drop(&mut self) {
let _ = self.leave_internal();
}
}
#[cfg(unix)]
fn set_pgrp_from_enter_foreground(pgrp: i64) -> Result<(), ShellError> {
use nix::unistd::{setpgid, Pid};
if let Ok(pgrp) = pgrp.try_into() {
setpgid(Pid::from_raw(0), Pid::from_raw(pgrp)).map_err(|err| ShellError::GenericError {
error: "Failed to set process group for foreground".into(),
msg: "".into(),
span: None,
help: Some(err.to_string()),
inner: vec![],
})
} else {
Err(ShellError::NushellFailed {
msg: "Engine returned an invalid process group ID".into(),
})
}
}
#[cfg(not(unix))]
fn set_pgrp_from_enter_foreground(_pgrp: i64) -> Result<(), ShellError> {
Err(ShellError::NushellFailed {
msg: concat!(
"EnterForeground asked plugin to join process group, but not supported on ",
cfg!(target_os)
)
.into(),
})
}