nu_plugin/plugin/interface/
mod.rs

1//! Interface used by the plugin to communicate with the engine.
2
3use nu_plugin_core::{
4    Interface, InterfaceManager, PipelineDataWriter, PluginRead, PluginWrite, StreamManager,
5    StreamManagerHandle,
6    util::{Waitable, WaitableMut},
7};
8use nu_plugin_protocol::{
9    CallInfo, CustomValueOp, EngineCall, EngineCallId, EngineCallResponse, EvaluatedCall, Ordering,
10    PluginCall, PluginCallId, PluginCallResponse, PluginCustomValue, PluginInput, PluginOption,
11    PluginOutput, ProtocolInfo,
12};
13use nu_protocol::{
14    Config, DeclId, Handler, HandlerGuard, Handlers, LabeledError, PipelineData, PluginMetadata,
15    PluginSignature, ShellError, SignalAction, Signals, Span, Spanned, Value,
16    engine::{Closure, Sequence},
17};
18use nu_utils::SharedCow;
19use std::{
20    collections::{BTreeMap, HashMap, btree_map},
21    sync::{Arc, atomic::AtomicBool, mpsc},
22};
23
24/// Plugin calls that are received by the [`EngineInterfaceManager`] for handling.
25///
26/// With each call, an [`EngineInterface`] is included that can be provided to the plugin code
27/// and should be used to send the response. The interface sent includes the [`PluginCallId`] for
28/// sending associated messages with the correct context.
29///
30/// This is not a public API.
31#[derive(Debug)]
32#[doc(hidden)]
33pub enum ReceivedPluginCall {
34    Metadata {
35        engine: EngineInterface,
36    },
37    Signature {
38        engine: EngineInterface,
39    },
40    Run {
41        engine: EngineInterface,
42        call: CallInfo<PipelineData>,
43    },
44    CustomValueOp {
45        engine: EngineInterface,
46        custom_value: Spanned<PluginCustomValue>,
47        op: CustomValueOp,
48    },
49}
50
51#[cfg(test)]
52mod tests;
53
54/// Internal shared state between the manager and each interface.
55struct EngineInterfaceState {
56    /// Protocol version info, set after `Hello` received
57    protocol_info: Waitable<Arc<ProtocolInfo>>,
58    /// Sequence for generating engine call ids
59    engine_call_id_sequence: Sequence,
60    /// Sequence for generating stream ids
61    stream_id_sequence: Sequence,
62    /// Sender to subscribe to an engine call response
63    engine_call_subscription_sender:
64        mpsc::Sender<(EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>)>,
65    /// The synchronized output writer
66    writer: Box<dyn PluginWrite<PluginOutput>>,
67    /// Mirror signals from `EngineState`. You can make use of this with
68    /// `engine_interface.signals()` when constructing a Stream that requires signals
69    signals: Signals,
70    /// Registered signal handlers
71    signal_handlers: Handlers,
72}
73
74impl std::fmt::Debug for EngineInterfaceState {
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        f.debug_struct("EngineInterfaceState")
77            .field("protocol_info", &self.protocol_info)
78            .field("engine_call_id_sequence", &self.engine_call_id_sequence)
79            .field("stream_id_sequence", &self.stream_id_sequence)
80            .field(
81                "engine_call_subscription_sender",
82                &self.engine_call_subscription_sender,
83            )
84            .finish_non_exhaustive()
85    }
86}
87
88/// Manages reading and dispatching messages for [`EngineInterface`]s.
89///
90/// This is not a public API.
91#[derive(Debug)]
92#[doc(hidden)]
93pub struct EngineInterfaceManager {
94    /// Shared state
95    state: Arc<EngineInterfaceState>,
96    /// The writer for protocol info
97    protocol_info_mut: WaitableMut<Arc<ProtocolInfo>>,
98    /// Channel to send received PluginCalls to. This is removed after `Goodbye` is received.
99    plugin_call_sender: Option<mpsc::Sender<ReceivedPluginCall>>,
100    /// Receiver for PluginCalls. This is usually taken after initialization
101    plugin_call_receiver: Option<mpsc::Receiver<ReceivedPluginCall>>,
102    /// Subscriptions for engine call responses
103    engine_call_subscriptions:
104        BTreeMap<EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>>,
105    /// Receiver for engine call subscriptions
106    engine_call_subscription_receiver:
107        mpsc::Receiver<(EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>)>,
108    /// Manages stream messages and state
109    stream_manager: StreamManager,
110}
111
112impl EngineInterfaceManager {
113    pub(crate) fn new(writer: impl PluginWrite<PluginOutput> + 'static) -> EngineInterfaceManager {
114        let (plug_tx, plug_rx) = mpsc::channel();
115        let (subscription_tx, subscription_rx) = mpsc::channel();
116        let protocol_info_mut = WaitableMut::new();
117
118        EngineInterfaceManager {
119            state: Arc::new(EngineInterfaceState {
120                protocol_info: protocol_info_mut.reader(),
121                engine_call_id_sequence: Sequence::default(),
122                stream_id_sequence: Sequence::default(),
123                engine_call_subscription_sender: subscription_tx,
124                writer: Box::new(writer),
125                signals: Signals::new(Arc::new(AtomicBool::new(false))),
126                signal_handlers: Handlers::new(),
127            }),
128            protocol_info_mut,
129            plugin_call_sender: Some(plug_tx),
130            plugin_call_receiver: Some(plug_rx),
131            engine_call_subscriptions: BTreeMap::new(),
132            engine_call_subscription_receiver: subscription_rx,
133            stream_manager: StreamManager::new(),
134        }
135    }
136
137    /// Get the receiving end of the plugin call channel. Plugin calls that need to be handled
138    /// will be sent here.
139    pub(crate) fn take_plugin_call_receiver(
140        &mut self,
141    ) -> Option<mpsc::Receiver<ReceivedPluginCall>> {
142        self.plugin_call_receiver.take()
143    }
144
145    /// Create an [`EngineInterface`] associated with the given call id.
146    fn interface_for_context(&self, context: PluginCallId) -> EngineInterface {
147        EngineInterface {
148            state: self.state.clone(),
149            stream_manager_handle: self.stream_manager.get_handle(),
150            context: Some(context),
151        }
152    }
153
154    /// Send a [`ReceivedPluginCall`] to the channel
155    fn send_plugin_call(&self, plugin_call: ReceivedPluginCall) -> Result<(), ShellError> {
156        self.plugin_call_sender
157            .as_ref()
158            .ok_or_else(|| ShellError::PluginFailedToDecode {
159                msg: "Received a plugin call after Goodbye".into(),
160            })?
161            .send(plugin_call)
162            .map_err(|_| ShellError::NushellFailed {
163                msg: "Received a plugin call, but there's nowhere to send it".into(),
164            })
165    }
166
167    /// Flush any remaining subscriptions in the receiver into the map
168    fn receive_engine_call_subscriptions(&mut self) {
169        for (id, subscription) in self.engine_call_subscription_receiver.try_iter() {
170            if let btree_map::Entry::Vacant(e) = self.engine_call_subscriptions.entry(id) {
171                e.insert(subscription);
172            } else {
173                log::warn!("Duplicate engine call ID ignored: {id}")
174            }
175        }
176    }
177
178    /// Send a [`EngineCallResponse`] to the appropriate sender
179    fn send_engine_call_response(
180        &mut self,
181        id: EngineCallId,
182        response: EngineCallResponse<PipelineData>,
183    ) -> Result<(), ShellError> {
184        // Ensure all of the subscriptions have been flushed out of the receiver
185        self.receive_engine_call_subscriptions();
186        // Remove the sender - there is only one response per engine call
187        if let Some(sender) = self.engine_call_subscriptions.remove(&id) {
188            if sender.send(response).is_err() {
189                log::warn!("Received an engine call response for id={id}, but the caller hung up");
190            }
191            Ok(())
192        } else {
193            Err(ShellError::PluginFailedToDecode {
194                msg: format!("Unknown engine call ID: {id}"),
195            })
196        }
197    }
198
199    /// True if there are no other copies of the state (which would mean there are no interfaces
200    /// and no stream readers/writers)
201    pub(crate) fn is_finished(&self) -> bool {
202        Arc::strong_count(&self.state) < 2
203    }
204
205    /// Loop on input from the given reader as long as `is_finished()` is false
206    ///
207    /// Any errors will be propagated to all read streams automatically.
208    pub(crate) fn consume_all(
209        &mut self,
210        mut reader: impl PluginRead<PluginInput>,
211    ) -> Result<(), ShellError> {
212        while let Some(msg) = reader.read().transpose() {
213            if self.is_finished() {
214                break;
215            }
216
217            if let Err(err) = msg.and_then(|msg| self.consume(msg)) {
218                // Error to streams
219                let _ = self.stream_manager.broadcast_read_error(err.clone());
220                // Error to engine call waiters
221                self.receive_engine_call_subscriptions();
222                for sender in std::mem::take(&mut self.engine_call_subscriptions).into_values() {
223                    let _ = sender.send(EngineCallResponse::Error(err.clone()));
224                }
225                return Err(err);
226            }
227        }
228        Ok(())
229    }
230}
231
232impl InterfaceManager for EngineInterfaceManager {
233    type Interface = EngineInterface;
234    type Input = PluginInput;
235
236    fn get_interface(&self) -> Self::Interface {
237        EngineInterface {
238            state: self.state.clone(),
239            stream_manager_handle: self.stream_manager.get_handle(),
240            context: None,
241        }
242    }
243
244    fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> {
245        log::trace!("from engine: {input:?}");
246        match input {
247            PluginInput::Hello(info) => {
248                let info = Arc::new(info);
249                self.protocol_info_mut.set(info.clone())?;
250
251                let local_info = ProtocolInfo::default();
252                if local_info.is_compatible_with(&info)? {
253                    Ok(())
254                } else {
255                    Err(ShellError::PluginFailedToLoad {
256                        msg: format!(
257                            "Plugin is compiled for nushell version {}, \
258                                which is not compatible with version {}",
259                            local_info.version, info.version
260                        ),
261                    })
262                }
263            }
264            _ if !self.state.protocol_info.is_set() => {
265                // Must send protocol info first
266                Err(ShellError::PluginFailedToLoad {
267                    msg: "Failed to receive initial Hello message. This engine might be too old"
268                        .into(),
269                })
270            }
271            // Stream messages
272            PluginInput::Data(..)
273            | PluginInput::End(..)
274            | PluginInput::Drop(..)
275            | PluginInput::Ack(..) => {
276                self.consume_stream_message(input.try_into().map_err(|msg| {
277                    ShellError::NushellFailed {
278                        msg: format!("Failed to convert message {msg:?} to StreamMessage"),
279                    }
280                })?)
281            }
282            PluginInput::Call(id, call) => {
283                let interface = self.interface_for_context(id);
284                // Read streams in the input
285                let call = match call
286                    .map_data(|input| self.read_pipeline_data(input, &Signals::empty()))
287                {
288                    Ok(call) => call,
289                    Err(err) => {
290                        // If there's an error with initialization of the input stream, just send
291                        // the error response rather than failing here
292                        return interface.write_response(Err(err))?.write();
293                    }
294                };
295                match call {
296                    // Ask the plugin for metadata
297                    PluginCall::Metadata => {
298                        self.send_plugin_call(ReceivedPluginCall::Metadata { engine: interface })
299                    }
300                    // Ask the plugin for signatures
301                    PluginCall::Signature => {
302                        self.send_plugin_call(ReceivedPluginCall::Signature { engine: interface })
303                    }
304                    // Parse custom values and send a ReceivedPluginCall
305                    PluginCall::Run(mut call_info) => {
306                        // Deserialize custom values in the arguments
307                        if let Err(err) = deserialize_call_args(&mut call_info.call) {
308                            return interface.write_response(Err(err))?.write();
309                        }
310                        // Send the plugin call to the receiver
311                        self.send_plugin_call(ReceivedPluginCall::Run {
312                            engine: interface,
313                            call: call_info,
314                        })
315                    }
316                    // Send request with the custom value
317                    PluginCall::CustomValueOp(custom_value, op) => {
318                        self.send_plugin_call(ReceivedPluginCall::CustomValueOp {
319                            engine: interface,
320                            custom_value,
321                            op,
322                        })
323                    }
324                }
325            }
326            PluginInput::Goodbye => {
327                // Remove the plugin call sender so it hangs up
328                drop(self.plugin_call_sender.take());
329                Ok(())
330            }
331            PluginInput::EngineCallResponse(id, response) => {
332                let response = response
333                    .map_data(|header| self.read_pipeline_data(header, &Signals::empty()))
334                    .unwrap_or_else(|err| {
335                        // If there's an error with initializing this stream, change it to an engine
336                        // call error response, but send it anyway
337                        EngineCallResponse::Error(err)
338                    });
339                self.send_engine_call_response(id, response)
340            }
341            PluginInput::Signal(action) => {
342                match action {
343                    SignalAction::Interrupt => self.state.signals.trigger(),
344                    SignalAction::Reset => self.state.signals.reset(),
345                }
346                self.state.signal_handlers.run(action);
347                Ok(())
348            }
349        }
350    }
351
352    fn stream_manager(&self) -> &StreamManager {
353        &self.stream_manager
354    }
355
356    fn prepare_pipeline_data(&self, mut data: PipelineData) -> Result<PipelineData, ShellError> {
357        // Deserialize custom values in the pipeline data
358        match data {
359            PipelineData::Value(ref mut value, _) => {
360                PluginCustomValue::deserialize_custom_values_in(value)?;
361                Ok(data)
362            }
363            PipelineData::ListStream(stream, meta) => {
364                let stream = stream.map(|mut value| {
365                    let span = value.span();
366                    PluginCustomValue::deserialize_custom_values_in(&mut value)
367                        .map(|()| value)
368                        .unwrap_or_else(|err| Value::error(err, span))
369                });
370                Ok(PipelineData::list_stream(stream, meta))
371            }
372            PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
373        }
374    }
375}
376
377/// Deserialize custom values in call arguments
378fn deserialize_call_args(call: &mut crate::EvaluatedCall) -> Result<(), ShellError> {
379    call.positional
380        .iter_mut()
381        .try_for_each(PluginCustomValue::deserialize_custom_values_in)?;
382    call.named
383        .iter_mut()
384        .flat_map(|(_, value)| value.as_mut())
385        .try_for_each(PluginCustomValue::deserialize_custom_values_in)
386}
387
388/// A reference through which the nushell engine can be interacted with during execution.
389#[derive(Debug, Clone)]
390pub struct EngineInterface {
391    /// Shared state with the manager
392    state: Arc<EngineInterfaceState>,
393    /// Handle to stream manager
394    stream_manager_handle: StreamManagerHandle,
395    /// The plugin call this interface belongs to.
396    context: Option<PluginCallId>,
397}
398
399impl EngineInterface {
400    /// Write the protocol info. This should be done after initialization
401    pub(crate) fn hello(&self) -> Result<(), ShellError> {
402        self.write(PluginOutput::Hello(ProtocolInfo::default()))?;
403        self.flush()
404    }
405
406    fn context(&self) -> Result<PluginCallId, ShellError> {
407        self.context.ok_or_else(|| ShellError::NushellFailed {
408            msg: "Tried to call an EngineInterface method that requires a call context \
409                outside of one"
410                .into(),
411        })
412    }
413
414    /// Write an OK call response or an error.
415    pub(crate) fn write_ok(
416        &self,
417        result: Result<(), impl Into<LabeledError>>,
418    ) -> Result<(), ShellError> {
419        let response = match result {
420            Ok(()) => PluginCallResponse::Ok,
421            Err(err) => PluginCallResponse::Error(err.into()),
422        };
423        self.write(PluginOutput::CallResponse(self.context()?, response))?;
424        self.flush()
425    }
426
427    /// Write a call response of either [`PipelineData`] or an error. Returns the stream writer
428    /// to finish writing the stream
429    pub(crate) fn write_response(
430        &self,
431        result: Result<PipelineData, impl Into<LabeledError>>,
432    ) -> Result<PipelineDataWriter<Self>, ShellError> {
433        match result {
434            Ok(data) => {
435                let (header, writer) = match self.init_write_pipeline_data(data, &()) {
436                    Ok(tup) => tup,
437                    // If we get an error while trying to construct the pipeline data, send that
438                    // instead
439                    Err(err) => return self.write_response(Err(err)),
440                };
441                // Write pipeline data header response, and the full stream
442                let response = PluginCallResponse::PipelineData(header);
443                self.write(PluginOutput::CallResponse(self.context()?, response))?;
444                self.flush()?;
445                Ok(writer)
446            }
447            Err(err) => {
448                let response = PluginCallResponse::Error(err.into());
449                self.write(PluginOutput::CallResponse(self.context()?, response))?;
450                self.flush()?;
451                Ok(Default::default())
452            }
453        }
454    }
455
456    /// Write a call response of plugin metadata.
457    pub(crate) fn write_metadata(&self, metadata: PluginMetadata) -> Result<(), ShellError> {
458        let response = PluginCallResponse::Metadata(metadata);
459        self.write(PluginOutput::CallResponse(self.context()?, response))?;
460        self.flush()
461    }
462
463    /// Write a call response of plugin signatures.
464    ///
465    /// Any custom values in the examples will be rendered using `to_base_value()`.
466    pub(crate) fn write_signature(
467        &self,
468        signature: Vec<PluginSignature>,
469    ) -> Result<(), ShellError> {
470        let response = PluginCallResponse::Signature(signature);
471        self.write(PluginOutput::CallResponse(self.context()?, response))?;
472        self.flush()
473    }
474
475    /// Write an engine call message. Returns the writer for the stream, and the receiver for
476    /// the response to the engine call.
477    fn write_engine_call(
478        &self,
479        call: EngineCall<PipelineData>,
480    ) -> Result<
481        (
482            PipelineDataWriter<Self>,
483            mpsc::Receiver<EngineCallResponse<PipelineData>>,
484        ),
485        ShellError,
486    > {
487        let context = self.context()?;
488        let id = self.state.engine_call_id_sequence.next()?;
489        let (tx, rx) = mpsc::channel();
490
491        // Convert the call into one with a header and handle the stream, if necessary
492        let mut writer = None;
493
494        let call = call.map_data(|input| {
495            let (input_header, input_writer) = self.init_write_pipeline_data(input, &())?;
496            writer = Some(input_writer);
497            Ok(input_header)
498        })?;
499
500        // Register the channel
501        self.state
502            .engine_call_subscription_sender
503            .send((id, tx))
504            .map_err(|_| ShellError::NushellFailed {
505                msg: "EngineInterfaceManager hung up and is no longer accepting engine calls"
506                    .into(),
507            })?;
508
509        // Write request
510        self.write(PluginOutput::EngineCall { context, id, call })?;
511        self.flush()?;
512
513        Ok((writer.unwrap_or_default(), rx))
514    }
515
516    /// Perform an engine call. Input and output streams are handled.
517    fn engine_call(
518        &self,
519        call: EngineCall<PipelineData>,
520    ) -> Result<EngineCallResponse<PipelineData>, ShellError> {
521        let (writer, rx) = self.write_engine_call(call)?;
522
523        // Finish writing stream in the background
524        writer.write_background()?;
525
526        // Wait on receiver to get the response
527        rx.recv().map_err(|_| ShellError::NushellFailed {
528            msg: "Failed to get response to engine call because the channel was closed".into(),
529        })
530    }
531
532    /// Returns `true` if the plugin is communicating on stdio. When this is the case, stdin and
533    /// stdout should not be used by the plugin for other purposes.
534    ///
535    /// If the plugin can not be used without access to stdio, an error should be presented to the
536    /// user instead.
537    pub fn is_using_stdio(&self) -> bool {
538        self.state.writer.is_stdout()
539    }
540
541    /// Register a closure which will be called when the engine receives an interrupt signal.
542    /// Returns a RAII guard that will keep the closure alive until it is dropped.
543    pub fn register_signal_handler(&self, handler: Handler) -> Result<HandlerGuard, ShellError> {
544        self.state.signal_handlers.register(handler)
545    }
546
547    /// Get the full shell configuration from the engine. As this is quite a large object, it is
548    /// provided on request only.
549    ///
550    /// # Example
551    ///
552    /// Format a value in the user's preferred way:
553    ///
554    /// ```rust,no_run
555    /// # use nu_protocol::{Value, ShellError};
556    /// # use nu_plugin::EngineInterface;
557    /// # fn example(engine: &EngineInterface, value: &Value) -> Result<(), ShellError> {
558    /// let config = engine.get_config()?;
559    /// eprintln!("{}", value.to_expanded_string(", ", &config));
560    /// # Ok(())
561    /// # }
562    /// ```
563    pub fn get_config(&self) -> Result<Arc<Config>, ShellError> {
564        match self.engine_call(EngineCall::GetConfig)? {
565            EngineCallResponse::Config(config) => Ok(SharedCow::into_arc(config)),
566            EngineCallResponse::Error(err) => Err(err),
567            _ => Err(ShellError::PluginFailedToDecode {
568                msg: "Received unexpected response for EngineCall::GetConfig".into(),
569            }),
570        }
571    }
572
573    /// Do an engine call returning an `Option<Value>` as either `PipelineData::empty()` or
574    /// `PipelineData::value`
575    fn engine_call_option_value(
576        &self,
577        engine_call: EngineCall<PipelineData>,
578    ) -> Result<Option<Value>, ShellError> {
579        let name = engine_call.name();
580        match self.engine_call(engine_call)? {
581            EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(None),
582            EngineCallResponse::PipelineData(PipelineData::Value(value, _)) => Ok(Some(value)),
583            EngineCallResponse::Error(err) => Err(err),
584            _ => Err(ShellError::PluginFailedToDecode {
585                msg: format!("Received unexpected response for EngineCall::{name}"),
586            }),
587        }
588    }
589
590    /// Get the plugin-specific configuration from the engine. This lives in
591    /// `$env.config.plugins.NAME` for a plugin named `NAME`. If the config is set to a closure,
592    /// it is automatically evaluated each time.
593    ///
594    /// # Example
595    ///
596    /// Print this plugin's config:
597    ///
598    /// ```rust,no_run
599    /// # use nu_protocol::{Value, ShellError};
600    /// # use nu_plugin::EngineInterface;
601    /// # fn example(engine: &EngineInterface, value: &Value) -> Result<(), ShellError> {
602    /// let config = engine.get_plugin_config()?;
603    /// eprintln!("{:?}", config);
604    /// # Ok(())
605    /// # }
606    /// ```
607    pub fn get_plugin_config(&self) -> Result<Option<Value>, ShellError> {
608        self.engine_call_option_value(EngineCall::GetPluginConfig)
609    }
610
611    /// Get an environment variable from the engine.
612    ///
613    /// Returns `Some(value)` if present, and `None` if not found.
614    ///
615    /// # Example
616    ///
617    /// Get `$env.PATH`:
618    ///
619    /// ```rust,no_run
620    /// # use nu_protocol::{Value, ShellError};
621    /// # use nu_plugin::EngineInterface;
622    /// # fn example(engine: &EngineInterface) -> Result<Option<Value>, ShellError> {
623    /// engine.get_env_var("PATH") // => Ok(Some(Value::List([...])))
624    /// # }
625    /// ```
626    pub fn get_env_var(&self, name: impl Into<String>) -> Result<Option<Value>, ShellError> {
627        self.engine_call_option_value(EngineCall::GetEnvVar(name.into()))
628    }
629
630    /// Get the current working directory from the engine. The result is always an absolute path.
631    ///
632    /// # Example
633    /// ```rust,no_run
634    /// # use nu_protocol::{Value, ShellError};
635    /// # use nu_plugin::EngineInterface;
636    /// # fn example(engine: &EngineInterface) -> Result<String, ShellError> {
637    /// engine.get_current_dir() // => "/home/user"
638    /// # }
639    /// ```
640    pub fn get_current_dir(&self) -> Result<String, ShellError> {
641        match self.engine_call(EngineCall::GetCurrentDir)? {
642            // Always a string, and the span doesn't matter.
643            EngineCallResponse::PipelineData(PipelineData::Value(Value::String { val, .. }, _)) => {
644                Ok(val)
645            }
646            EngineCallResponse::Error(err) => Err(err),
647            _ => Err(ShellError::PluginFailedToDecode {
648                msg: "Received unexpected response for EngineCall::GetCurrentDir".into(),
649            }),
650        }
651    }
652
653    /// Get all environment variables from the engine.
654    ///
655    /// Since this is quite a large map that has to be sent, prefer to use
656    /// [`.get_env_var()`] (Self::get_env_var) if the variables needed are known ahead of time
657    /// and there are only a small number needed.
658    ///
659    /// # Example
660    /// ```rust,no_run
661    /// # use nu_protocol::{Value, ShellError};
662    /// # use nu_plugin::EngineInterface;
663    /// # use std::collections::HashMap;
664    /// # fn example(engine: &EngineInterface) -> Result<HashMap<String, Value>, ShellError> {
665    /// engine.get_env_vars() // => Ok({"PATH": Value::List([...]), ...})
666    /// # }
667    /// ```
668    pub fn get_env_vars(&self) -> Result<HashMap<String, Value>, ShellError> {
669        match self.engine_call(EngineCall::GetEnvVars)? {
670            EngineCallResponse::ValueMap(map) => Ok(map),
671            EngineCallResponse::Error(err) => Err(err),
672            _ => Err(ShellError::PluginFailedToDecode {
673                msg: "Received unexpected response type for EngineCall::GetEnvVars".into(),
674            }),
675        }
676    }
677
678    /// Set an environment variable in the caller's scope.
679    ///
680    /// If called after the plugin response has already been sent (i.e. during a stream), this will
681    /// only affect the environment for engine calls related to this plugin call, and will not be
682    /// propagated to the environment of the caller.
683    ///
684    /// # Example
685    /// ```rust,no_run
686    /// # use nu_protocol::{Value, ShellError};
687    /// # use nu_plugin::EngineInterface;
688    /// # fn example(engine: &EngineInterface) -> Result<(), ShellError> {
689    /// engine.add_env_var("FOO", Value::test_string("bar"))
690    /// # }
691    /// ```
692    pub fn add_env_var(&self, name: impl Into<String>, value: Value) -> Result<(), ShellError> {
693        match self.engine_call(EngineCall::AddEnvVar(name.into(), value))? {
694            EngineCallResponse::PipelineData(_) => Ok(()),
695            EngineCallResponse::Error(err) => Err(err),
696            _ => Err(ShellError::PluginFailedToDecode {
697                msg: "Received unexpected response type for EngineCall::AddEnvVar".into(),
698            }),
699        }
700    }
701
702    /// Get the help string for the current command.
703    ///
704    /// This returns the same string as passing `--help` would, and can be used for the top-level
705    /// command in a command group that doesn't do anything on its own (e.g. `query`).
706    ///
707    /// # Example
708    /// ```rust,no_run
709    /// # use nu_protocol::{Value, ShellError};
710    /// # use nu_plugin::EngineInterface;
711    /// # fn example(engine: &EngineInterface) -> Result<(), ShellError> {
712    /// eprintln!("{}", engine.get_help()?);
713    /// # Ok(())
714    /// # }
715    /// ```
716    pub fn get_help(&self) -> Result<String, ShellError> {
717        match self.engine_call(EngineCall::GetHelp)? {
718            EngineCallResponse::PipelineData(PipelineData::Value(Value::String { val, .. }, _)) => {
719                Ok(val)
720            }
721            _ => Err(ShellError::PluginFailedToDecode {
722                msg: "Received unexpected response type for EngineCall::GetHelp".into(),
723            }),
724        }
725    }
726
727    /// Returns a guard that will keep the plugin in the foreground as long as the guard is alive.
728    ///
729    /// Moving the plugin to the foreground is necessary for plugins that need to receive input and
730    /// signals directly from the terminal.
731    ///
732    /// The exact implementation is operating system-specific. On Unix, this ensures that the
733    /// plugin process becomes part of the process group controlling the terminal.
734    pub fn enter_foreground(&self) -> Result<ForegroundGuard, ShellError> {
735        match self.engine_call(EngineCall::EnterForeground)? {
736            EngineCallResponse::Error(error) => Err(error),
737            EngineCallResponse::PipelineData(PipelineData::Value(
738                Value::Int { val: pgrp, .. },
739                _,
740            )) => {
741                set_pgrp_from_enter_foreground(pgrp)?;
742                Ok(ForegroundGuard(Some(self.clone())))
743            }
744            EngineCallResponse::PipelineData(PipelineData::Empty) => {
745                Ok(ForegroundGuard(Some(self.clone())))
746            }
747            _ => Err(ShellError::PluginFailedToDecode {
748                msg: "Received unexpected response type for EngineCall::SetForeground".into(),
749            }),
750        }
751    }
752
753    /// Internal: for exiting the foreground after `enter_foreground()`. Called from the guard.
754    fn leave_foreground(&self) -> Result<(), ShellError> {
755        match self.engine_call(EngineCall::LeaveForeground)? {
756            EngineCallResponse::Error(error) => Err(error),
757            EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(()),
758            _ => Err(ShellError::PluginFailedToDecode {
759                msg: "Received unexpected response type for EngineCall::LeaveForeground".into(),
760            }),
761        }
762    }
763
764    /// Get the contents of a [`Span`] from the engine.
765    ///
766    /// This method returns `Vec<u8>` as it's possible for the matched span to not be a valid UTF-8
767    /// string, perhaps because it sliced through the middle of a UTF-8 byte sequence, as the
768    /// offsets are byte-indexed. Use [`String::from_utf8_lossy()`] for display if necessary.
769    pub fn get_span_contents(&self, span: Span) -> Result<Vec<u8>, ShellError> {
770        match self.engine_call(EngineCall::GetSpanContents(span))? {
771            EngineCallResponse::PipelineData(PipelineData::Value(Value::Binary { val, .. }, _)) => {
772                Ok(val)
773            }
774            _ => Err(ShellError::PluginFailedToDecode {
775                msg: "Received unexpected response type for EngineCall::GetSpanContents".into(),
776            }),
777        }
778    }
779
780    /// Ask the engine to evaluate a closure. Input to the closure is passed as a stream, and the
781    /// output is available as a stream.
782    ///
783    /// Set `redirect_stdout` to `true` to capture the standard output stream of an external
784    /// command, if the closure results in an external command.
785    ///
786    /// Set `redirect_stderr` to `true` to capture the standard error stream of an external command,
787    /// if the closure results in an external command.
788    ///
789    /// # Example
790    ///
791    /// Invoked as:
792    ///
793    /// ```nushell
794    /// my_command { seq 1 $in | each { |n| $"Hello, ($n)" } }
795    /// ```
796    ///
797    /// ```rust,no_run
798    /// # use nu_protocol::{Value, ShellError, PipelineData};
799    /// # use nu_plugin::{EngineInterface, EvaluatedCall};
800    /// # fn example(engine: &EngineInterface, call: &EvaluatedCall) -> Result<(), ShellError> {
801    /// let closure = call.req(0)?;
802    /// let input = PipelineData::value(Value::int(4, call.head), None);
803    /// let output = engine.eval_closure_with_stream(
804    ///     &closure,
805    ///     vec![],
806    ///     input,
807    ///     true,
808    ///     false,
809    /// )?;
810    /// for value in output {
811    ///     eprintln!("Closure says: {}", value.as_str()?);
812    /// }
813    /// # Ok(())
814    /// # }
815    /// ```
816    ///
817    /// Output:
818    ///
819    /// ```text
820    /// Closure says: Hello, 1
821    /// Closure says: Hello, 2
822    /// Closure says: Hello, 3
823    /// Closure says: Hello, 4
824    /// ```
825    pub fn eval_closure_with_stream(
826        &self,
827        closure: &Spanned<Closure>,
828        mut positional: Vec<Value>,
829        input: PipelineData,
830        redirect_stdout: bool,
831        redirect_stderr: bool,
832    ) -> Result<PipelineData, ShellError> {
833        // Ensure closure args have custom values serialized
834        positional
835            .iter_mut()
836            .try_for_each(PluginCustomValue::serialize_custom_values_in)?;
837
838        let call = EngineCall::EvalClosure {
839            closure: closure.clone(),
840            positional,
841            input,
842            redirect_stdout,
843            redirect_stderr,
844        };
845
846        match self.engine_call(call)? {
847            EngineCallResponse::Error(error) => Err(error),
848            EngineCallResponse::PipelineData(data) => Ok(data),
849            _ => Err(ShellError::PluginFailedToDecode {
850                msg: "Received unexpected response type for EngineCall::EvalClosure".into(),
851            }),
852        }
853    }
854
855    /// Ask the engine to evaluate a closure. Input is optionally passed as a [`Value`], and output
856    /// of the closure is collected to a [`Value`] even if it is a stream.
857    ///
858    /// If the closure results in an external command, the return value will be a collected string
859    /// or binary value of the standard output stream of that command, similar to calling
860    /// [`eval_closure_with_stream()`](Self::eval_closure_with_stream) with `redirect_stdout` =
861    /// `true` and `redirect_stderr` = `false`.
862    ///
863    /// Use [`eval_closure_with_stream()`](Self::eval_closure_with_stream) if more control over the
864    /// input and output is desired.
865    ///
866    /// # Example
867    ///
868    /// Invoked as:
869    ///
870    /// ```nushell
871    /// my_command { |number| $number + 1}
872    /// ```
873    ///
874    /// ```rust,no_run
875    /// # use nu_protocol::{Value, ShellError};
876    /// # use nu_plugin::{EngineInterface, EvaluatedCall};
877    /// # fn example(engine: &EngineInterface, call: &EvaluatedCall) -> Result<(), ShellError> {
878    /// let closure = call.req(0)?;
879    /// for n in 0..4 {
880    ///     let result = engine.eval_closure(&closure, vec![Value::int(n, call.head)], None)?;
881    ///     eprintln!("{} => {}", n, result.as_int()?);
882    /// }
883    /// # Ok(())
884    /// # }
885    /// ```
886    ///
887    /// Output:
888    ///
889    /// ```text
890    /// 0 => 1
891    /// 1 => 2
892    /// 2 => 3
893    /// 3 => 4
894    /// ```
895    pub fn eval_closure(
896        &self,
897        closure: &Spanned<Closure>,
898        positional: Vec<Value>,
899        input: Option<Value>,
900    ) -> Result<Value, ShellError> {
901        let input = input.map_or_else(PipelineData::empty, |v| PipelineData::value(v, None));
902        let output = self.eval_closure_with_stream(closure, positional, input, true, false)?;
903        // Unwrap an error value
904        match output.into_value(closure.span)? {
905            Value::Error { error, .. } => Err(*error),
906            value => Ok(value),
907        }
908    }
909
910    /// Ask the engine for the identifier for a declaration. If found, the result can then be passed
911    /// to [`.call_decl()`](Self::call_decl) to call other internal commands.
912    ///
913    /// See [`.call_decl()`](Self::call_decl) for an example.
914    pub fn find_decl(&self, name: impl Into<String>) -> Result<Option<DeclId>, ShellError> {
915        let call = EngineCall::FindDecl(name.into());
916
917        match self.engine_call(call)? {
918            EngineCallResponse::Error(err) => Err(err),
919            EngineCallResponse::Identifier(id) => Ok(Some(id)),
920            EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(None),
921            _ => Err(ShellError::PluginFailedToDecode {
922                msg: "Received unexpected response type for EngineCall::FindDecl".into(),
923            }),
924        }
925    }
926
927    /// Ask the engine to call an internal command, using the declaration ID previously looked up
928    /// with [`.find_decl()`](Self::find_decl).
929    ///
930    /// # Example
931    ///
932    /// ```rust,no_run
933    /// # use nu_protocol::{Value, ShellError, PipelineData};
934    /// # use nu_plugin::{EngineInterface, EvaluatedCall};
935    /// # fn example(engine: &EngineInterface, call: &EvaluatedCall) -> Result<Value, ShellError> {
936    /// if let Some(decl_id) = engine.find_decl("scope commands")? {
937    ///     let commands = engine.call_decl(
938    ///         decl_id,
939    ///         EvaluatedCall::new(call.head),
940    ///         PipelineData::empty(),
941    ///         true,
942    ///         false,
943    ///     )?;
944    ///     commands.into_value(call.head)
945    /// } else {
946    ///     Ok(Value::list(vec![], call.head))
947    /// }
948    /// # }
949    /// ```
950    pub fn call_decl(
951        &self,
952        decl_id: DeclId,
953        call: EvaluatedCall,
954        input: PipelineData,
955        redirect_stdout: bool,
956        redirect_stderr: bool,
957    ) -> Result<PipelineData, ShellError> {
958        let call = EngineCall::CallDecl {
959            decl_id,
960            call,
961            input,
962            redirect_stdout,
963            redirect_stderr,
964        };
965
966        match self.engine_call(call)? {
967            EngineCallResponse::Error(err) => Err(err),
968            EngineCallResponse::PipelineData(data) => Ok(data),
969            _ => Err(ShellError::PluginFailedToDecode {
970                msg: "Received unexpected response type for EngineCall::CallDecl".into(),
971            }),
972        }
973    }
974
975    /// Tell the engine whether to disable garbage collection for this plugin.
976    ///
977    /// The garbage collector is enabled by default, but plugins can turn it off (ideally
978    /// temporarily) as necessary to implement functionality that requires the plugin to stay
979    /// running for longer than the engine can automatically determine.
980    ///
981    /// The user can still stop the plugin if they want to with the `plugin stop` command.
982    pub fn set_gc_disabled(&self, disabled: bool) -> Result<(), ShellError> {
983        self.write(PluginOutput::Option(PluginOption::GcDisabled(disabled)))?;
984        self.flush()
985    }
986
987    /// Write a call response of [`Ordering`], for `partial_cmp`.
988    pub(crate) fn write_ordering(
989        &self,
990        ordering: Option<impl Into<Ordering>>,
991    ) -> Result<(), ShellError> {
992        let response = PluginCallResponse::Ordering(ordering.map(|o| o.into()));
993        self.write(PluginOutput::CallResponse(self.context()?, response))?;
994        self.flush()
995    }
996
997    pub fn signals(&self) -> &Signals {
998        &self.state.signals
999    }
1000}
1001
1002impl Interface for EngineInterface {
1003    type Output = PluginOutput;
1004    type DataContext = ();
1005
1006    fn write(&self, output: PluginOutput) -> Result<(), ShellError> {
1007        log::trace!("to engine: {output:?}");
1008        self.state.writer.write(&output)
1009    }
1010
1011    fn flush(&self) -> Result<(), ShellError> {
1012        self.state.writer.flush()
1013    }
1014
1015    fn stream_id_sequence(&self) -> &Sequence {
1016        &self.state.stream_id_sequence
1017    }
1018
1019    fn stream_manager_handle(&self) -> &StreamManagerHandle {
1020        &self.stream_manager_handle
1021    }
1022
1023    fn prepare_pipeline_data(
1024        &self,
1025        mut data: PipelineData,
1026        _context: &(),
1027    ) -> Result<PipelineData, ShellError> {
1028        // Serialize custom values in the pipeline data
1029        match data {
1030            PipelineData::Value(ref mut value, _) => {
1031                PluginCustomValue::serialize_custom_values_in(value)?;
1032                Ok(data)
1033            }
1034            PipelineData::ListStream(stream, meta) => {
1035                let stream = stream.map(|mut value| {
1036                    let span = value.span();
1037                    PluginCustomValue::serialize_custom_values_in(&mut value)
1038                        .map(|_| value)
1039                        .unwrap_or_else(|err| Value::error(err, span))
1040                });
1041                Ok(PipelineData::list_stream(stream, meta))
1042            }
1043            PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
1044        }
1045    }
1046}
1047
1048/// Keeps the plugin in the foreground as long as it is alive.
1049///
1050/// Use [`.leave()`](Self::leave) to leave the foreground without ignoring the error.
1051pub struct ForegroundGuard(Option<EngineInterface>);
1052
1053impl ForegroundGuard {
1054    // Should be called only once
1055    fn leave_internal(&mut self) -> Result<(), ShellError> {
1056        if let Some(interface) = self.0.take() {
1057            // On Unix, we need to put ourselves back in our own process group
1058            #[cfg(unix)]
1059            {
1060                use nix::unistd::{Pid, setpgid};
1061                // This should always succeed, frankly, but handle the error just in case
1062                setpgid(Pid::from_raw(0), Pid::from_raw(0)).map_err(|err| {
1063                    nu_protocol::shell_error::io::IoError::new_internal(
1064                        std::io::Error::from(err),
1065                        "Could not set pgid",
1066                        nu_protocol::location!(),
1067                    )
1068                })?;
1069            }
1070            interface.leave_foreground()?;
1071        }
1072        Ok(())
1073    }
1074
1075    /// Leave the foreground. In contrast to dropping the guard, this preserves the error (if any).
1076    pub fn leave(mut self) -> Result<(), ShellError> {
1077        let result = self.leave_internal();
1078        std::mem::forget(self);
1079        result
1080    }
1081}
1082
1083impl Drop for ForegroundGuard {
1084    fn drop(&mut self) {
1085        let _ = self.leave_internal();
1086    }
1087}
1088
1089#[cfg(unix)]
1090fn set_pgrp_from_enter_foreground(pgrp: i64) -> Result<(), ShellError> {
1091    use nix::unistd::{Pid, setpgid};
1092    if let Ok(pgrp) = pgrp.try_into() {
1093        setpgid(Pid::from_raw(0), Pid::from_raw(pgrp)).map_err(|err| ShellError::GenericError {
1094            error: "Failed to set process group for foreground".into(),
1095            msg: "".into(),
1096            span: None,
1097            help: Some(err.to_string()),
1098            inner: vec![],
1099        })
1100    } else {
1101        Err(ShellError::NushellFailed {
1102            msg: "Engine returned an invalid process group ID".into(),
1103        })
1104    }
1105}
1106
/// Non-Unix fallback: joining a process group is not supported here, so a request from the
/// engine to do so always fails with [`ShellError::NushellFailed`].
#[cfg(not(unix))]
fn set_pgrp_from_enter_foreground(_pgrp: i64) -> Result<(), ShellError> {
    let msg = "EnterForeground asked plugin to join process group, but this is not supported on non UNIX platforms.";
    Err(ShellError::NushellFailed { msg: msg.into() })
}