nu_plugin/plugin/interface/
mod.rs

1//! Interface used by the plugin to communicate with the engine.
2
3use nu_plugin_core::{
4    Interface, InterfaceManager, PipelineDataWriter, PluginRead, PluginWrite, StreamManager,
5    StreamManagerHandle,
6    util::{Waitable, WaitableMut},
7};
8use nu_plugin_protocol::{
9    CallInfo, CustomValueOp, EngineCall, EngineCallId, EngineCallResponse, EvaluatedCall,
10    GetCompletionInfo, Ordering, PluginCall, PluginCallId, PluginCallResponse, PluginCustomValue,
11    PluginInput, PluginOption, PluginOutput, ProtocolInfo,
12};
13use nu_protocol::{
14    Config, DeclId, DynamicSuggestion, Handler, HandlerGuard, Handlers, LabeledError, PipelineData,
15    PluginMetadata, PluginSignature, ShellError, SignalAction, Signals, Span, Spanned, Value,
16    engine::{Closure, Sequence},
17};
18use nu_utils::SharedCow;
19use std::{
20    collections::{BTreeMap, HashMap, btree_map},
21    sync::{Arc, atomic::AtomicBool, mpsc},
22};
23
/// Plugin calls that are received by the [`EngineInterfaceManager`] for handling.
///
/// With each call, an [`EngineInterface`] is included that can be provided to the plugin code
/// and should be used to send the response. The interface sent includes the [`PluginCallId`] for
/// sending associated messages with the correct context.
///
/// This is not a public API.
#[derive(Debug)]
#[doc(hidden)]
pub enum ReceivedPluginCall {
    /// The engine is asking the plugin for its metadata.
    Metadata {
        engine: EngineInterface,
    },
    /// The engine is asking the plugin for its command signatures.
    Signature {
        engine: EngineInterface,
    },
    /// The engine wants to run a command. `call` carries the call info together with its
    /// pipeline input.
    Run {
        engine: EngineInterface,
        call: CallInfo<PipelineData>,
    },
    /// The engine is requesting completions described by `info`.
    GetCompletion {
        engine: EngineInterface,
        info: GetCompletionInfo,
    },
    /// The engine is requesting the operation `op` on `custom_value`.
    CustomValueOp {
        engine: EngineInterface,
        custom_value: Spanned<PluginCustomValue>,
        op: CustomValueOp,
    },
}
54
55#[cfg(test)]
56mod tests;
57
/// Internal shared state between the manager and each interface.
///
/// A single instance is created by the manager and handed to every [`EngineInterface`] behind
/// an [`Arc`].
struct EngineInterfaceState {
    /// Protocol version info, set after `Hello` received
    protocol_info: Waitable<Arc<ProtocolInfo>>,
    /// Sequence for generating engine call ids
    engine_call_id_sequence: Sequence,
    /// Sequence for generating stream ids
    stream_id_sequence: Sequence,
    /// Sender to subscribe to an engine call response. Each subscription pairs the engine call
    /// id with the channel on which the response should be delivered.
    engine_call_subscription_sender:
        mpsc::Sender<(EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>)>,
    /// The synchronized output writer
    writer: Box<dyn PluginWrite<PluginOutput>>,
    /// Mirror signals from `EngineState`. You can make use of this with
    /// `engine_interface.signals()` when constructing a Stream that requires signals
    signals: Signals,
    /// Registered signal handlers
    signal_handlers: Handlers,
}
77
78impl std::fmt::Debug for EngineInterfaceState {
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        f.debug_struct("EngineInterfaceState")
81            .field("protocol_info", &self.protocol_info)
82            .field("engine_call_id_sequence", &self.engine_call_id_sequence)
83            .field("stream_id_sequence", &self.stream_id_sequence)
84            .field(
85                "engine_call_subscription_sender",
86                &self.engine_call_subscription_sender,
87            )
88            .finish_non_exhaustive()
89    }
90}
91
/// Manages reading and dispatching messages for [`EngineInterface`]s.
///
/// This is not a public API.
#[derive(Debug)]
#[doc(hidden)]
pub struct EngineInterfaceManager {
    /// Shared state, also held by every [`EngineInterface`] created from this manager
    state: Arc<EngineInterfaceState>,
    /// The writer for protocol info; the read side lives in the shared state
    protocol_info_mut: WaitableMut<Arc<ProtocolInfo>>,
    /// Channel to send received PluginCalls to. This is removed after `Goodbye` is received.
    plugin_call_sender: Option<mpsc::Sender<ReceivedPluginCall>>,
    /// Receiver for PluginCalls. This is usually taken after initialization
    plugin_call_receiver: Option<mpsc::Receiver<ReceivedPluginCall>>,
    /// Subscriptions for engine call responses, keyed by engine call id
    engine_call_subscriptions:
        BTreeMap<EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>>,
    /// Receiver for engine call subscriptions; drained into `engine_call_subscriptions`
    engine_call_subscription_receiver:
        mpsc::Receiver<(EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>)>,
    /// Manages stream messages and state
    stream_manager: StreamManager,
}
115
116impl EngineInterfaceManager {
117    pub(crate) fn new(writer: impl PluginWrite<PluginOutput> + 'static) -> EngineInterfaceManager {
118        let (plug_tx, plug_rx) = mpsc::channel();
119        let (subscription_tx, subscription_rx) = mpsc::channel();
120        let protocol_info_mut = WaitableMut::new();
121
122        EngineInterfaceManager {
123            state: Arc::new(EngineInterfaceState {
124                protocol_info: protocol_info_mut.reader(),
125                engine_call_id_sequence: Sequence::default(),
126                stream_id_sequence: Sequence::default(),
127                engine_call_subscription_sender: subscription_tx,
128                writer: Box::new(writer),
129                signals: Signals::new(Arc::new(AtomicBool::new(false))),
130                signal_handlers: Handlers::new(),
131            }),
132            protocol_info_mut,
133            plugin_call_sender: Some(plug_tx),
134            plugin_call_receiver: Some(plug_rx),
135            engine_call_subscriptions: BTreeMap::new(),
136            engine_call_subscription_receiver: subscription_rx,
137            stream_manager: StreamManager::new(),
138        }
139    }
140
141    /// Get the receiving end of the plugin call channel. Plugin calls that need to be handled
142    /// will be sent here.
143    pub(crate) fn take_plugin_call_receiver(
144        &mut self,
145    ) -> Option<mpsc::Receiver<ReceivedPluginCall>> {
146        self.plugin_call_receiver.take()
147    }
148
149    /// Create an [`EngineInterface`] associated with the given call id.
150    fn interface_for_context(&self, context: PluginCallId) -> EngineInterface {
151        EngineInterface {
152            state: self.state.clone(),
153            stream_manager_handle: self.stream_manager.get_handle(),
154            context: Some(context),
155        }
156    }
157
158    /// Send a [`ReceivedPluginCall`] to the channel
159    fn send_plugin_call(&self, plugin_call: ReceivedPluginCall) -> Result<(), ShellError> {
160        self.plugin_call_sender
161            .as_ref()
162            .ok_or_else(|| ShellError::PluginFailedToDecode {
163                msg: "Received a plugin call after Goodbye".into(),
164            })?
165            .send(plugin_call)
166            .map_err(|_| ShellError::NushellFailed {
167                msg: "Received a plugin call, but there's nowhere to send it".into(),
168            })
169    }
170
171    /// Flush any remaining subscriptions in the receiver into the map
172    fn receive_engine_call_subscriptions(&mut self) {
173        for (id, subscription) in self.engine_call_subscription_receiver.try_iter() {
174            if let btree_map::Entry::Vacant(e) = self.engine_call_subscriptions.entry(id) {
175                e.insert(subscription);
176            } else {
177                log::warn!("Duplicate engine call ID ignored: {id}")
178            }
179        }
180    }
181
182    /// Send a [`EngineCallResponse`] to the appropriate sender
183    fn send_engine_call_response(
184        &mut self,
185        id: EngineCallId,
186        response: EngineCallResponse<PipelineData>,
187    ) -> Result<(), ShellError> {
188        // Ensure all of the subscriptions have been flushed out of the receiver
189        self.receive_engine_call_subscriptions();
190        // Remove the sender - there is only one response per engine call
191        if let Some(sender) = self.engine_call_subscriptions.remove(&id) {
192            if sender.send(response).is_err() {
193                log::warn!("Received an engine call response for id={id}, but the caller hung up");
194            }
195            Ok(())
196        } else {
197            Err(ShellError::PluginFailedToDecode {
198                msg: format!("Unknown engine call ID: {id}"),
199            })
200        }
201    }
202
203    /// True if there are no other copies of the state (which would mean there are no interfaces
204    /// and no stream readers/writers)
205    pub(crate) fn is_finished(&self) -> bool {
206        Arc::strong_count(&self.state) < 2
207    }
208
209    /// Loop on input from the given reader as long as `is_finished()` is false
210    ///
211    /// Any errors will be propagated to all read streams automatically.
212    pub(crate) fn consume_all(
213        &mut self,
214        mut reader: impl PluginRead<PluginInput>,
215    ) -> Result<(), ShellError> {
216        while let Some(msg) = reader.read().transpose() {
217            if self.is_finished() {
218                break;
219            }
220
221            if let Err(err) = msg.and_then(|msg| self.consume(msg)) {
222                // Error to streams
223                let _ = self.stream_manager.broadcast_read_error(err.clone());
224                // Error to engine call waiters
225                self.receive_engine_call_subscriptions();
226                for sender in std::mem::take(&mut self.engine_call_subscriptions).into_values() {
227                    let _ = sender.send(EngineCallResponse::Error(err.clone()));
228                }
229                return Err(err);
230            }
231        }
232        Ok(())
233    }
234}
235
236impl InterfaceManager for EngineInterfaceManager {
237    type Interface = EngineInterface;
238    type Input = PluginInput;
239
240    fn get_interface(&self) -> Self::Interface {
241        EngineInterface {
242            state: self.state.clone(),
243            stream_manager_handle: self.stream_manager.get_handle(),
244            context: None,
245        }
246    }
247
    /// Handle a single message received from the engine.
    ///
    /// `Hello` must arrive before anything else: every other message is rejected until the
    /// protocol info has been set. After that, stream messages are passed to the stream
    /// manager, plugin calls are forwarded to the plugin call channel, engine call responses
    /// are delivered to their subscribers, and signals update the shared signal state and run
    /// the registered handlers.
    fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> {
        log::trace!("from engine: {input:?}");
        match input {
            PluginInput::Hello(info) => {
                let info = Arc::new(info);
                // The protocol info is recorded before the compatibility check below
                self.protocol_info_mut.set(info.clone())?;

                let local_info = ProtocolInfo::default();
                if local_info.is_compatible_with(&info)? {
                    Ok(())
                } else {
                    Err(ShellError::PluginFailedToLoad {
                        msg: format!(
                            "Plugin is compiled for nushell version {}, \
                                which is not compatible with version {}",
                            local_info.version, info.version
                        ),
                    })
                }
            }
            // This guard must stay ahead of the remaining arms so that it catches every
            // non-`Hello` message while the protocol info is still unset
            _ if !self.state.protocol_info.is_set() => {
                // Must send protocol info first
                Err(ShellError::PluginFailedToLoad {
                    msg: "Failed to receive initial Hello message. This engine might be too old"
                        .into(),
                })
            }
            // Stream messages
            PluginInput::Data(..)
            | PluginInput::End(..)
            | PluginInput::Drop(..)
            | PluginInput::Ack(..) => {
                self.consume_stream_message(input.try_into().map_err(|msg| {
                    ShellError::NushellFailed {
                        msg: format!("Failed to convert message {msg:?} to StreamMessage"),
                    }
                })?)
            }
            PluginInput::Call(id, call) => {
                // The interface carries the call id so that responses go to the right call
                let interface = self.interface_for_context(id);
                // Read streams in the input
                let call = match call
                    .map_data(|input| self.read_pipeline_data(input, &Signals::empty()))
                {
                    Ok(call) => call,
                    Err(err) => {
                        // If there's an error with initialization of the input stream, just send
                        // the error response rather than failing here
                        return interface.write_response(Err(err))?.write();
                    }
                };
                match call {
                    // Ask the plugin for metadata
                    PluginCall::Metadata => {
                        self.send_plugin_call(ReceivedPluginCall::Metadata { engine: interface })
                    }
                    // Ask the plugin for signatures
                    PluginCall::Signature => {
                        self.send_plugin_call(ReceivedPluginCall::Signature { engine: interface })
                    }
                    // Parse custom values and send a ReceivedPluginCall
                    PluginCall::Run(mut call_info) => {
                        // Deserialize custom values in the arguments; a failure is reported to
                        // the engine as the call's response instead of failing the manager
                        if let Err(err) = deserialize_call_args(&mut call_info.call) {
                            return interface.write_response(Err(err))?.write();
                        }
                        // Send the plugin call to the receiver
                        self.send_plugin_call(ReceivedPluginCall::Run {
                            engine: interface,
                            call: call_info,
                        })
                    }
                    // Send request with the custom value
                    PluginCall::CustomValueOp(custom_value, op) => {
                        self.send_plugin_call(ReceivedPluginCall::CustomValueOp {
                            engine: interface,
                            custom_value,
                            op,
                        })
                    }
                    // Send the completion request
                    PluginCall::GetCompletion(info) => {
                        self.send_plugin_call(ReceivedPluginCall::GetCompletion {
                            engine: interface,
                            info,
                        })
                    }
                }
            }
            PluginInput::Goodbye => {
                // Remove the plugin call sender so it hangs up
                drop(self.plugin_call_sender.take());
                Ok(())
            }
            PluginInput::EngineCallResponse(id, response) => {
                let response = response
                    .map_data(|header| self.read_pipeline_data(header, &Signals::empty()))
                    .unwrap_or_else(|err| {
                        // If there's an error with initializing this stream, change it to an engine
                        // call error response, but send it anyway
                        EngineCallResponse::Error(err)
                    });
                self.send_engine_call_response(id, response)
            }
            PluginInput::Signal(action) => {
                // Update the shared signal state first, then run the registered handlers
                match action {
                    SignalAction::Interrupt => self.state.signals.trigger(),
                    SignalAction::Reset => self.state.signals.reset(),
                }
                self.state.signal_handlers.run(action);
                Ok(())
            }
        }
    }
361
    /// Borrow the [`StreamManager`] owned by this manager.
    fn stream_manager(&self) -> &StreamManager {
        &self.stream_manager
    }
365
366    fn prepare_pipeline_data(&self, mut data: PipelineData) -> Result<PipelineData, ShellError> {
367        // Deserialize custom values in the pipeline data
368        match data {
369            PipelineData::Value(ref mut value, _) => {
370                PluginCustomValue::deserialize_custom_values_in(value)?;
371                Ok(data)
372            }
373            PipelineData::ListStream(stream, meta) => {
374                let stream = stream.map(|mut value| {
375                    let span = value.span();
376                    PluginCustomValue::deserialize_custom_values_in(&mut value)
377                        .map(|()| value)
378                        .unwrap_or_else(|err| Value::error(err, span))
379                });
380                Ok(PipelineData::list_stream(stream, meta))
381            }
382            PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
383        }
384    }
385}
386
387/// Deserialize custom values in call arguments
388fn deserialize_call_args(call: &mut crate::EvaluatedCall) -> Result<(), ShellError> {
389    call.positional
390        .iter_mut()
391        .try_for_each(PluginCustomValue::deserialize_custom_values_in)?;
392    call.named
393        .iter_mut()
394        .flat_map(|(_, value)| value.as_mut())
395        .try_for_each(PluginCustomValue::deserialize_custom_values_in)
396}
397
/// A reference through which the nushell engine can be interacted with during execution.
#[derive(Debug, Clone)]
pub struct EngineInterface {
    /// Shared state with the manager
    state: Arc<EngineInterfaceState>,
    /// Handle to stream manager
    stream_manager_handle: StreamManagerHandle,
    /// The plugin call this interface belongs to. `None` for an interface that was not created
    /// for a specific plugin call.
    context: Option<PluginCallId>,
}
408
409impl EngineInterface {
410    /// Write the protocol info. This should be done after initialization
411    pub(crate) fn hello(&self) -> Result<(), ShellError> {
412        self.write(PluginOutput::Hello(ProtocolInfo::default()))?;
413        self.flush()
414    }
415
416    fn context(&self) -> Result<PluginCallId, ShellError> {
417        self.context.ok_or_else(|| ShellError::NushellFailed {
418            msg: "Tried to call an EngineInterface method that requires a call context \
419                outside of one"
420                .into(),
421        })
422    }
423
424    /// Write an OK call response or an error.
425    pub(crate) fn write_ok(
426        &self,
427        result: Result<(), impl Into<LabeledError>>,
428    ) -> Result<(), ShellError> {
429        let response = match result {
430            Ok(()) => PluginCallResponse::Ok,
431            Err(err) => PluginCallResponse::Error(err.into()),
432        };
433        self.write(PluginOutput::CallResponse(self.context()?, response))?;
434        self.flush()
435    }
436
437    /// Write a call response of either [`PipelineData`] or an error. Returns the stream writer
438    /// to finish writing the stream
439    pub(crate) fn write_response(
440        &self,
441        result: Result<PipelineData, impl Into<LabeledError>>,
442    ) -> Result<PipelineDataWriter<Self>, ShellError> {
443        match result {
444            Ok(data) => {
445                let (header, writer) = match self.init_write_pipeline_data(data, &()) {
446                    Ok(tup) => tup,
447                    // If we get an error while trying to construct the pipeline data, send that
448                    // instead
449                    Err(err) => return self.write_response(Err(err)),
450                };
451                // Write pipeline data header response, and the full stream
452                let response = PluginCallResponse::PipelineData(header);
453                self.write(PluginOutput::CallResponse(self.context()?, response))?;
454                self.flush()?;
455                Ok(writer)
456            }
457            Err(err) => {
458                let response = PluginCallResponse::Error(err.into());
459                self.write(PluginOutput::CallResponse(self.context()?, response))?;
460                self.flush()?;
461                Ok(Default::default())
462            }
463        }
464    }
465
466    /// Write a call response of plugin metadata.
467    pub(crate) fn write_metadata(&self, metadata: PluginMetadata) -> Result<(), ShellError> {
468        let response = PluginCallResponse::Metadata(metadata);
469        self.write(PluginOutput::CallResponse(self.context()?, response))?;
470        self.flush()
471    }
472
473    /// Write a call response of plugin signatures.
474    ///
475    /// Any custom values in the examples will be rendered using `to_base_value()`.
476    pub(crate) fn write_signature(
477        &self,
478        signature: Vec<PluginSignature>,
479    ) -> Result<(), ShellError> {
480        let response = PluginCallResponse::Signature(signature);
481        self.write(PluginOutput::CallResponse(self.context()?, response))?;
482        self.flush()
483    }
484
485    /// Write a call response of completion items.
486    pub(crate) fn write_completion_items(
487        &self,
488        items: Option<Vec<DynamicSuggestion>>,
489    ) -> Result<(), ShellError> {
490        let response = PluginCallResponse::CompletionItems(items);
491        self.write(PluginOutput::CallResponse(self.context()?, response))?;
492        self.flush()
493    }
494
495    /// Write an engine call message. Returns the writer for the stream, and the receiver for
496    /// the response to the engine call.
497    fn write_engine_call(
498        &self,
499        call: EngineCall<PipelineData>,
500    ) -> Result<
501        (
502            PipelineDataWriter<Self>,
503            mpsc::Receiver<EngineCallResponse<PipelineData>>,
504        ),
505        ShellError,
506    > {
507        let context = self.context()?;
508        let id = self.state.engine_call_id_sequence.next()?;
509        let (tx, rx) = mpsc::channel();
510
511        // Convert the call into one with a header and handle the stream, if necessary
512        let mut writer = None;
513
514        let call = call.map_data(|input| {
515            let (input_header, input_writer) = self.init_write_pipeline_data(input, &())?;
516            writer = Some(input_writer);
517            Ok(input_header)
518        })?;
519
520        // Register the channel
521        self.state
522            .engine_call_subscription_sender
523            .send((id, tx))
524            .map_err(|_| ShellError::NushellFailed {
525                msg: "EngineInterfaceManager hung up and is no longer accepting engine calls"
526                    .into(),
527            })?;
528
529        // Write request
530        self.write(PluginOutput::EngineCall { context, id, call })?;
531        self.flush()?;
532
533        Ok((writer.unwrap_or_default(), rx))
534    }
535
536    /// Perform an engine call. Input and output streams are handled.
537    fn engine_call(
538        &self,
539        call: EngineCall<PipelineData>,
540    ) -> Result<EngineCallResponse<PipelineData>, ShellError> {
541        let (writer, rx) = self.write_engine_call(call)?;
542
543        // Finish writing stream in the background
544        writer.write_background()?;
545
546        // Wait on receiver to get the response
547        rx.recv().map_err(|_| ShellError::NushellFailed {
548            msg: "Failed to get response to engine call because the channel was closed".into(),
549        })
550    }
551
552    /// Returns `true` if the plugin is communicating on stdio. When this is the case, stdin and
553    /// stdout should not be used by the plugin for other purposes.
554    ///
555    /// If the plugin can not be used without access to stdio, an error should be presented to the
556    /// user instead.
557    pub fn is_using_stdio(&self) -> bool {
558        self.state.writer.is_stdout()
559    }
560
561    /// Register a closure which will be called when the engine receives an interrupt signal.
562    /// Returns a RAII guard that will keep the closure alive until it is dropped.
563    pub fn register_signal_handler(&self, handler: Handler) -> Result<HandlerGuard, ShellError> {
564        self.state.signal_handlers.register(handler)
565    }
566
567    /// Get the full shell configuration from the engine. As this is quite a large object, it is
568    /// provided on request only.
569    ///
570    /// # Example
571    ///
572    /// Format a value in the user's preferred way:
573    ///
574    /// ```rust,no_run
575    /// # use nu_protocol::{Value, ShellError};
576    /// # use nu_plugin::EngineInterface;
577    /// # fn example(engine: &EngineInterface, value: &Value) -> Result<(), ShellError> {
578    /// let config = engine.get_config()?;
579    /// eprintln!("{}", value.to_expanded_string(", ", &config));
580    /// # Ok(())
581    /// # }
582    /// ```
583    pub fn get_config(&self) -> Result<Arc<Config>, ShellError> {
584        match self.engine_call(EngineCall::GetConfig)? {
585            EngineCallResponse::Config(config) => Ok(SharedCow::into_arc(config)),
586            EngineCallResponse::Error(err) => Err(err),
587            _ => Err(ShellError::PluginFailedToDecode {
588                msg: "Received unexpected response for EngineCall::GetConfig".into(),
589            }),
590        }
591    }
592
593    /// Do an engine call returning an `Option<Value>` as either `PipelineData::empty()` or
594    /// `PipelineData::value`
595    fn engine_call_option_value(
596        &self,
597        engine_call: EngineCall<PipelineData>,
598    ) -> Result<Option<Value>, ShellError> {
599        let name = engine_call.name();
600        match self.engine_call(engine_call)? {
601            EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(None),
602            EngineCallResponse::PipelineData(PipelineData::Value(value, _)) => Ok(Some(value)),
603            EngineCallResponse::Error(err) => Err(err),
604            _ => Err(ShellError::PluginFailedToDecode {
605                msg: format!("Received unexpected response for EngineCall::{name}"),
606            }),
607        }
608    }
609
610    /// Get the plugin-specific configuration from the engine. This lives in
611    /// `$env.config.plugins.NAME` for a plugin named `NAME`. If the config is set to a closure,
612    /// it is automatically evaluated each time.
613    ///
614    /// # Example
615    ///
616    /// Print this plugin's config:
617    ///
618    /// ```rust,no_run
619    /// # use nu_protocol::{Value, ShellError};
620    /// # use nu_plugin::EngineInterface;
621    /// # fn example(engine: &EngineInterface, value: &Value) -> Result<(), ShellError> {
622    /// let config = engine.get_plugin_config()?;
623    /// eprintln!("{:?}", config);
624    /// # Ok(())
625    /// # }
626    /// ```
627    pub fn get_plugin_config(&self) -> Result<Option<Value>, ShellError> {
628        self.engine_call_option_value(EngineCall::GetPluginConfig)
629    }
630
631    /// Get an environment variable from the engine.
632    ///
633    /// Returns `Some(value)` if present, and `None` if not found.
634    ///
635    /// # Example
636    ///
637    /// Get `$env.PATH`:
638    ///
639    /// ```rust,no_run
640    /// # use nu_protocol::{Value, ShellError};
641    /// # use nu_plugin::EngineInterface;
642    /// # fn example(engine: &EngineInterface) -> Result<Option<Value>, ShellError> {
643    /// engine.get_env_var("PATH") // => Ok(Some(Value::List([...])))
644    /// # }
645    /// ```
646    pub fn get_env_var(&self, name: impl Into<String>) -> Result<Option<Value>, ShellError> {
647        self.engine_call_option_value(EngineCall::GetEnvVar(name.into()))
648    }
649
650    /// Get the current working directory from the engine. The result is always an absolute path.
651    ///
652    /// # Example
653    /// ```rust,no_run
654    /// # use nu_protocol::{Value, ShellError};
655    /// # use nu_plugin::EngineInterface;
656    /// # fn example(engine: &EngineInterface) -> Result<String, ShellError> {
657    /// engine.get_current_dir() // => "/home/user"
658    /// # }
659    /// ```
660    pub fn get_current_dir(&self) -> Result<String, ShellError> {
661        match self.engine_call(EngineCall::GetCurrentDir)? {
662            // Always a string, and the span doesn't matter.
663            EngineCallResponse::PipelineData(PipelineData::Value(Value::String { val, .. }, _)) => {
664                Ok(val)
665            }
666            EngineCallResponse::Error(err) => Err(err),
667            _ => Err(ShellError::PluginFailedToDecode {
668                msg: "Received unexpected response for EngineCall::GetCurrentDir".into(),
669            }),
670        }
671    }
672
673    /// Get all environment variables from the engine.
674    ///
675    /// Since this is quite a large map that has to be sent, prefer to use
676    /// [`.get_env_var()`] (Self::get_env_var) if the variables needed are known ahead of time
677    /// and there are only a small number needed.
678    ///
679    /// # Example
680    /// ```rust,no_run
681    /// # use nu_protocol::{Value, ShellError};
682    /// # use nu_plugin::EngineInterface;
683    /// # use std::collections::HashMap;
684    /// # fn example(engine: &EngineInterface) -> Result<HashMap<String, Value>, ShellError> {
685    /// engine.get_env_vars() // => Ok({"PATH": Value::List([...]), ...})
686    /// # }
687    /// ```
688    pub fn get_env_vars(&self) -> Result<HashMap<String, Value>, ShellError> {
689        match self.engine_call(EngineCall::GetEnvVars)? {
690            EngineCallResponse::ValueMap(map) => Ok(map),
691            EngineCallResponse::Error(err) => Err(err),
692            _ => Err(ShellError::PluginFailedToDecode {
693                msg: "Received unexpected response type for EngineCall::GetEnvVars".into(),
694            }),
695        }
696    }
697
698    /// Set an environment variable in the caller's scope.
699    ///
700    /// If called after the plugin response has already been sent (i.e. during a stream), this will
701    /// only affect the environment for engine calls related to this plugin call, and will not be
702    /// propagated to the environment of the caller.
703    ///
704    /// # Example
705    /// ```rust,no_run
706    /// # use nu_protocol::{Value, ShellError};
707    /// # use nu_plugin::EngineInterface;
708    /// # fn example(engine: &EngineInterface) -> Result<(), ShellError> {
709    /// engine.add_env_var("FOO", Value::test_string("bar"))
710    /// # }
711    /// ```
712    pub fn add_env_var(&self, name: impl Into<String>, value: Value) -> Result<(), ShellError> {
713        match self.engine_call(EngineCall::AddEnvVar(name.into(), value))? {
714            EngineCallResponse::PipelineData(_) => Ok(()),
715            EngineCallResponse::Error(err) => Err(err),
716            _ => Err(ShellError::PluginFailedToDecode {
717                msg: "Received unexpected response type for EngineCall::AddEnvVar".into(),
718            }),
719        }
720    }
721
722    /// Get the help string for the current command.
723    ///
724    /// This returns the same string as passing `--help` would, and can be used for the top-level
725    /// command in a command group that doesn't do anything on its own (e.g. `query`).
726    ///
727    /// # Example
728    /// ```rust,no_run
729    /// # use nu_protocol::{Value, ShellError};
730    /// # use nu_plugin::EngineInterface;
731    /// # fn example(engine: &EngineInterface) -> Result<(), ShellError> {
732    /// eprintln!("{}", engine.get_help()?);
733    /// # Ok(())
734    /// # }
735    /// ```
736    pub fn get_help(&self) -> Result<String, ShellError> {
737        match self.engine_call(EngineCall::GetHelp)? {
738            EngineCallResponse::PipelineData(PipelineData::Value(Value::String { val, .. }, _)) => {
739                Ok(val)
740            }
741            _ => Err(ShellError::PluginFailedToDecode {
742                msg: "Received unexpected response type for EngineCall::GetHelp".into(),
743            }),
744        }
745    }
746
747    /// Returns a guard that will keep the plugin in the foreground as long as the guard is alive.
748    ///
749    /// Moving the plugin to the foreground is necessary for plugins that need to receive input and
750    /// signals directly from the terminal.
751    ///
752    /// The exact implementation is operating system-specific. On Unix, this ensures that the
753    /// plugin process becomes part of the process group controlling the terminal.
754    pub fn enter_foreground(&self) -> Result<ForegroundGuard, ShellError> {
755        match self.engine_call(EngineCall::EnterForeground)? {
756            EngineCallResponse::Error(error) => Err(error),
757            EngineCallResponse::PipelineData(PipelineData::Value(
758                Value::Int { val: pgrp, .. },
759                _,
760            )) => {
761                set_pgrp_from_enter_foreground(pgrp)?;
762                Ok(ForegroundGuard(Some(self.clone())))
763            }
764            EngineCallResponse::PipelineData(PipelineData::Empty) => {
765                Ok(ForegroundGuard(Some(self.clone())))
766            }
767            _ => Err(ShellError::PluginFailedToDecode {
768                msg: "Received unexpected response type for EngineCall::SetForeground".into(),
769            }),
770        }
771    }
772
773    /// Internal: for exiting the foreground after `enter_foreground()`. Called from the guard.
774    fn leave_foreground(&self) -> Result<(), ShellError> {
775        match self.engine_call(EngineCall::LeaveForeground)? {
776            EngineCallResponse::Error(error) => Err(error),
777            EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(()),
778            _ => Err(ShellError::PluginFailedToDecode {
779                msg: "Received unexpected response type for EngineCall::LeaveForeground".into(),
780            }),
781        }
782    }
783
784    /// Get the contents of a [`Span`] from the engine.
785    ///
786    /// This method returns `Vec<u8>` as it's possible for the matched span to not be a valid UTF-8
787    /// string, perhaps because it sliced through the middle of a UTF-8 byte sequence, as the
788    /// offsets are byte-indexed. Use [`String::from_utf8_lossy()`] for display if necessary.
789    pub fn get_span_contents(&self, span: Span) -> Result<Vec<u8>, ShellError> {
790        match self.engine_call(EngineCall::GetSpanContents(span))? {
791            EngineCallResponse::PipelineData(PipelineData::Value(Value::Binary { val, .. }, _)) => {
792                Ok(val)
793            }
794            _ => Err(ShellError::PluginFailedToDecode {
795                msg: "Received unexpected response type for EngineCall::GetSpanContents".into(),
796            }),
797        }
798    }
799
800    /// Ask the engine to evaluate a closure. Input to the closure is passed as a stream, and the
801    /// output is available as a stream.
802    ///
803    /// Set `redirect_stdout` to `true` to capture the standard output stream of an external
804    /// command, if the closure results in an external command.
805    ///
806    /// Set `redirect_stderr` to `true` to capture the standard error stream of an external command,
807    /// if the closure results in an external command.
808    ///
809    /// # Example
810    ///
811    /// Invoked as:
812    ///
813    /// ```nushell
814    /// my_command { seq 1 $in | each { |n| $"Hello, ($n)" } }
815    /// ```
816    ///
817    /// ```rust,no_run
818    /// # use nu_protocol::{Value, ShellError, PipelineData};
819    /// # use nu_plugin::{EngineInterface, EvaluatedCall};
820    /// # fn example(engine: &EngineInterface, call: &EvaluatedCall) -> Result<(), ShellError> {
821    /// let closure = call.req(0)?;
822    /// let input = PipelineData::value(Value::int(4, call.head), None);
823    /// let output = engine.eval_closure_with_stream(
824    ///     &closure,
825    ///     vec![],
826    ///     input,
827    ///     true,
828    ///     false,
829    /// )?;
830    /// for value in output {
831    ///     eprintln!("Closure says: {}", value.as_str()?);
832    /// }
833    /// # Ok(())
834    /// # }
835    /// ```
836    ///
837    /// Output:
838    ///
839    /// ```text
840    /// Closure says: Hello, 1
841    /// Closure says: Hello, 2
842    /// Closure says: Hello, 3
843    /// Closure says: Hello, 4
844    /// ```
845    pub fn eval_closure_with_stream(
846        &self,
847        closure: &Spanned<Closure>,
848        mut positional: Vec<Value>,
849        input: PipelineData,
850        redirect_stdout: bool,
851        redirect_stderr: bool,
852    ) -> Result<PipelineData, ShellError> {
853        // Ensure closure args have custom values serialized
854        positional
855            .iter_mut()
856            .try_for_each(PluginCustomValue::serialize_custom_values_in)?;
857
858        let call = EngineCall::EvalClosure {
859            closure: closure.clone(),
860            positional,
861            input,
862            redirect_stdout,
863            redirect_stderr,
864        };
865
866        match self.engine_call(call)? {
867            EngineCallResponse::Error(error) => Err(error),
868            EngineCallResponse::PipelineData(data) => Ok(data),
869            _ => Err(ShellError::PluginFailedToDecode {
870                msg: "Received unexpected response type for EngineCall::EvalClosure".into(),
871            }),
872        }
873    }
874
875    /// Ask the engine to evaluate a closure. Input is optionally passed as a [`Value`], and output
876    /// of the closure is collected to a [`Value`] even if it is a stream.
877    ///
878    /// If the closure results in an external command, the return value will be a collected string
879    /// or binary value of the standard output stream of that command, similar to calling
880    /// [`eval_closure_with_stream()`](Self::eval_closure_with_stream) with `redirect_stdout` =
881    /// `true` and `redirect_stderr` = `false`.
882    ///
883    /// Use [`eval_closure_with_stream()`](Self::eval_closure_with_stream) if more control over the
884    /// input and output is desired.
885    ///
886    /// # Example
887    ///
888    /// Invoked as:
889    ///
890    /// ```nushell
891    /// my_command { |number| $number + 1}
892    /// ```
893    ///
894    /// ```rust,no_run
895    /// # use nu_protocol::{Value, ShellError};
896    /// # use nu_plugin::{EngineInterface, EvaluatedCall};
897    /// # fn example(engine: &EngineInterface, call: &EvaluatedCall) -> Result<(), ShellError> {
898    /// let closure = call.req(0)?;
899    /// for n in 0..4 {
900    ///     let result = engine.eval_closure(&closure, vec![Value::int(n, call.head)], None)?;
901    ///     eprintln!("{} => {}", n, result.as_int()?);
902    /// }
903    /// # Ok(())
904    /// # }
905    /// ```
906    ///
907    /// Output:
908    ///
909    /// ```text
910    /// 0 => 1
911    /// 1 => 2
912    /// 2 => 3
913    /// 3 => 4
914    /// ```
915    pub fn eval_closure(
916        &self,
917        closure: &Spanned<Closure>,
918        positional: Vec<Value>,
919        input: Option<Value>,
920    ) -> Result<Value, ShellError> {
921        let input = input.map_or_else(PipelineData::empty, |v| PipelineData::value(v, None));
922        let output = self.eval_closure_with_stream(closure, positional, input, true, false)?;
923        // Unwrap an error value
924        match output.into_value(closure.span)? {
925            Value::Error { error, .. } => Err(*error),
926            value => Ok(value),
927        }
928    }
929
930    /// Ask the engine for the identifier for a declaration. If found, the result can then be passed
931    /// to [`.call_decl()`](Self::call_decl) to call other internal commands.
932    ///
933    /// See [`.call_decl()`](Self::call_decl) for an example.
934    pub fn find_decl(&self, name: impl Into<String>) -> Result<Option<DeclId>, ShellError> {
935        let call = EngineCall::FindDecl(name.into());
936
937        match self.engine_call(call)? {
938            EngineCallResponse::Error(err) => Err(err),
939            EngineCallResponse::Identifier(id) => Ok(Some(id)),
940            EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(None),
941            _ => Err(ShellError::PluginFailedToDecode {
942                msg: "Received unexpected response type for EngineCall::FindDecl".into(),
943            }),
944        }
945    }
946
947    /// Ask the engine to call an internal command, using the declaration ID previously looked up
948    /// with [`.find_decl()`](Self::find_decl).
949    ///
950    /// # Example
951    ///
952    /// ```rust,no_run
953    /// # use nu_protocol::{Value, ShellError, PipelineData};
954    /// # use nu_plugin::{EngineInterface, EvaluatedCall};
955    /// # fn example(engine: &EngineInterface, call: &EvaluatedCall) -> Result<Value, ShellError> {
956    /// if let Some(decl_id) = engine.find_decl("scope commands")? {
957    ///     let commands = engine.call_decl(
958    ///         decl_id,
959    ///         EvaluatedCall::new(call.head),
960    ///         PipelineData::empty(),
961    ///         true,
962    ///         false,
963    ///     )?;
964    ///     commands.into_value(call.head)
965    /// } else {
966    ///     Ok(Value::list(vec![], call.head))
967    /// }
968    /// # }
969    /// ```
970    pub fn call_decl(
971        &self,
972        decl_id: DeclId,
973        call: EvaluatedCall,
974        input: PipelineData,
975        redirect_stdout: bool,
976        redirect_stderr: bool,
977    ) -> Result<PipelineData, ShellError> {
978        let call = EngineCall::CallDecl {
979            decl_id,
980            call,
981            input,
982            redirect_stdout,
983            redirect_stderr,
984        };
985
986        match self.engine_call(call)? {
987            EngineCallResponse::Error(err) => Err(err),
988            EngineCallResponse::PipelineData(data) => Ok(data),
989            _ => Err(ShellError::PluginFailedToDecode {
990                msg: "Received unexpected response type for EngineCall::CallDecl".into(),
991            }),
992        }
993    }
994
995    /// Tell the engine whether to disable garbage collection for this plugin.
996    ///
997    /// The garbage collector is enabled by default, but plugins can turn it off (ideally
998    /// temporarily) as necessary to implement functionality that requires the plugin to stay
999    /// running for longer than the engine can automatically determine.
1000    ///
1001    /// The user can still stop the plugin if they want to with the `plugin stop` command.
1002    pub fn set_gc_disabled(&self, disabled: bool) -> Result<(), ShellError> {
1003        self.write(PluginOutput::Option(PluginOption::GcDisabled(disabled)))?;
1004        self.flush()
1005    }
1006
1007    /// Write a call response of [`Ordering`], for `partial_cmp`.
1008    pub(crate) fn write_ordering(
1009        &self,
1010        ordering: Option<impl Into<Ordering>>,
1011    ) -> Result<(), ShellError> {
1012        let response = PluginCallResponse::Ordering(ordering.map(|o| o.into()));
1013        self.write(PluginOutput::CallResponse(self.context()?, response))?;
1014        self.flush()
1015    }
1016
    /// Returns a reference to the [`Signals`] stored in this interface's shared state.
    ///
    /// NOTE(review): presumably this is how plugin code checks whether the engine has requested
    /// an interrupt — confirm against where `state.signals` is updated.
    pub fn signals(&self) -> &Signals {
        &self.state.signals
    }
1020}
1021
1022impl Interface for EngineInterface {
1023    type Output = PluginOutput;
1024    type DataContext = ();
1025
1026    fn write(&self, output: PluginOutput) -> Result<(), ShellError> {
1027        log::trace!("to engine: {output:?}");
1028        self.state.writer.write(&output)
1029    }
1030
1031    fn flush(&self) -> Result<(), ShellError> {
1032        self.state.writer.flush()
1033    }
1034
1035    fn stream_id_sequence(&self) -> &Sequence {
1036        &self.state.stream_id_sequence
1037    }
1038
1039    fn stream_manager_handle(&self) -> &StreamManagerHandle {
1040        &self.stream_manager_handle
1041    }
1042
1043    fn prepare_pipeline_data(
1044        &self,
1045        mut data: PipelineData,
1046        _context: &(),
1047    ) -> Result<PipelineData, ShellError> {
1048        // Serialize custom values in the pipeline data
1049        match data {
1050            PipelineData::Value(ref mut value, _) => {
1051                PluginCustomValue::serialize_custom_values_in(value)?;
1052                Ok(data)
1053            }
1054            PipelineData::ListStream(stream, meta) => {
1055                let stream = stream.map(|mut value| {
1056                    let span = value.span();
1057                    PluginCustomValue::serialize_custom_values_in(&mut value)
1058                        .map(|_| value)
1059                        .unwrap_or_else(|err| Value::error(err, span))
1060                });
1061                Ok(PipelineData::list_stream(stream, meta))
1062            }
1063            PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
1064        }
1065    }
1066}
1067
1068/// Keeps the plugin in the foreground as long as it is alive.
1069///
1070/// Use [`.leave()`](Self::leave) to leave the foreground without ignoring the error.
1071pub struct ForegroundGuard(Option<EngineInterface>);
1072
1073impl ForegroundGuard {
1074    // Should be called only once
1075    fn leave_internal(&mut self) -> Result<(), ShellError> {
1076        if let Some(interface) = self.0.take() {
1077            // On Unix, we need to put ourselves back in our own process group
1078            #[cfg(unix)]
1079            {
1080                use nix::unistd::{Pid, setpgid};
1081                // This should always succeed, frankly, but handle the error just in case
1082                setpgid(Pid::from_raw(0), Pid::from_raw(0)).map_err(|err| {
1083                    nu_protocol::shell_error::io::IoError::new_internal(
1084                        std::io::Error::from(err),
1085                        "Could not set pgid",
1086                        nu_protocol::location!(),
1087                    )
1088                })?;
1089            }
1090            interface.leave_foreground()?;
1091        }
1092        Ok(())
1093    }
1094
1095    /// Leave the foreground. In contrast to dropping the guard, this preserves the error (if any).
1096    pub fn leave(mut self) -> Result<(), ShellError> {
1097        let result = self.leave_internal();
1098        std::mem::forget(self);
1099        result
1100    }
1101}
1102
impl Drop for ForegroundGuard {
    /// Leaves the foreground when the guard goes out of scope.
    fn drop(&mut self) {
        // `drop` has no way to report failure, so the error is deliberately discarded here.
        // Callers that need the error should use `leave()` instead.
        let _ = self.leave_internal();
    }
}
1108
1109#[cfg(unix)]
1110fn set_pgrp_from_enter_foreground(pgrp: i64) -> Result<(), ShellError> {
1111    use nix::unistd::{Pid, setpgid};
1112    if let Ok(pgrp) = pgrp.try_into() {
1113        setpgid(Pid::from_raw(0), Pid::from_raw(pgrp)).map_err(|err| ShellError::GenericError {
1114            error: "Failed to set process group for foreground".into(),
1115            msg: "".into(),
1116            span: None,
1117            help: Some(err.to_string()),
1118            inner: vec![],
1119        })
1120    } else {
1121        Err(ShellError::NushellFailed {
1122            msg: "Engine returned an invalid process group ID".into(),
1123        })
1124    }
1125}
1126
/// Non-Unix fallback: joining a process group is not available here, so a request from the
/// engine to do so always fails.
#[cfg(not(unix))]
fn set_pgrp_from_enter_foreground(_pgrp: i64) -> Result<(), ShellError> {
    let msg = String::from(
        "EnterForeground asked plugin to join process group, but this is not supported on non UNIX platforms.",
    );
    Err(ShellError::NushellFailed { msg })
}