Skip to main content

nu_plugin/plugin/interface/
mod.rs

1//! Interface used by the plugin to communicate with the engine.
2
3use nu_plugin_core::{
4    Interface, InterfaceManager, PipelineDataWriter, PluginRead, PluginWrite, StreamManager,
5    StreamManagerHandle,
6    util::{Waitable, WaitableMut},
7};
8use nu_plugin_protocol::{
9    CallInfo, CustomValueOp, EngineCall, EngineCallId, EngineCallResponse, EvaluatedCall,
10    GetCompletionInfo, Ordering, PluginCall, PluginCallId, PluginCallResponse, PluginCustomValue,
11    PluginInput, PluginOption, PluginOutput, ProtocolInfo,
12};
13#[cfg(unix)]
14use nu_protocol::shell_error::generic::GenericError;
15use nu_protocol::{
16    BlockId, Config, DeclId, DynamicSuggestion, Handler, HandlerGuard, Handlers, PipelineData,
17    PluginMetadata, PluginSignature, ShellError, SignalAction, Signals, Span, Spanned, Value,
18    engine::{Closure, Sequence},
19    ir::IrBlock,
20};
21use nu_utils::SharedCow;
22use std::{
23    collections::{BTreeMap, HashMap, btree_map},
24    sync::{Arc, atomic::AtomicBool, mpsc},
25};
26
27/// Plugin calls that are received by the [`EngineInterfaceManager`] for handling.
28///
29/// With each call, an [`EngineInterface`] is included that can be provided to the plugin code
30/// and should be used to send the response. The interface sent includes the [`PluginCallId`] for
31/// sending associated messages with the correct context.
32///
33/// This is not a public API.
34#[derive(Debug)]
35#[doc(hidden)]
36pub enum ReceivedPluginCall {
37    Metadata {
38        engine: EngineInterface,
39    },
40    Signature {
41        engine: EngineInterface,
42    },
43    Run {
44        engine: EngineInterface,
45        call: CallInfo<PipelineData>,
46    },
47    GetCompletion {
48        engine: EngineInterface,
49        info: GetCompletionInfo,
50    },
51    CustomValueOp {
52        engine: EngineInterface,
53        custom_value: Spanned<PluginCustomValue>,
54        op: CustomValueOp,
55    },
56}
57
58#[cfg(test)]
59mod tests;
60
61/// Internal shared state between the manager and each interface.
62struct EngineInterfaceState {
63    /// Protocol version info, set after `Hello` received
64    protocol_info: Waitable<Arc<ProtocolInfo>>,
65    /// Sequence for generating engine call ids
66    engine_call_id_sequence: Sequence,
67    /// Sequence for generating stream ids
68    stream_id_sequence: Sequence,
69    /// Sender to subscribe to an engine call response
70    engine_call_subscription_sender:
71        mpsc::Sender<(EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>)>,
72    /// The synchronized output writer
73    writer: Box<dyn PluginWrite<PluginOutput>>,
74    /// Mirror signals from `EngineState`. You can make use of this with
75    /// `engine_interface.signals()` when constructing a Stream that requires signals
76    signals: Signals,
77    /// Registered signal handlers
78    signal_handlers: Handlers,
79}
80
81impl std::fmt::Debug for EngineInterfaceState {
82    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83        f.debug_struct("EngineInterfaceState")
84            .field("protocol_info", &self.protocol_info)
85            .field("engine_call_id_sequence", &self.engine_call_id_sequence)
86            .field("stream_id_sequence", &self.stream_id_sequence)
87            .field(
88                "engine_call_subscription_sender",
89                &self.engine_call_subscription_sender,
90            )
91            .finish_non_exhaustive()
92    }
93}
94
95/// Manages reading and dispatching messages for [`EngineInterface`]s.
96///
97/// This is not a public API.
98#[derive(Debug)]
99#[doc(hidden)]
100pub struct EngineInterfaceManager {
101    /// Shared state
102    state: Arc<EngineInterfaceState>,
103    /// The writer for protocol info
104    protocol_info_mut: WaitableMut<Arc<ProtocolInfo>>,
105    /// Channel to send received PluginCalls to. This is removed after `Goodbye` is received.
106    plugin_call_sender: Option<mpsc::Sender<ReceivedPluginCall>>,
107    /// Receiver for PluginCalls. This is usually taken after initialization
108    plugin_call_receiver: Option<mpsc::Receiver<ReceivedPluginCall>>,
109    /// Subscriptions for engine call responses
110    engine_call_subscriptions:
111        BTreeMap<EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>>,
112    /// Receiver for engine call subscriptions
113    engine_call_subscription_receiver:
114        mpsc::Receiver<(EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>)>,
115    /// Manages stream messages and state
116    stream_manager: StreamManager,
117}
118
119impl EngineInterfaceManager {
120    pub(crate) fn new(writer: impl PluginWrite<PluginOutput> + 'static) -> EngineInterfaceManager {
121        let (plug_tx, plug_rx) = mpsc::channel();
122        let (subscription_tx, subscription_rx) = mpsc::channel();
123        let protocol_info_mut = WaitableMut::new();
124
125        EngineInterfaceManager {
126            state: Arc::new(EngineInterfaceState {
127                protocol_info: protocol_info_mut.reader(),
128                engine_call_id_sequence: Sequence::default(),
129                stream_id_sequence: Sequence::default(),
130                engine_call_subscription_sender: subscription_tx,
131                writer: Box::new(writer),
132                signals: Signals::new(Arc::new(AtomicBool::new(false))),
133                signal_handlers: Handlers::new(),
134            }),
135            protocol_info_mut,
136            plugin_call_sender: Some(plug_tx),
137            plugin_call_receiver: Some(plug_rx),
138            engine_call_subscriptions: BTreeMap::new(),
139            engine_call_subscription_receiver: subscription_rx,
140            stream_manager: StreamManager::new(),
141        }
142    }
143
144    /// Get the receiving end of the plugin call channel. Plugin calls that need to be handled
145    /// will be sent here.
146    pub(crate) fn take_plugin_call_receiver(
147        &mut self,
148    ) -> Option<mpsc::Receiver<ReceivedPluginCall>> {
149        self.plugin_call_receiver.take()
150    }
151
152    /// Create an [`EngineInterface`] associated with the given call id.
153    fn interface_for_context(&self, context: PluginCallId) -> EngineInterface {
154        EngineInterface {
155            state: self.state.clone(),
156            stream_manager_handle: self.stream_manager.get_handle(),
157            context: Some(context),
158        }
159    }
160
161    /// Send a [`ReceivedPluginCall`] to the channel
162    fn send_plugin_call(&self, plugin_call: ReceivedPluginCall) -> Result<(), ShellError> {
163        self.plugin_call_sender
164            .as_ref()
165            .ok_or_else(|| ShellError::PluginFailedToDecode {
166                msg: "Received a plugin call after Goodbye".into(),
167            })?
168            .send(plugin_call)
169            .map_err(|_| ShellError::NushellFailed {
170                msg: "Received a plugin call, but there's nowhere to send it".into(),
171            })
172    }
173
174    /// Flush any remaining subscriptions in the receiver into the map
175    fn receive_engine_call_subscriptions(&mut self) {
176        for (id, subscription) in self.engine_call_subscription_receiver.try_iter() {
177            if let btree_map::Entry::Vacant(e) = self.engine_call_subscriptions.entry(id) {
178                e.insert(subscription);
179            } else {
180                log::warn!("Duplicate engine call ID ignored: {id}")
181            }
182        }
183    }
184
185    /// Send a [`EngineCallResponse`] to the appropriate sender
186    fn send_engine_call_response(
187        &mut self,
188        id: EngineCallId,
189        response: EngineCallResponse<PipelineData>,
190    ) -> Result<(), ShellError> {
191        // Ensure all of the subscriptions have been flushed out of the receiver
192        self.receive_engine_call_subscriptions();
193        // Remove the sender - there is only one response per engine call
194        if let Some(sender) = self.engine_call_subscriptions.remove(&id) {
195            if sender.send(response).is_err() {
196                log::warn!("Received an engine call response for id={id}, but the caller hung up");
197            }
198            Ok(())
199        } else {
200            Err(ShellError::PluginFailedToDecode {
201                msg: format!("Unknown engine call ID: {id}"),
202            })
203        }
204    }
205
206    /// True if there are no other copies of the state (which would mean there are no interfaces
207    /// and no stream readers/writers)
208    pub(crate) fn is_finished(&self) -> bool {
209        Arc::strong_count(&self.state) < 2
210    }
211
212    /// Loop on input from the given reader as long as `is_finished()` is false
213    ///
214    /// Any errors will be propagated to all read streams automatically.
215    pub(crate) fn consume_all(
216        &mut self,
217        mut reader: impl PluginRead<PluginInput>,
218    ) -> Result<(), ShellError> {
219        while let Some(msg) = reader.read().transpose() {
220            if self.is_finished() {
221                break;
222            }
223
224            if let Err(err) = msg.and_then(|msg| self.consume(msg)) {
225                // Error to streams
226                let _ = self.stream_manager.broadcast_read_error(err.clone());
227                // Error to engine call waiters
228                self.receive_engine_call_subscriptions();
229                for sender in std::mem::take(&mut self.engine_call_subscriptions).into_values() {
230                    let _ = sender.send(EngineCallResponse::Error(err.clone()));
231                }
232                return Err(err);
233            }
234        }
235        Ok(())
236    }
237}
238
239impl InterfaceManager for EngineInterfaceManager {
240    type Interface = EngineInterface;
241    type Input = PluginInput;
242
243    fn get_interface(&self) -> Self::Interface {
244        EngineInterface {
245            state: self.state.clone(),
246            stream_manager_handle: self.stream_manager.get_handle(),
247            context: None,
248        }
249    }
250
251    fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> {
252        log::trace!("from engine: {input:?}");
253        match input {
254            PluginInput::Hello(info) => {
255                let info = Arc::new(info);
256                self.protocol_info_mut.set(info.clone())?;
257
258                let local_info = ProtocolInfo::default();
259                if local_info.is_compatible_with(&info)? {
260                    Ok(())
261                } else {
262                    Err(ShellError::PluginFailedToLoad {
263                        msg: format!(
264                            "Plugin is compiled for nushell version {}, \
265                                which is not compatible with version {}",
266                            local_info.version, info.version
267                        ),
268                    })
269                }
270            }
271            _ if !self.state.protocol_info.is_set() => {
272                // Must send protocol info first
273                Err(ShellError::PluginFailedToLoad {
274                    msg: "Failed to receive initial Hello message. This engine might be too old"
275                        .into(),
276                })
277            }
278            // Stream messages
279            PluginInput::Data(..)
280            | PluginInput::End(..)
281            | PluginInput::Drop(..)
282            | PluginInput::Ack(..) => {
283                self.consume_stream_message(input.try_into().map_err(|msg| {
284                    ShellError::NushellFailed {
285                        msg: format!("Failed to convert message {msg:?} to StreamMessage"),
286                    }
287                })?)
288            }
289            PluginInput::Call(id, call) => {
290                let interface = self.interface_for_context(id);
291                // Read streams in the input
292                let call = match call
293                    .map_data(|input| self.read_pipeline_data(input, &Signals::empty()))
294                {
295                    Ok(call) => call,
296                    Err(err) => {
297                        // If there's an error with initialization of the input stream, just send
298                        // the error response rather than failing here
299                        return interface.write_response(Err(err))?.write();
300                    }
301                };
302                match call {
303                    // Ask the plugin for metadata
304                    PluginCall::Metadata => {
305                        self.send_plugin_call(ReceivedPluginCall::Metadata { engine: interface })
306                    }
307                    // Ask the plugin for signatures
308                    PluginCall::Signature => {
309                        self.send_plugin_call(ReceivedPluginCall::Signature { engine: interface })
310                    }
311                    // Parse custom values and send a ReceivedPluginCall
312                    PluginCall::Run(mut call_info) => {
313                        // Deserialize custom values in the arguments
314                        if let Err(err) = deserialize_call_args(&mut call_info.call) {
315                            return interface.write_response(Err(err))?.write();
316                        }
317                        // Send the plugin call to the receiver
318                        self.send_plugin_call(ReceivedPluginCall::Run {
319                            engine: interface,
320                            call: call_info,
321                        })
322                    }
323                    // Send request with the custom value
324                    PluginCall::CustomValueOp(custom_value, op) => {
325                        self.send_plugin_call(ReceivedPluginCall::CustomValueOp {
326                            engine: interface,
327                            custom_value,
328                            op,
329                        })
330                    }
331                    PluginCall::GetCompletion(info) => {
332                        self.send_plugin_call(ReceivedPluginCall::GetCompletion {
333                            engine: interface,
334                            info,
335                        })
336                    }
337                }
338            }
339            PluginInput::Goodbye => {
340                // Remove the plugin call sender so it hangs up
341                drop(self.plugin_call_sender.take());
342                Ok(())
343            }
344            PluginInput::EngineCallResponse(id, response) => {
345                let response = response
346                    .map_data(|header| self.read_pipeline_data(header, &Signals::empty()))
347                    .unwrap_or_else(|err| {
348                        // If there's an error with initializing this stream, change it to an engine
349                        // call error response, but send it anyway
350                        EngineCallResponse::Error(err)
351                    });
352                self.send_engine_call_response(id, response)
353            }
354            PluginInput::Signal(action) => {
355                match action {
356                    SignalAction::Interrupt => self.state.signals.trigger(),
357                    SignalAction::Reset => self.state.signals.reset(),
358                }
359                self.state.signal_handlers.run(action);
360                Ok(())
361            }
362        }
363    }
364
365    fn stream_manager(&self) -> &StreamManager {
366        &self.stream_manager
367    }
368
369    fn prepare_pipeline_data(&self, mut data: PipelineData) -> Result<PipelineData, ShellError> {
370        // Deserialize custom values in the pipeline data
371        match data {
372            PipelineData::Value(ref mut value, _) => {
373                PluginCustomValue::deserialize_custom_values_in(value)?;
374                Ok(data)
375            }
376            PipelineData::ListStream(stream, meta) => {
377                let stream = stream.map(|mut value| {
378                    let span = value.span();
379                    PluginCustomValue::deserialize_custom_values_in(&mut value)
380                        .map(|()| value)
381                        .unwrap_or_else(|err| Value::error(err, span))
382                });
383                Ok(PipelineData::list_stream(stream, meta))
384            }
385            PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
386        }
387    }
388}
389
390/// Deserialize custom values in call arguments
391fn deserialize_call_args(call: &mut crate::EvaluatedCall) -> Result<(), ShellError> {
392    call.positional
393        .iter_mut()
394        .try_for_each(PluginCustomValue::deserialize_custom_values_in)?;
395    call.named
396        .iter_mut()
397        .flat_map(|(_, value)| value.as_mut())
398        .try_for_each(PluginCustomValue::deserialize_custom_values_in)
399}
400
401/// A reference through which the nushell engine can be interacted with during execution.
402#[derive(Debug, Clone)]
403pub struct EngineInterface {
404    /// Shared state with the manager
405    state: Arc<EngineInterfaceState>,
406    /// Handle to stream manager
407    stream_manager_handle: StreamManagerHandle,
408    /// The plugin call this interface belongs to.
409    context: Option<PluginCallId>,
410}
411
412impl EngineInterface {
413    /// Write the protocol info. This should be done after initialization
414    pub(crate) fn hello(&self) -> Result<(), ShellError> {
415        self.write(PluginOutput::Hello(ProtocolInfo::default()))?;
416        self.flush()
417    }
418
419    fn context(&self) -> Result<PluginCallId, ShellError> {
420        self.context.ok_or_else(|| ShellError::NushellFailed {
421            msg: "Tried to call an EngineInterface method that requires a call context \
422                outside of one"
423                .into(),
424        })
425    }
426
427    /// Write an OK call response or an error.
428    pub(crate) fn write_ok(
429        &self,
430        result: Result<(), impl Into<ShellError>>,
431    ) -> Result<(), ShellError> {
432        let response = match result {
433            Ok(()) => PluginCallResponse::Ok,
434            Err(err) => PluginCallResponse::Error(err.into()),
435        };
436        self.write(PluginOutput::CallResponse(self.context()?, response))?;
437        self.flush()
438    }
439
440    /// Write a call response of either [`PipelineData`] or an error. Returns the stream writer
441    /// to finish writing the stream
442    pub(crate) fn write_response(
443        &self,
444        result: Result<PipelineData, impl Into<ShellError>>,
445    ) -> Result<PipelineDataWriter<Self>, ShellError> {
446        match result {
447            Ok(data) => {
448                let (header, writer) = match self.init_write_pipeline_data(data, &()) {
449                    Ok(tup) => tup,
450                    // If we get an error while trying to construct the pipeline data, send that
451                    // instead
452                    Err(err) => return self.write_response(Err(err)),
453                };
454                // Write pipeline data header response, and the full stream
455                let response = PluginCallResponse::PipelineData(header);
456                self.write(PluginOutput::CallResponse(self.context()?, response))?;
457                self.flush()?;
458                Ok(writer)
459            }
460            Err(err) => {
461                let response = PluginCallResponse::Error(err.into());
462                self.write(PluginOutput::CallResponse(self.context()?, response))?;
463                self.flush()?;
464                Ok(Default::default())
465            }
466        }
467    }
468
469    /// Write a call response of plugin metadata.
470    pub(crate) fn write_metadata(&self, metadata: PluginMetadata) -> Result<(), ShellError> {
471        let response = PluginCallResponse::Metadata(metadata);
472        self.write(PluginOutput::CallResponse(self.context()?, response))?;
473        self.flush()
474    }
475
476    /// Write a call response of plugin signatures.
477    ///
478    /// Any custom values in the examples will be rendered using `to_base_value()`.
479    pub(crate) fn write_signature(
480        &self,
481        signature: Vec<PluginSignature>,
482    ) -> Result<(), ShellError> {
483        let response = PluginCallResponse::Signature(signature);
484        self.write(PluginOutput::CallResponse(self.context()?, response))?;
485        self.flush()
486    }
487
488    /// Write a call response of completion items.
489    pub(crate) fn write_completion_items(
490        &self,
491        items: Option<Vec<DynamicSuggestion>>,
492    ) -> Result<(), ShellError> {
493        let response = PluginCallResponse::CompletionItems(items);
494        self.write(PluginOutput::CallResponse(self.context()?, response))?;
495        self.flush()
496    }
497
498    /// Write an engine call message. Returns the writer for the stream, and the receiver for
499    /// the response to the engine call.
500    fn write_engine_call(
501        &self,
502        call: EngineCall<PipelineData>,
503    ) -> Result<
504        (
505            PipelineDataWriter<Self>,
506            mpsc::Receiver<EngineCallResponse<PipelineData>>,
507        ),
508        ShellError,
509    > {
510        let context = self.context()?;
511        let id = self.state.engine_call_id_sequence.next()?;
512        let (tx, rx) = mpsc::channel();
513
514        // Convert the call into one with a header and handle the stream, if necessary
515        let mut writer = None;
516
517        let call = call.map_data(|input| {
518            let (input_header, input_writer) = self.init_write_pipeline_data(input, &())?;
519            writer = Some(input_writer);
520            Ok(input_header)
521        })?;
522
523        // Register the channel
524        self.state
525            .engine_call_subscription_sender
526            .send((id, tx))
527            .map_err(|_| ShellError::NushellFailed {
528                msg: "EngineInterfaceManager hung up and is no longer accepting engine calls"
529                    .into(),
530            })?;
531
532        // Write request
533        self.write(PluginOutput::EngineCall { context, id, call })?;
534        self.flush()?;
535
536        Ok((writer.unwrap_or_default(), rx))
537    }
538
539    /// Perform an engine call. Input and output streams are handled.
540    fn engine_call(
541        &self,
542        call: EngineCall<PipelineData>,
543    ) -> Result<EngineCallResponse<PipelineData>, ShellError> {
544        let (writer, rx) = self.write_engine_call(call)?;
545
546        // Finish writing stream in the background
547        writer.write_background()?;
548
549        // Wait on receiver to get the response
550        rx.recv().map_err(|_| ShellError::NushellFailed {
551            msg: "Failed to get response to engine call because the channel was closed".into(),
552        })
553    }
554
555    /// Returns `true` if the plugin is communicating on stdio. When this is the case, stdin and
556    /// stdout should not be used by the plugin for other purposes.
557    ///
558    /// If the plugin can not be used without access to stdio, an error should be presented to the
559    /// user instead.
560    pub fn is_using_stdio(&self) -> bool {
561        self.state.writer.is_stdout()
562    }
563
564    /// Register a closure which will be called when the engine receives an interrupt signal.
565    /// Returns a RAII guard that will keep the closure alive until it is dropped.
566    pub fn register_signal_handler(&self, handler: Handler) -> Result<HandlerGuard, ShellError> {
567        self.state.signal_handlers.register(handler)
568    }
569
570    /// Get the full shell configuration from the engine. As this is quite a large object, it is
571    /// provided on request only.
572    ///
573    /// # Example
574    ///
575    /// Format a value in the user's preferred way:
576    ///
577    /// ```rust,no_run
578    /// # use nu_protocol::{Value, ShellError};
579    /// # use nu_plugin::EngineInterface;
580    /// # fn example(engine: &EngineInterface, value: &Value) -> Result<(), ShellError> {
581    /// let config = engine.get_config()?;
582    /// eprintln!("{}", value.to_expanded_string(", ", &config));
583    /// # Ok(())
584    /// # }
585    /// ```
586    pub fn get_config(&self) -> Result<Arc<Config>, ShellError> {
587        match self.engine_call(EngineCall::GetConfig)? {
588            EngineCallResponse::Config(config) => Ok(SharedCow::into_arc(config)),
589            EngineCallResponse::Error(err) => Err(err),
590            _ => Err(ShellError::PluginFailedToDecode {
591                msg: "Received unexpected response for EngineCall::GetConfig".into(),
592            }),
593        }
594    }
595
596    /// Do an engine call returning an `Option<Value>` as either `PipelineData::empty()` or
597    /// `PipelineData::value`
598    fn engine_call_option_value(
599        &self,
600        engine_call: EngineCall<PipelineData>,
601    ) -> Result<Option<Value>, ShellError> {
602        let name = engine_call.name();
603        match self.engine_call(engine_call)? {
604            EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(None),
605            EngineCallResponse::PipelineData(PipelineData::Value(value, _)) => Ok(Some(value)),
606            EngineCallResponse::Error(err) => Err(err),
607            _ => Err(ShellError::PluginFailedToDecode {
608                msg: format!("Received unexpected response for EngineCall::{name}"),
609            }),
610        }
611    }
612
613    /// Get the plugin-specific configuration from the engine. This lives in
614    /// `$env.config.plugins.NAME` for a plugin named `NAME`. If the config is set to a closure,
615    /// it is automatically evaluated each time.
616    ///
617    /// # Example
618    ///
619    /// Print this plugin's config:
620    ///
621    /// ```rust,no_run
622    /// # use nu_protocol::{Value, ShellError};
623    /// # use nu_plugin::EngineInterface;
624    /// # fn example(engine: &EngineInterface, value: &Value) -> Result<(), ShellError> {
625    /// let config = engine.get_plugin_config()?;
626    /// eprintln!("{:?}", config);
627    /// # Ok(())
628    /// # }
629    /// ```
630    pub fn get_plugin_config(&self) -> Result<Option<Value>, ShellError> {
631        self.engine_call_option_value(EngineCall::GetPluginConfig)
632    }
633
634    /// Get an environment variable from the engine.
635    ///
636    /// Returns `Some(value)` if present, and `None` if not found.
637    ///
638    /// # Example
639    ///
640    /// Get `$env.PATH`:
641    ///
642    /// ```rust,no_run
643    /// # use nu_protocol::{Value, ShellError};
644    /// # use nu_plugin::EngineInterface;
645    /// # fn example(engine: &EngineInterface) -> Result<Option<Value>, ShellError> {
646    /// engine.get_env_var("PATH") // => Ok(Some(Value::List([...])))
647    /// # }
648    /// ```
649    pub fn get_env_var(&self, name: impl Into<String>) -> Result<Option<Value>, ShellError> {
650        self.engine_call_option_value(EngineCall::GetEnvVar(name.into()))
651    }
652
653    /// Get the current working directory from the engine. The result is always an absolute path.
654    ///
655    /// # Example
656    /// ```rust,no_run
657    /// # use nu_protocol::{Value, ShellError};
658    /// # use nu_plugin::EngineInterface;
659    /// # fn example(engine: &EngineInterface) -> Result<String, ShellError> {
660    /// engine.get_current_dir() // => "/home/user"
661    /// # }
662    /// ```
663    pub fn get_current_dir(&self) -> Result<String, ShellError> {
664        match self.engine_call(EngineCall::GetCurrentDir)? {
665            // Always a string, and the span doesn't matter.
666            EngineCallResponse::PipelineData(PipelineData::Value(Value::String { val, .. }, _)) => {
667                Ok(val)
668            }
669            EngineCallResponse::Error(err) => Err(err),
670            _ => Err(ShellError::PluginFailedToDecode {
671                msg: "Received unexpected response for EngineCall::GetCurrentDir".into(),
672            }),
673        }
674    }
675
676    /// Get all environment variables from the engine.
677    ///
678    /// Since this is quite a large map that has to be sent, prefer to use
679    /// [`.get_env_var()`](Self::get_env_var) if the variables needed are known ahead of time
680    /// and there are only a small number needed.
681    ///
682    /// # Example
683    /// ```rust,no_run
684    /// # use nu_protocol::{Value, ShellError};
685    /// # use nu_plugin::EngineInterface;
686    /// # use std::collections::HashMap;
687    /// # fn example(engine: &EngineInterface) -> Result<HashMap<String, Value>, ShellError> {
688    /// engine.get_env_vars() // => Ok({"PATH": Value::List([...]), ...})
689    /// # }
690    /// ```
691    pub fn get_env_vars(&self) -> Result<HashMap<String, Value>, ShellError> {
692        match self.engine_call(EngineCall::GetEnvVars)? {
693            EngineCallResponse::ValueMap(map) => Ok(map),
694            EngineCallResponse::Error(err) => Err(err),
695            _ => Err(ShellError::PluginFailedToDecode {
696                msg: "Received unexpected response type for EngineCall::GetEnvVars".into(),
697            }),
698        }
699    }
700
701    /// Set an environment variable in the caller's scope.
702    ///
703    /// If called after the plugin response has already been sent (i.e. during a stream), this will
704    /// only affect the environment for engine calls related to this plugin call, and will not be
705    /// propagated to the environment of the caller.
706    ///
707    /// # Example
708    /// ```rust,no_run
709    /// # use nu_protocol::{Value, ShellError};
710    /// # use nu_plugin::EngineInterface;
711    /// # fn example(engine: &EngineInterface) -> Result<(), ShellError> {
712    /// engine.add_env_var("FOO", Value::test_string("bar"))
713    /// # }
714    /// ```
715    pub fn add_env_var(&self, name: impl Into<String>, value: Value) -> Result<(), ShellError> {
716        match self.engine_call(EngineCall::AddEnvVar(name.into(), value))? {
717            EngineCallResponse::PipelineData(_) => Ok(()),
718            EngineCallResponse::Error(err) => Err(err),
719            _ => Err(ShellError::PluginFailedToDecode {
720                msg: "Received unexpected response type for EngineCall::AddEnvVar".into(),
721            }),
722        }
723    }
724
725    /// Get the help string for the current command.
726    ///
727    /// This returns the same string as passing `--help` would, and can be used for the top-level
728    /// command in a command group that doesn't do anything on its own (e.g. `query`).
729    ///
730    /// # Example
731    /// ```rust,no_run
732    /// # use nu_protocol::{Value, ShellError};
733    /// # use nu_plugin::EngineInterface;
734    /// # fn example(engine: &EngineInterface) -> Result<(), ShellError> {
735    /// eprintln!("{}", engine.get_help()?);
736    /// # Ok(())
737    /// # }
738    /// ```
739    pub fn get_help(&self) -> Result<String, ShellError> {
740        match self.engine_call(EngineCall::GetHelp)? {
741            EngineCallResponse::PipelineData(PipelineData::Value(Value::String { val, .. }, _)) => {
742                Ok(val)
743            }
744            _ => Err(ShellError::PluginFailedToDecode {
745                msg: "Received unexpected response type for EngineCall::GetHelp".into(),
746            }),
747        }
748    }
749
750    /// Returns a guard that will keep the plugin in the foreground as long as the guard is alive.
751    ///
752    /// Moving the plugin to the foreground is necessary for plugins that need to receive input and
753    /// signals directly from the terminal.
754    ///
755    /// The exact implementation is operating system-specific. On Unix, this ensures that the
756    /// plugin process becomes part of the process group controlling the terminal.
757    pub fn enter_foreground(&self) -> Result<ForegroundGuard, ShellError> {
758        match self.engine_call(EngineCall::EnterForeground)? {
759            EngineCallResponse::Error(error) => Err(error),
760            EngineCallResponse::PipelineData(PipelineData::Value(
761                Value::Int { val: pgrp, .. },
762                _,
763            )) => {
764                set_pgrp_from_enter_foreground(pgrp)?;
765                Ok(ForegroundGuard(Some(self.clone())))
766            }
767            EngineCallResponse::PipelineData(PipelineData::Empty) => {
768                Ok(ForegroundGuard(Some(self.clone())))
769            }
770            _ => Err(ShellError::PluginFailedToDecode {
771                msg: "Received unexpected response type for EngineCall::SetForeground".into(),
772            }),
773        }
774    }
775
776    /// Internal: for exiting the foreground after `enter_foreground()`. Called from the guard.
777    fn leave_foreground(&self) -> Result<(), ShellError> {
778        match self.engine_call(EngineCall::LeaveForeground)? {
779            EngineCallResponse::Error(error) => Err(error),
780            EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(()),
781            _ => Err(ShellError::PluginFailedToDecode {
782                msg: "Received unexpected response type for EngineCall::LeaveForeground".into(),
783            }),
784        }
785    }
786
787    /// Get the contents of a [`Span`] from the engine.
788    ///
789    /// This method returns `Vec<u8>` as it's possible for the matched span to not be a valid UTF-8
790    /// string, perhaps because it sliced through the middle of a UTF-8 byte sequence, as the
791    /// offsets are byte-indexed. Use [`String::from_utf8_lossy()`] for display if necessary.
792    pub fn get_span_contents(&self, span: Span) -> Result<Vec<u8>, ShellError> {
793        match self.engine_call(EngineCall::GetSpanContents(span))? {
794            EngineCallResponse::PipelineData(PipelineData::Value(Value::Binary { val, .. }, _)) => {
795                Ok(val)
796            }
797            _ => Err(ShellError::PluginFailedToDecode {
798                msg: "Received unexpected response type for EngineCall::GetSpanContents".into(),
799            }),
800        }
801    }
802
803    /// Ask the engine to evaluate a closure. Input to the closure is passed as a stream, and the
804    /// output is available as a stream.
805    ///
806    /// Set `redirect_stdout` to `true` to capture the standard output stream of an external
807    /// command, if the closure results in an external command.
808    ///
809    /// Set `redirect_stderr` to `true` to capture the standard error stream of an external command,
810    /// if the closure results in an external command.
811    ///
812    /// # Example
813    ///
814    /// Invoked as:
815    ///
816    /// ```nushell
817    /// my_command { seq 1 $in | each { |n| $"Hello, ($n)" } }
818    /// ```
819    ///
820    /// ```rust,no_run
821    /// # use nu_protocol::{Value, ShellError, PipelineData};
822    /// # use nu_plugin::{EngineInterface, EvaluatedCall};
823    /// # fn example(engine: &EngineInterface, call: &EvaluatedCall) -> Result<(), ShellError> {
824    /// let closure = call.req(0)?;
825    /// let input = PipelineData::value(Value::int(4, call.head), None);
826    /// let output = engine.eval_closure_with_stream(
827    ///     &closure,
828    ///     vec![],
829    ///     input,
830    ///     true,
831    ///     false,
832    /// )?;
833    /// for value in output {
834    ///     eprintln!("Closure says: {}", value.as_str()?);
835    /// }
836    /// # Ok(())
837    /// # }
838    /// ```
839    ///
840    /// Output:
841    ///
842    /// ```text
843    /// Closure says: Hello, 1
844    /// Closure says: Hello, 2
845    /// Closure says: Hello, 3
846    /// Closure says: Hello, 4
847    /// ```
848    pub fn eval_closure_with_stream(
849        &self,
850        closure: &Spanned<Closure>,
851        mut positional: Vec<Value>,
852        input: PipelineData,
853        redirect_stdout: bool,
854        redirect_stderr: bool,
855    ) -> Result<PipelineData, ShellError> {
856        // Ensure closure args have custom values serialized
857        positional
858            .iter_mut()
859            .try_for_each(PluginCustomValue::serialize_custom_values_in)?;
860
861        let call = EngineCall::EvalClosure {
862            closure: closure.clone(),
863            positional,
864            input,
865            redirect_stdout,
866            redirect_stderr,
867        };
868
869        match self.engine_call(call)? {
870            EngineCallResponse::Error(error) => Err(error),
871            EngineCallResponse::PipelineData(data) => Ok(data),
872            _ => Err(ShellError::PluginFailedToDecode {
873                msg: "Received unexpected response type for EngineCall::EvalClosure".into(),
874            }),
875        }
876    }
877
878    /// Ask the engine to evaluate a closure. Input is optionally passed as a [`Value`], and output
879    /// of the closure is collected to a [`Value`] even if it is a stream.
880    ///
881    /// If the closure results in an external command, the return value will be a collected string
882    /// or binary value of the standard output stream of that command, similar to calling
883    /// [`eval_closure_with_stream()`](Self::eval_closure_with_stream) with `redirect_stdout` =
884    /// `true` and `redirect_stderr` = `false`.
885    ///
886    /// Use [`eval_closure_with_stream()`](Self::eval_closure_with_stream) if more control over the
887    /// input and output is desired.
888    ///
889    /// # Example
890    ///
891    /// Invoked as:
892    ///
893    /// ```nushell
894    /// my_command { |number| $number + 1}
895    /// ```
896    ///
897    /// ```rust,no_run
898    /// # use nu_protocol::{Value, ShellError};
899    /// # use nu_plugin::{EngineInterface, EvaluatedCall};
900    /// # fn example(engine: &EngineInterface, call: &EvaluatedCall) -> Result<(), ShellError> {
901    /// let closure = call.req(0)?;
902    /// for n in 0..4 {
903    ///     let result = engine.eval_closure(&closure, vec![Value::int(n, call.head)], None)?;
904    ///     eprintln!("{} => {}", n, result.as_int()?);
905    /// }
906    /// # Ok(())
907    /// # }
908    /// ```
909    ///
910    /// Output:
911    ///
912    /// ```text
913    /// 0 => 1
914    /// 1 => 2
915    /// 2 => 3
916    /// 3 => 4
917    /// ```
918    pub fn eval_closure(
919        &self,
920        closure: &Spanned<Closure>,
921        positional: Vec<Value>,
922        input: Option<Value>,
923    ) -> Result<Value, ShellError> {
924        let input = input.map_or_else(PipelineData::empty, |v| PipelineData::value(v, None));
925        let output = self.eval_closure_with_stream(closure, positional, input, true, false)?;
926        // Unwrap an error value
927        match output.into_value(closure.span)? {
928            Value::Error { error, .. } => Err(*error),
929            value => Ok(value),
930        }
931    }
932
933    /// Ask the engine for the identifier for a declaration. If found, the result can then be passed
934    /// to [`.call_decl()`](Self::call_decl) to call other internal commands.
935    ///
936    /// See [`.call_decl()`](Self::call_decl) for an example.
937    pub fn find_decl(&self, name: impl Into<String>) -> Result<Option<DeclId>, ShellError> {
938        let call = EngineCall::FindDecl(name.into());
939
940        match self.engine_call(call)? {
941            EngineCallResponse::Error(err) => Err(err),
942            EngineCallResponse::Identifier(id) => Ok(Some(id)),
943            EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(None),
944            _ => Err(ShellError::PluginFailedToDecode {
945                msg: "Received unexpected response type for EngineCall::FindDecl".into(),
946            }),
947        }
948    }
949
950    /// Get the compiled IR for a block.
951    ///
952    /// # Example
953    ///
954    /// ```rust,no_run
955    /// # use nu_protocol::{ShellError, Spanned};
956    /// # use nu_protocol::engine::Closure;
957    /// # use nu_plugin::EngineInterface;
958    /// # fn example(engine: &EngineInterface, closure: &Spanned<Closure>) -> Result<(), ShellError> {
959    /// let ir_block = engine.get_block_ir(closure.item.block_id)?;
960    /// for instruction in &ir_block.instructions {
961    ///     // ...
962    /// }
963    /// # Ok(())
964    /// # }
965    /// ```
966    pub fn get_block_ir(&self, block_id: BlockId) -> Result<IrBlock, ShellError> {
967        let call = EngineCall::GetBlockIR(block_id);
968
969        match self.engine_call(call)? {
970            EngineCallResponse::Error(err) => Err(err),
971            EngineCallResponse::IrBlock(ir) => Ok(*ir),
972            _ => Err(ShellError::PluginFailedToDecode {
973                msg: "Received unexpected response type for EngineCall::GetBlockIR".into(),
974            }),
975        }
976    }
977
978    /// Ask the engine to call an internal command, using the declaration ID previously looked up
979    /// with [`.find_decl()`](Self::find_decl).
980    ///
981    /// # Example
982    ///
983    /// ```rust,no_run
984    /// # use nu_protocol::{Value, ShellError, PipelineData};
985    /// # use nu_plugin::{EngineInterface, EvaluatedCall};
986    /// # fn example(engine: &EngineInterface, call: &EvaluatedCall) -> Result<Value, ShellError> {
987    /// if let Some(decl_id) = engine.find_decl("scope commands")? {
988    ///     let commands = engine.call_decl(
989    ///         decl_id,
990    ///         EvaluatedCall::new(call.head),
991    ///         PipelineData::empty(),
992    ///         true,
993    ///         false,
994    ///     )?;
995    ///     commands.into_value(call.head)
996    /// } else {
997    ///     Ok(Value::list(vec![], call.head))
998    /// }
999    /// # }
1000    /// ```
1001    pub fn call_decl(
1002        &self,
1003        decl_id: DeclId,
1004        call: EvaluatedCall,
1005        input: PipelineData,
1006        redirect_stdout: bool,
1007        redirect_stderr: bool,
1008    ) -> Result<PipelineData, ShellError> {
1009        let call = EngineCall::CallDecl {
1010            decl_id,
1011            call,
1012            input,
1013            redirect_stdout,
1014            redirect_stderr,
1015        };
1016
1017        match self.engine_call(call)? {
1018            EngineCallResponse::Error(err) => Err(err),
1019            EngineCallResponse::PipelineData(data) => Ok(data),
1020            _ => Err(ShellError::PluginFailedToDecode {
1021                msg: "Received unexpected response type for EngineCall::CallDecl".into(),
1022            }),
1023        }
1024    }
1025
1026    /// Tell the engine whether to disable garbage collection for this plugin.
1027    ///
1028    /// The garbage collector is enabled by default, but plugins can turn it off (ideally
1029    /// temporarily) as necessary to implement functionality that requires the plugin to stay
1030    /// running for longer than the engine can automatically determine.
1031    ///
1032    /// The user can still stop the plugin if they want to with the `plugin stop` command.
1033    pub fn set_gc_disabled(&self, disabled: bool) -> Result<(), ShellError> {
1034        self.write(PluginOutput::Option(PluginOption::GcDisabled(disabled)))?;
1035        self.flush()
1036    }
1037
1038    /// Write a call response of [`Ordering`], for `partial_cmp`.
1039    pub(crate) fn write_ordering(
1040        &self,
1041        ordering: Option<impl Into<Ordering>>,
1042    ) -> Result<(), ShellError> {
1043        let response = PluginCallResponse::Ordering(ordering.map(|o| o.into()));
1044        self.write(PluginOutput::CallResponse(self.context()?, response))?;
1045        self.flush()
1046    }
1047
1048    pub fn signals(&self) -> &Signals {
1049        &self.state.signals
1050    }
1051}
1052
1053impl Interface for EngineInterface {
1054    type Output = PluginOutput;
1055    type DataContext = ();
1056
1057    fn write(&self, output: PluginOutput) -> Result<(), ShellError> {
1058        log::trace!("to engine: {output:?}");
1059        self.state.writer.write(&output)
1060    }
1061
1062    fn flush(&self) -> Result<(), ShellError> {
1063        self.state.writer.flush()
1064    }
1065
1066    fn stream_id_sequence(&self) -> &Sequence {
1067        &self.state.stream_id_sequence
1068    }
1069
1070    fn stream_manager_handle(&self) -> &StreamManagerHandle {
1071        &self.stream_manager_handle
1072    }
1073
1074    fn prepare_pipeline_data(
1075        &self,
1076        mut data: PipelineData,
1077        _context: &(),
1078    ) -> Result<PipelineData, ShellError> {
1079        // Serialize custom values in the pipeline data
1080        match data {
1081            PipelineData::Value(ref mut value, _) => {
1082                PluginCustomValue::serialize_custom_values_in(value)?;
1083                Ok(data)
1084            }
1085            PipelineData::ListStream(stream, meta) => {
1086                let stream = stream.map(|mut value| {
1087                    let span = value.span();
1088                    PluginCustomValue::serialize_custom_values_in(&mut value)
1089                        .map(|_| value)
1090                        .unwrap_or_else(|err| Value::error(err, span))
1091                });
1092                Ok(PipelineData::list_stream(stream, meta))
1093            }
1094            PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
1095        }
1096    }
1097}
1098
1099/// Keeps the plugin in the foreground as long as it is alive.
1100///
1101/// Use [`.leave()`](Self::leave) to leave the foreground without ignoring the error.
1102pub struct ForegroundGuard(Option<EngineInterface>);
1103
1104impl ForegroundGuard {
1105    // Should be called only once
1106    fn leave_internal(&mut self) -> Result<(), ShellError> {
1107        if let Some(interface) = self.0.take() {
1108            // On Unix, we need to put ourselves back in our own process group
1109            #[cfg(unix)]
1110            {
1111                use nix::unistd::{Pid, setpgid};
1112                // This should always succeed, frankly, but handle the error just in case
1113                setpgid(Pid::from_raw(0), Pid::from_raw(0)).map_err(|err| {
1114                    nu_protocol::shell_error::io::IoError::new_internal(
1115                        std::io::Error::from(err),
1116                        "Could not set pgid",
1117                    )
1118                })?;
1119            }
1120            interface.leave_foreground()?;
1121        }
1122        Ok(())
1123    }
1124
1125    /// Leave the foreground. In contrast to dropping the guard, this preserves the error (if any).
1126    pub fn leave(mut self) -> Result<(), ShellError> {
1127        let result = self.leave_internal();
1128        std::mem::forget(self);
1129        result
1130    }
1131}
1132
1133impl Drop for ForegroundGuard {
1134    fn drop(&mut self) {
1135        let _ = self.leave_internal();
1136    }
1137}
1138
1139#[cfg(unix)]
1140fn set_pgrp_from_enter_foreground(pgrp: i64) -> Result<(), ShellError> {
1141    use nix::unistd::{Pid, setpgid};
1142    if let Ok(pgrp) = pgrp.try_into() {
1143        setpgid(Pid::from_raw(0), Pid::from_raw(pgrp)).map_err(|err| {
1144            ShellError::Generic(
1145                GenericError::new_internal("Failed to set process group for foreground", "")
1146                    .with_help(err.to_string()),
1147            )
1148        })
1149    } else {
1150        Err(ShellError::NushellFailed {
1151            msg: "Engine returned an invalid process group ID".into(),
1152        })
1153    }
1154}
1155
1156#[cfg(not(unix))]
1157fn set_pgrp_from_enter_foreground(_pgrp: i64) -> Result<(), ShellError> {
1158    Err(ShellError::NushellFailed {
1159        msg: "EnterForeground asked plugin to join process group, but this is not supported on non UNIX platforms.".to_string(),
1160    })
1161}