nu_plugin/plugin/interface/
mod.rs

1//! Interface used by the plugin to communicate with the engine.
2
3use nu_plugin_core::{
4    Interface, InterfaceManager, PipelineDataWriter, PluginRead, PluginWrite, StreamManager,
5    StreamManagerHandle,
6    util::{Waitable, WaitableMut},
7};
8use nu_plugin_protocol::{
9    CallInfo, CustomValueOp, EngineCall, EngineCallId, EngineCallResponse, EvaluatedCall,
10    GetCompletionInfo, Ordering, PluginCall, PluginCallId, PluginCallResponse, PluginCustomValue,
11    PluginInput, PluginOption, PluginOutput, ProtocolInfo,
12};
13use nu_protocol::{
14    BlockId, Config, DeclId, DynamicSuggestion, Handler, HandlerGuard, Handlers, PipelineData,
15    PluginMetadata, PluginSignature, ShellError, SignalAction, Signals, Span, Spanned, Value,
16    engine::{Closure, Sequence},
17    ir::IrBlock,
18};
19use nu_utils::SharedCow;
20use std::{
21    collections::{BTreeMap, HashMap, btree_map},
22    sync::{Arc, atomic::AtomicBool, mpsc},
23};
24
/// Plugin calls that are received by the [`EngineInterfaceManager`] for handling.
///
/// With each call, an [`EngineInterface`] is included that can be provided to the plugin code
/// and should be used to send the response. The interface sent includes the [`PluginCallId`] for
/// sending associated messages with the correct context.
///
/// This is not a public API.
#[derive(Debug)]
#[doc(hidden)]
pub enum ReceivedPluginCall {
    /// The engine asked the plugin for its metadata.
    Metadata {
        /// Interface bound to this call's id; use it to send the response.
        engine: EngineInterface,
    },
    /// The engine asked the plugin for its command signatures.
    Signature {
        /// Interface bound to this call's id; use it to send the response.
        engine: EngineInterface,
    },
    /// The engine asked the plugin to run a command.
    Run {
        /// Interface bound to this call's id; use it to send the response.
        engine: EngineInterface,
        /// The call to run, with its input already converted to readable [`PipelineData`].
        call: CallInfo<PipelineData>,
    },
    /// The engine asked the plugin for completion items.
    GetCompletion {
        /// Interface bound to this call's id; use it to send the response.
        engine: EngineInterface,
        /// The completion request as received from the engine.
        info: GetCompletionInfo,
    },
    /// The engine asked the plugin to perform an operation on one of its custom values.
    CustomValueOp {
        /// Interface bound to this call's id; use it to send the response.
        engine: EngineInterface,
        /// The custom value the operation applies to.
        custom_value: Spanned<PluginCustomValue>,
        /// The operation to perform.
        op: CustomValueOp,
    },
}
55
56#[cfg(test)]
57mod tests;
58
/// Internal shared state between the manager and each interface.
///
/// A single instance is created by the manager and shared via `Arc` with every
/// [`EngineInterface`] it hands out.
struct EngineInterfaceState {
    /// Protocol version info, set after `Hello` received
    protocol_info: Waitable<Arc<ProtocolInfo>>,
    /// Sequence for generating engine call ids
    engine_call_id_sequence: Sequence,
    /// Sequence for generating stream ids
    stream_id_sequence: Sequence,
    /// Sender to subscribe to an engine call response. The manager holds the receiving end and
    /// routes each response to the sender registered under the matching id.
    engine_call_subscription_sender:
        mpsc::Sender<(EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>)>,
    /// The synchronized output writer
    writer: Box<dyn PluginWrite<PluginOutput>>,
    /// Mirror signals from `EngineState`. You can make use of this with
    /// `engine_interface.signals()` when constructing a Stream that requires signals
    signals: Signals,
    /// Registered signal handlers
    signal_handlers: Handlers,
}
78
79impl std::fmt::Debug for EngineInterfaceState {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        f.debug_struct("EngineInterfaceState")
82            .field("protocol_info", &self.protocol_info)
83            .field("engine_call_id_sequence", &self.engine_call_id_sequence)
84            .field("stream_id_sequence", &self.stream_id_sequence)
85            .field(
86                "engine_call_subscription_sender",
87                &self.engine_call_subscription_sender,
88            )
89            .finish_non_exhaustive()
90    }
91}
92
/// Manages reading and dispatching messages for [`EngineInterface`]s.
///
/// This is not a public API.
#[derive(Debug)]
#[doc(hidden)]
pub struct EngineInterfaceManager {
    /// Shared state
    state: Arc<EngineInterfaceState>,
    /// The writer for protocol info
    protocol_info_mut: WaitableMut<Arc<ProtocolInfo>>,
    /// Channel to send received PluginCalls to. This is removed after `Goodbye` is received.
    plugin_call_sender: Option<mpsc::Sender<ReceivedPluginCall>>,
    /// Receiver for PluginCalls. This is usually taken after initialization
    plugin_call_receiver: Option<mpsc::Receiver<ReceivedPluginCall>>,
    /// Subscriptions for engine call responses, keyed by engine call id. An entry is removed
    /// when its response is delivered.
    engine_call_subscriptions:
        BTreeMap<EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>>,
    /// Receiver for engine call subscriptions
    engine_call_subscription_receiver:
        mpsc::Receiver<(EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>)>,
    /// Manages stream messages and state
    stream_manager: StreamManager,
}
116
117impl EngineInterfaceManager {
118    pub(crate) fn new(writer: impl PluginWrite<PluginOutput> + 'static) -> EngineInterfaceManager {
119        let (plug_tx, plug_rx) = mpsc::channel();
120        let (subscription_tx, subscription_rx) = mpsc::channel();
121        let protocol_info_mut = WaitableMut::new();
122
123        EngineInterfaceManager {
124            state: Arc::new(EngineInterfaceState {
125                protocol_info: protocol_info_mut.reader(),
126                engine_call_id_sequence: Sequence::default(),
127                stream_id_sequence: Sequence::default(),
128                engine_call_subscription_sender: subscription_tx,
129                writer: Box::new(writer),
130                signals: Signals::new(Arc::new(AtomicBool::new(false))),
131                signal_handlers: Handlers::new(),
132            }),
133            protocol_info_mut,
134            plugin_call_sender: Some(plug_tx),
135            plugin_call_receiver: Some(plug_rx),
136            engine_call_subscriptions: BTreeMap::new(),
137            engine_call_subscription_receiver: subscription_rx,
138            stream_manager: StreamManager::new(),
139        }
140    }
141
142    /// Get the receiving end of the plugin call channel. Plugin calls that need to be handled
143    /// will be sent here.
144    pub(crate) fn take_plugin_call_receiver(
145        &mut self,
146    ) -> Option<mpsc::Receiver<ReceivedPluginCall>> {
147        self.plugin_call_receiver.take()
148    }
149
150    /// Create an [`EngineInterface`] associated with the given call id.
151    fn interface_for_context(&self, context: PluginCallId) -> EngineInterface {
152        EngineInterface {
153            state: self.state.clone(),
154            stream_manager_handle: self.stream_manager.get_handle(),
155            context: Some(context),
156        }
157    }
158
159    /// Send a [`ReceivedPluginCall`] to the channel
160    fn send_plugin_call(&self, plugin_call: ReceivedPluginCall) -> Result<(), ShellError> {
161        self.plugin_call_sender
162            .as_ref()
163            .ok_or_else(|| ShellError::PluginFailedToDecode {
164                msg: "Received a plugin call after Goodbye".into(),
165            })?
166            .send(plugin_call)
167            .map_err(|_| ShellError::NushellFailed {
168                msg: "Received a plugin call, but there's nowhere to send it".into(),
169            })
170    }
171
172    /// Flush any remaining subscriptions in the receiver into the map
173    fn receive_engine_call_subscriptions(&mut self) {
174        for (id, subscription) in self.engine_call_subscription_receiver.try_iter() {
175            if let btree_map::Entry::Vacant(e) = self.engine_call_subscriptions.entry(id) {
176                e.insert(subscription);
177            } else {
178                log::warn!("Duplicate engine call ID ignored: {id}")
179            }
180        }
181    }
182
183    /// Send a [`EngineCallResponse`] to the appropriate sender
184    fn send_engine_call_response(
185        &mut self,
186        id: EngineCallId,
187        response: EngineCallResponse<PipelineData>,
188    ) -> Result<(), ShellError> {
189        // Ensure all of the subscriptions have been flushed out of the receiver
190        self.receive_engine_call_subscriptions();
191        // Remove the sender - there is only one response per engine call
192        if let Some(sender) = self.engine_call_subscriptions.remove(&id) {
193            if sender.send(response).is_err() {
194                log::warn!("Received an engine call response for id={id}, but the caller hung up");
195            }
196            Ok(())
197        } else {
198            Err(ShellError::PluginFailedToDecode {
199                msg: format!("Unknown engine call ID: {id}"),
200            })
201        }
202    }
203
204    /// True if there are no other copies of the state (which would mean there are no interfaces
205    /// and no stream readers/writers)
206    pub(crate) fn is_finished(&self) -> bool {
207        Arc::strong_count(&self.state) < 2
208    }
209
210    /// Loop on input from the given reader as long as `is_finished()` is false
211    ///
212    /// Any errors will be propagated to all read streams automatically.
213    pub(crate) fn consume_all(
214        &mut self,
215        mut reader: impl PluginRead<PluginInput>,
216    ) -> Result<(), ShellError> {
217        while let Some(msg) = reader.read().transpose() {
218            if self.is_finished() {
219                break;
220            }
221
222            if let Err(err) = msg.and_then(|msg| self.consume(msg)) {
223                // Error to streams
224                let _ = self.stream_manager.broadcast_read_error(err.clone());
225                // Error to engine call waiters
226                self.receive_engine_call_subscriptions();
227                for sender in std::mem::take(&mut self.engine_call_subscriptions).into_values() {
228                    let _ = sender.send(EngineCallResponse::Error(err.clone()));
229                }
230                return Err(err);
231            }
232        }
233        Ok(())
234    }
235}
236
237impl InterfaceManager for EngineInterfaceManager {
238    type Interface = EngineInterface;
239    type Input = PluginInput;
240
241    fn get_interface(&self) -> Self::Interface {
242        EngineInterface {
243            state: self.state.clone(),
244            stream_manager_handle: self.stream_manager.get_handle(),
245            context: None,
246        }
247    }
248
    /// Handle a single message received from the engine.
    ///
    /// `Hello` must be the first message: anything else received before the protocol info has
    /// been set is rejected with [`ShellError::PluginFailedToLoad`]. After that, stream
    /// messages go to the stream manager, plugin calls are forwarded to the plugin call
    /// channel, engine call responses are routed to their subscriber, and signals update the
    /// mirrored signal state and run the registered handlers.
    fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> {
        log::trace!("from engine: {input:?}");
        match input {
            PluginInput::Hello(info) => {
                let info = Arc::new(info);
                // The info is stored before the compatibility check below runs
                self.protocol_info_mut.set(info.clone())?;

                let local_info = ProtocolInfo::default();
                if local_info.is_compatible_with(&info)? {
                    Ok(())
                } else {
                    Err(ShellError::PluginFailedToLoad {
                        msg: format!(
                            "Plugin is compiled for nushell version {}, \
                                which is not compatible with version {}",
                            local_info.version, info.version
                        ),
                    })
                }
            }
            // This guard has to come before every arm below: it catches any non-`Hello` message
            // that arrives while the protocol info is still unset.
            _ if !self.state.protocol_info.is_set() => {
                // Must send protocol info first
                Err(ShellError::PluginFailedToLoad {
                    msg: "Failed to receive initial Hello message. This engine might be too old"
                        .into(),
                })
            }
            // Stream messages
            PluginInput::Data(..)
            | PluginInput::End(..)
            | PluginInput::Drop(..)
            | PluginInput::Ack(..) => {
                self.consume_stream_message(input.try_into().map_err(|msg| {
                    ShellError::NushellFailed {
                        msg: format!("Failed to convert message {msg:?} to StreamMessage"),
                    }
                })?)
            }
            PluginInput::Call(id, call) => {
                // Every response to this call is written through an interface bound to its id
                let interface = self.interface_for_context(id);
                // Read streams in the input
                let call = match call
                    .map_data(|input| self.read_pipeline_data(input, &Signals::empty()))
                {
                    Ok(call) => call,
                    Err(err) => {
                        // If there's an error with initialization of the input stream, just send
                        // the error response rather than failing here
                        return interface.write_response(Err(err))?.write();
                    }
                };
                match call {
                    // Ask the plugin for metadata
                    PluginCall::Metadata => {
                        self.send_plugin_call(ReceivedPluginCall::Metadata { engine: interface })
                    }
                    // Ask the plugin for signatures
                    PluginCall::Signature => {
                        self.send_plugin_call(ReceivedPluginCall::Signature { engine: interface })
                    }
                    // Parse custom values and send a ReceivedPluginCall
                    PluginCall::Run(mut call_info) => {
                        // Deserialize custom values in the arguments
                        if let Err(err) = deserialize_call_args(&mut call_info.call) {
                            // Reported to the engine as the call's response; the manager itself
                            // only fails if writing that response fails
                            return interface.write_response(Err(err))?.write();
                        }
                        // Send the plugin call to the receiver
                        self.send_plugin_call(ReceivedPluginCall::Run {
                            engine: interface,
                            call: call_info,
                        })
                    }
                    // Send request with the custom value
                    PluginCall::CustomValueOp(custom_value, op) => {
                        self.send_plugin_call(ReceivedPluginCall::CustomValueOp {
                            engine: interface,
                            custom_value,
                            op,
                        })
                    }
                    // Send request for completion items
                    PluginCall::GetCompletion(info) => {
                        self.send_plugin_call(ReceivedPluginCall::GetCompletion {
                            engine: interface,
                            info,
                        })
                    }
                }
            }
            PluginInput::Goodbye => {
                // Remove the plugin call sender so it hangs up
                drop(self.plugin_call_sender.take());
                Ok(())
            }
            PluginInput::EngineCallResponse(id, response) => {
                let response = response
                    .map_data(|header| self.read_pipeline_data(header, &Signals::empty()))
                    .unwrap_or_else(|err| {
                        // If there's an error with initializing this stream, change it to an engine
                        // call error response, but send it anyway
                        EngineCallResponse::Error(err)
                    });
                self.send_engine_call_response(id, response)
            }
            PluginInput::Signal(action) => {
                // Update the mirrored signal state first, then run the registered handlers
                match action {
                    SignalAction::Interrupt => self.state.signals.trigger(),
                    SignalAction::Reset => self.state.signals.reset(),
                }
                self.state.signal_handlers.run(action);
                Ok(())
            }
        }
    }
362
    /// Borrow the [`StreamManager`] owned by this manager.
    fn stream_manager(&self) -> &StreamManager {
        &self.stream_manager
    }
366
367    fn prepare_pipeline_data(&self, mut data: PipelineData) -> Result<PipelineData, ShellError> {
368        // Deserialize custom values in the pipeline data
369        match data {
370            PipelineData::Value(ref mut value, _) => {
371                PluginCustomValue::deserialize_custom_values_in(value)?;
372                Ok(data)
373            }
374            PipelineData::ListStream(stream, meta) => {
375                let stream = stream.map(|mut value| {
376                    let span = value.span();
377                    PluginCustomValue::deserialize_custom_values_in(&mut value)
378                        .map(|()| value)
379                        .unwrap_or_else(|err| Value::error(err, span))
380                });
381                Ok(PipelineData::list_stream(stream, meta))
382            }
383            PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
384        }
385    }
386}
387
388/// Deserialize custom values in call arguments
389fn deserialize_call_args(call: &mut crate::EvaluatedCall) -> Result<(), ShellError> {
390    call.positional
391        .iter_mut()
392        .try_for_each(PluginCustomValue::deserialize_custom_values_in)?;
393    call.named
394        .iter_mut()
395        .flat_map(|(_, value)| value.as_mut())
396        .try_for_each(PluginCustomValue::deserialize_custom_values_in)
397}
398
/// A reference through which the nushell engine can be interacted with during execution.
///
/// Cloning is supported; a clone shares the same state and keeps the same call context.
#[derive(Debug, Clone)]
pub struct EngineInterface {
    /// Shared state with the manager
    state: Arc<EngineInterfaceState>,
    /// Handle to stream manager
    stream_manager_handle: StreamManagerHandle,
    /// The plugin call this interface belongs to. `None` if the interface was not created for
    /// a specific plugin call; methods that need a call context fail in that case.
    context: Option<PluginCallId>,
}
409
410impl EngineInterface {
411    /// Write the protocol info. This should be done after initialization
412    pub(crate) fn hello(&self) -> Result<(), ShellError> {
413        self.write(PluginOutput::Hello(ProtocolInfo::default()))?;
414        self.flush()
415    }
416
417    fn context(&self) -> Result<PluginCallId, ShellError> {
418        self.context.ok_or_else(|| ShellError::NushellFailed {
419            msg: "Tried to call an EngineInterface method that requires a call context \
420                outside of one"
421                .into(),
422        })
423    }
424
425    /// Write an OK call response or an error.
426    pub(crate) fn write_ok(
427        &self,
428        result: Result<(), impl Into<ShellError>>,
429    ) -> Result<(), ShellError> {
430        let response = match result {
431            Ok(()) => PluginCallResponse::Ok,
432            Err(err) => PluginCallResponse::Error(err.into()),
433        };
434        self.write(PluginOutput::CallResponse(self.context()?, response))?;
435        self.flush()
436    }
437
438    /// Write a call response of either [`PipelineData`] or an error. Returns the stream writer
439    /// to finish writing the stream
440    pub(crate) fn write_response(
441        &self,
442        result: Result<PipelineData, impl Into<ShellError>>,
443    ) -> Result<PipelineDataWriter<Self>, ShellError> {
444        match result {
445            Ok(data) => {
446                let (header, writer) = match self.init_write_pipeline_data(data, &()) {
447                    Ok(tup) => tup,
448                    // If we get an error while trying to construct the pipeline data, send that
449                    // instead
450                    Err(err) => return self.write_response(Err(err)),
451                };
452                // Write pipeline data header response, and the full stream
453                let response = PluginCallResponse::PipelineData(header);
454                self.write(PluginOutput::CallResponse(self.context()?, response))?;
455                self.flush()?;
456                Ok(writer)
457            }
458            Err(err) => {
459                let response = PluginCallResponse::Error(err.into());
460                self.write(PluginOutput::CallResponse(self.context()?, response))?;
461                self.flush()?;
462                Ok(Default::default())
463            }
464        }
465    }
466
467    /// Write a call response of plugin metadata.
468    pub(crate) fn write_metadata(&self, metadata: PluginMetadata) -> Result<(), ShellError> {
469        let response = PluginCallResponse::Metadata(metadata);
470        self.write(PluginOutput::CallResponse(self.context()?, response))?;
471        self.flush()
472    }
473
474    /// Write a call response of plugin signatures.
475    ///
476    /// Any custom values in the examples will be rendered using `to_base_value()`.
477    pub(crate) fn write_signature(
478        &self,
479        signature: Vec<PluginSignature>,
480    ) -> Result<(), ShellError> {
481        let response = PluginCallResponse::Signature(signature);
482        self.write(PluginOutput::CallResponse(self.context()?, response))?;
483        self.flush()
484    }
485
486    /// Write a call response of completion items.
487    pub(crate) fn write_completion_items(
488        &self,
489        items: Option<Vec<DynamicSuggestion>>,
490    ) -> Result<(), ShellError> {
491        let response = PluginCallResponse::CompletionItems(items);
492        self.write(PluginOutput::CallResponse(self.context()?, response))?;
493        self.flush()
494    }
495
496    /// Write an engine call message. Returns the writer for the stream, and the receiver for
497    /// the response to the engine call.
498    fn write_engine_call(
499        &self,
500        call: EngineCall<PipelineData>,
501    ) -> Result<
502        (
503            PipelineDataWriter<Self>,
504            mpsc::Receiver<EngineCallResponse<PipelineData>>,
505        ),
506        ShellError,
507    > {
508        let context = self.context()?;
509        let id = self.state.engine_call_id_sequence.next()?;
510        let (tx, rx) = mpsc::channel();
511
512        // Convert the call into one with a header and handle the stream, if necessary
513        let mut writer = None;
514
515        let call = call.map_data(|input| {
516            let (input_header, input_writer) = self.init_write_pipeline_data(input, &())?;
517            writer = Some(input_writer);
518            Ok(input_header)
519        })?;
520
521        // Register the channel
522        self.state
523            .engine_call_subscription_sender
524            .send((id, tx))
525            .map_err(|_| ShellError::NushellFailed {
526                msg: "EngineInterfaceManager hung up and is no longer accepting engine calls"
527                    .into(),
528            })?;
529
530        // Write request
531        self.write(PluginOutput::EngineCall { context, id, call })?;
532        self.flush()?;
533
534        Ok((writer.unwrap_or_default(), rx))
535    }
536
537    /// Perform an engine call. Input and output streams are handled.
538    fn engine_call(
539        &self,
540        call: EngineCall<PipelineData>,
541    ) -> Result<EngineCallResponse<PipelineData>, ShellError> {
542        let (writer, rx) = self.write_engine_call(call)?;
543
544        // Finish writing stream in the background
545        writer.write_background()?;
546
547        // Wait on receiver to get the response
548        rx.recv().map_err(|_| ShellError::NushellFailed {
549            msg: "Failed to get response to engine call because the channel was closed".into(),
550        })
551    }
552
553    /// Returns `true` if the plugin is communicating on stdio. When this is the case, stdin and
554    /// stdout should not be used by the plugin for other purposes.
555    ///
556    /// If the plugin can not be used without access to stdio, an error should be presented to the
557    /// user instead.
558    pub fn is_using_stdio(&self) -> bool {
559        self.state.writer.is_stdout()
560    }
561
562    /// Register a closure which will be called when the engine receives an interrupt signal.
563    /// Returns a RAII guard that will keep the closure alive until it is dropped.
564    pub fn register_signal_handler(&self, handler: Handler) -> Result<HandlerGuard, ShellError> {
565        self.state.signal_handlers.register(handler)
566    }
567
568    /// Get the full shell configuration from the engine. As this is quite a large object, it is
569    /// provided on request only.
570    ///
571    /// # Example
572    ///
573    /// Format a value in the user's preferred way:
574    ///
575    /// ```rust,no_run
576    /// # use nu_protocol::{Value, ShellError};
577    /// # use nu_plugin::EngineInterface;
578    /// # fn example(engine: &EngineInterface, value: &Value) -> Result<(), ShellError> {
579    /// let config = engine.get_config()?;
580    /// eprintln!("{}", value.to_expanded_string(", ", &config));
581    /// # Ok(())
582    /// # }
583    /// ```
584    pub fn get_config(&self) -> Result<Arc<Config>, ShellError> {
585        match self.engine_call(EngineCall::GetConfig)? {
586            EngineCallResponse::Config(config) => Ok(SharedCow::into_arc(config)),
587            EngineCallResponse::Error(err) => Err(err),
588            _ => Err(ShellError::PluginFailedToDecode {
589                msg: "Received unexpected response for EngineCall::GetConfig".into(),
590            }),
591        }
592    }
593
594    /// Do an engine call returning an `Option<Value>` as either `PipelineData::empty()` or
595    /// `PipelineData::value`
596    fn engine_call_option_value(
597        &self,
598        engine_call: EngineCall<PipelineData>,
599    ) -> Result<Option<Value>, ShellError> {
600        let name = engine_call.name();
601        match self.engine_call(engine_call)? {
602            EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(None),
603            EngineCallResponse::PipelineData(PipelineData::Value(value, _)) => Ok(Some(value)),
604            EngineCallResponse::Error(err) => Err(err),
605            _ => Err(ShellError::PluginFailedToDecode {
606                msg: format!("Received unexpected response for EngineCall::{name}"),
607            }),
608        }
609    }
610
611    /// Get the plugin-specific configuration from the engine. This lives in
612    /// `$env.config.plugins.NAME` for a plugin named `NAME`. If the config is set to a closure,
613    /// it is automatically evaluated each time.
614    ///
615    /// # Example
616    ///
617    /// Print this plugin's config:
618    ///
619    /// ```rust,no_run
620    /// # use nu_protocol::{Value, ShellError};
621    /// # use nu_plugin::EngineInterface;
622    /// # fn example(engine: &EngineInterface, value: &Value) -> Result<(), ShellError> {
623    /// let config = engine.get_plugin_config()?;
624    /// eprintln!("{:?}", config);
625    /// # Ok(())
626    /// # }
627    /// ```
628    pub fn get_plugin_config(&self) -> Result<Option<Value>, ShellError> {
629        self.engine_call_option_value(EngineCall::GetPluginConfig)
630    }
631
632    /// Get an environment variable from the engine.
633    ///
634    /// Returns `Some(value)` if present, and `None` if not found.
635    ///
636    /// # Example
637    ///
638    /// Get `$env.PATH`:
639    ///
640    /// ```rust,no_run
641    /// # use nu_protocol::{Value, ShellError};
642    /// # use nu_plugin::EngineInterface;
643    /// # fn example(engine: &EngineInterface) -> Result<Option<Value>, ShellError> {
644    /// engine.get_env_var("PATH") // => Ok(Some(Value::List([...])))
645    /// # }
646    /// ```
647    pub fn get_env_var(&self, name: impl Into<String>) -> Result<Option<Value>, ShellError> {
648        self.engine_call_option_value(EngineCall::GetEnvVar(name.into()))
649    }
650
651    /// Get the current working directory from the engine. The result is always an absolute path.
652    ///
653    /// # Example
654    /// ```rust,no_run
655    /// # use nu_protocol::{Value, ShellError};
656    /// # use nu_plugin::EngineInterface;
657    /// # fn example(engine: &EngineInterface) -> Result<String, ShellError> {
658    /// engine.get_current_dir() // => "/home/user"
659    /// # }
660    /// ```
661    pub fn get_current_dir(&self) -> Result<String, ShellError> {
662        match self.engine_call(EngineCall::GetCurrentDir)? {
663            // Always a string, and the span doesn't matter.
664            EngineCallResponse::PipelineData(PipelineData::Value(Value::String { val, .. }, _)) => {
665                Ok(val)
666            }
667            EngineCallResponse::Error(err) => Err(err),
668            _ => Err(ShellError::PluginFailedToDecode {
669                msg: "Received unexpected response for EngineCall::GetCurrentDir".into(),
670            }),
671        }
672    }
673
674    /// Get all environment variables from the engine.
675    ///
676    /// Since this is quite a large map that has to be sent, prefer to use
677    /// [`.get_env_var()`] (Self::get_env_var) if the variables needed are known ahead of time
678    /// and there are only a small number needed.
679    ///
680    /// # Example
681    /// ```rust,no_run
682    /// # use nu_protocol::{Value, ShellError};
683    /// # use nu_plugin::EngineInterface;
684    /// # use std::collections::HashMap;
685    /// # fn example(engine: &EngineInterface) -> Result<HashMap<String, Value>, ShellError> {
686    /// engine.get_env_vars() // => Ok({"PATH": Value::List([...]), ...})
687    /// # }
688    /// ```
689    pub fn get_env_vars(&self) -> Result<HashMap<String, Value>, ShellError> {
690        match self.engine_call(EngineCall::GetEnvVars)? {
691            EngineCallResponse::ValueMap(map) => Ok(map),
692            EngineCallResponse::Error(err) => Err(err),
693            _ => Err(ShellError::PluginFailedToDecode {
694                msg: "Received unexpected response type for EngineCall::GetEnvVars".into(),
695            }),
696        }
697    }
698
699    /// Set an environment variable in the caller's scope.
700    ///
701    /// If called after the plugin response has already been sent (i.e. during a stream), this will
702    /// only affect the environment for engine calls related to this plugin call, and will not be
703    /// propagated to the environment of the caller.
704    ///
705    /// # Example
706    /// ```rust,no_run
707    /// # use nu_protocol::{Value, ShellError};
708    /// # use nu_plugin::EngineInterface;
709    /// # fn example(engine: &EngineInterface) -> Result<(), ShellError> {
710    /// engine.add_env_var("FOO", Value::test_string("bar"))
711    /// # }
712    /// ```
713    pub fn add_env_var(&self, name: impl Into<String>, value: Value) -> Result<(), ShellError> {
714        match self.engine_call(EngineCall::AddEnvVar(name.into(), value))? {
715            EngineCallResponse::PipelineData(_) => Ok(()),
716            EngineCallResponse::Error(err) => Err(err),
717            _ => Err(ShellError::PluginFailedToDecode {
718                msg: "Received unexpected response type for EngineCall::AddEnvVar".into(),
719            }),
720        }
721    }
722
723    /// Get the help string for the current command.
724    ///
725    /// This returns the same string as passing `--help` would, and can be used for the top-level
726    /// command in a command group that doesn't do anything on its own (e.g. `query`).
727    ///
728    /// # Example
729    /// ```rust,no_run
730    /// # use nu_protocol::{Value, ShellError};
731    /// # use nu_plugin::EngineInterface;
732    /// # fn example(engine: &EngineInterface) -> Result<(), ShellError> {
733    /// eprintln!("{}", engine.get_help()?);
734    /// # Ok(())
735    /// # }
736    /// ```
737    pub fn get_help(&self) -> Result<String, ShellError> {
738        match self.engine_call(EngineCall::GetHelp)? {
739            EngineCallResponse::PipelineData(PipelineData::Value(Value::String { val, .. }, _)) => {
740                Ok(val)
741            }
742            _ => Err(ShellError::PluginFailedToDecode {
743                msg: "Received unexpected response type for EngineCall::GetHelp".into(),
744            }),
745        }
746    }
747
748    /// Returns a guard that will keep the plugin in the foreground as long as the guard is alive.
749    ///
750    /// Moving the plugin to the foreground is necessary for plugins that need to receive input and
751    /// signals directly from the terminal.
752    ///
753    /// The exact implementation is operating system-specific. On Unix, this ensures that the
754    /// plugin process becomes part of the process group controlling the terminal.
755    pub fn enter_foreground(&self) -> Result<ForegroundGuard, ShellError> {
756        match self.engine_call(EngineCall::EnterForeground)? {
757            EngineCallResponse::Error(error) => Err(error),
758            EngineCallResponse::PipelineData(PipelineData::Value(
759                Value::Int { val: pgrp, .. },
760                _,
761            )) => {
762                set_pgrp_from_enter_foreground(pgrp)?;
763                Ok(ForegroundGuard(Some(self.clone())))
764            }
765            EngineCallResponse::PipelineData(PipelineData::Empty) => {
766                Ok(ForegroundGuard(Some(self.clone())))
767            }
768            _ => Err(ShellError::PluginFailedToDecode {
769                msg: "Received unexpected response type for EngineCall::SetForeground".into(),
770            }),
771        }
772    }
773
774    /// Internal: for exiting the foreground after `enter_foreground()`. Called from the guard.
775    fn leave_foreground(&self) -> Result<(), ShellError> {
776        match self.engine_call(EngineCall::LeaveForeground)? {
777            EngineCallResponse::Error(error) => Err(error),
778            EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(()),
779            _ => Err(ShellError::PluginFailedToDecode {
780                msg: "Received unexpected response type for EngineCall::LeaveForeground".into(),
781            }),
782        }
783    }
784
785    /// Get the contents of a [`Span`] from the engine.
786    ///
787    /// This method returns `Vec<u8>` as it's possible for the matched span to not be a valid UTF-8
788    /// string, perhaps because it sliced through the middle of a UTF-8 byte sequence, as the
789    /// offsets are byte-indexed. Use [`String::from_utf8_lossy()`] for display if necessary.
790    pub fn get_span_contents(&self, span: Span) -> Result<Vec<u8>, ShellError> {
791        match self.engine_call(EngineCall::GetSpanContents(span))? {
792            EngineCallResponse::PipelineData(PipelineData::Value(Value::Binary { val, .. }, _)) => {
793                Ok(val)
794            }
795            _ => Err(ShellError::PluginFailedToDecode {
796                msg: "Received unexpected response type for EngineCall::GetSpanContents".into(),
797            }),
798        }
799    }
800
801    /// Ask the engine to evaluate a closure. Input to the closure is passed as a stream, and the
802    /// output is available as a stream.
803    ///
804    /// Set `redirect_stdout` to `true` to capture the standard output stream of an external
805    /// command, if the closure results in an external command.
806    ///
807    /// Set `redirect_stderr` to `true` to capture the standard error stream of an external command,
808    /// if the closure results in an external command.
809    ///
810    /// # Example
811    ///
812    /// Invoked as:
813    ///
814    /// ```nushell
815    /// my_command { seq 1 $in | each { |n| $"Hello, ($n)" } }
816    /// ```
817    ///
818    /// ```rust,no_run
819    /// # use nu_protocol::{Value, ShellError, PipelineData};
820    /// # use nu_plugin::{EngineInterface, EvaluatedCall};
821    /// # fn example(engine: &EngineInterface, call: &EvaluatedCall) -> Result<(), ShellError> {
822    /// let closure = call.req(0)?;
823    /// let input = PipelineData::value(Value::int(4, call.head), None);
824    /// let output = engine.eval_closure_with_stream(
825    ///     &closure,
826    ///     vec![],
827    ///     input,
828    ///     true,
829    ///     false,
830    /// )?;
831    /// for value in output {
832    ///     eprintln!("Closure says: {}", value.as_str()?);
833    /// }
834    /// # Ok(())
835    /// # }
836    /// ```
837    ///
838    /// Output:
839    ///
840    /// ```text
841    /// Closure says: Hello, 1
842    /// Closure says: Hello, 2
843    /// Closure says: Hello, 3
844    /// Closure says: Hello, 4
845    /// ```
846    pub fn eval_closure_with_stream(
847        &self,
848        closure: &Spanned<Closure>,
849        mut positional: Vec<Value>,
850        input: PipelineData,
851        redirect_stdout: bool,
852        redirect_stderr: bool,
853    ) -> Result<PipelineData, ShellError> {
854        // Ensure closure args have custom values serialized
855        positional
856            .iter_mut()
857            .try_for_each(PluginCustomValue::serialize_custom_values_in)?;
858
859        let call = EngineCall::EvalClosure {
860            closure: closure.clone(),
861            positional,
862            input,
863            redirect_stdout,
864            redirect_stderr,
865        };
866
867        match self.engine_call(call)? {
868            EngineCallResponse::Error(error) => Err(error),
869            EngineCallResponse::PipelineData(data) => Ok(data),
870            _ => Err(ShellError::PluginFailedToDecode {
871                msg: "Received unexpected response type for EngineCall::EvalClosure".into(),
872            }),
873        }
874    }
875
876    /// Ask the engine to evaluate a closure. Input is optionally passed as a [`Value`], and output
877    /// of the closure is collected to a [`Value`] even if it is a stream.
878    ///
879    /// If the closure results in an external command, the return value will be a collected string
880    /// or binary value of the standard output stream of that command, similar to calling
881    /// [`eval_closure_with_stream()`](Self::eval_closure_with_stream) with `redirect_stdout` =
882    /// `true` and `redirect_stderr` = `false`.
883    ///
884    /// Use [`eval_closure_with_stream()`](Self::eval_closure_with_stream) if more control over the
885    /// input and output is desired.
886    ///
887    /// # Example
888    ///
889    /// Invoked as:
890    ///
891    /// ```nushell
892    /// my_command { |number| $number + 1}
893    /// ```
894    ///
895    /// ```rust,no_run
896    /// # use nu_protocol::{Value, ShellError};
897    /// # use nu_plugin::{EngineInterface, EvaluatedCall};
898    /// # fn example(engine: &EngineInterface, call: &EvaluatedCall) -> Result<(), ShellError> {
899    /// let closure = call.req(0)?;
900    /// for n in 0..4 {
901    ///     let result = engine.eval_closure(&closure, vec![Value::int(n, call.head)], None)?;
902    ///     eprintln!("{} => {}", n, result.as_int()?);
903    /// }
904    /// # Ok(())
905    /// # }
906    /// ```
907    ///
908    /// Output:
909    ///
910    /// ```text
911    /// 0 => 1
912    /// 1 => 2
913    /// 2 => 3
914    /// 3 => 4
915    /// ```
916    pub fn eval_closure(
917        &self,
918        closure: &Spanned<Closure>,
919        positional: Vec<Value>,
920        input: Option<Value>,
921    ) -> Result<Value, ShellError> {
922        let input = input.map_or_else(PipelineData::empty, |v| PipelineData::value(v, None));
923        let output = self.eval_closure_with_stream(closure, positional, input, true, false)?;
924        // Unwrap an error value
925        match output.into_value(closure.span)? {
926            Value::Error { error, .. } => Err(*error),
927            value => Ok(value),
928        }
929    }
930
931    /// Ask the engine for the identifier for a declaration. If found, the result can then be passed
932    /// to [`.call_decl()`](Self::call_decl) to call other internal commands.
933    ///
934    /// See [`.call_decl()`](Self::call_decl) for an example.
935    pub fn find_decl(&self, name: impl Into<String>) -> Result<Option<DeclId>, ShellError> {
936        let call = EngineCall::FindDecl(name.into());
937
938        match self.engine_call(call)? {
939            EngineCallResponse::Error(err) => Err(err),
940            EngineCallResponse::Identifier(id) => Ok(Some(id)),
941            EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(None),
942            _ => Err(ShellError::PluginFailedToDecode {
943                msg: "Received unexpected response type for EngineCall::FindDecl".into(),
944            }),
945        }
946    }
947
948    /// Get the compiled IR for a block.
949    ///
950    /// # Example
951    ///
952    /// ```rust,no_run
953    /// # use nu_protocol::{ShellError, Spanned};
954    /// # use nu_protocol::engine::Closure;
955    /// # use nu_plugin::EngineInterface;
956    /// # fn example(engine: &EngineInterface, closure: &Spanned<Closure>) -> Result<(), ShellError> {
957    /// let ir_block = engine.get_block_ir(closure.item.block_id)?;
958    /// for instruction in &ir_block.instructions {
959    ///     // ...
960    /// }
961    /// # Ok(())
962    /// # }
963    /// ```
964    pub fn get_block_ir(&self, block_id: BlockId) -> Result<IrBlock, ShellError> {
965        let call = EngineCall::GetBlockIR(block_id);
966
967        match self.engine_call(call)? {
968            EngineCallResponse::Error(err) => Err(err),
969            EngineCallResponse::IrBlock(ir) => Ok(*ir),
970            _ => Err(ShellError::PluginFailedToDecode {
971                msg: "Received unexpected response type for EngineCall::GetBlockIR".into(),
972            }),
973        }
974    }
975
976    /// Ask the engine to call an internal command, using the declaration ID previously looked up
977    /// with [`.find_decl()`](Self::find_decl).
978    ///
979    /// # Example
980    ///
981    /// ```rust,no_run
982    /// # use nu_protocol::{Value, ShellError, PipelineData};
983    /// # use nu_plugin::{EngineInterface, EvaluatedCall};
984    /// # fn example(engine: &EngineInterface, call: &EvaluatedCall) -> Result<Value, ShellError> {
985    /// if let Some(decl_id) = engine.find_decl("scope commands")? {
986    ///     let commands = engine.call_decl(
987    ///         decl_id,
988    ///         EvaluatedCall::new(call.head),
989    ///         PipelineData::empty(),
990    ///         true,
991    ///         false,
992    ///     )?;
993    ///     commands.into_value(call.head)
994    /// } else {
995    ///     Ok(Value::list(vec![], call.head))
996    /// }
997    /// # }
998    /// ```
999    pub fn call_decl(
1000        &self,
1001        decl_id: DeclId,
1002        call: EvaluatedCall,
1003        input: PipelineData,
1004        redirect_stdout: bool,
1005        redirect_stderr: bool,
1006    ) -> Result<PipelineData, ShellError> {
1007        let call = EngineCall::CallDecl {
1008            decl_id,
1009            call,
1010            input,
1011            redirect_stdout,
1012            redirect_stderr,
1013        };
1014
1015        match self.engine_call(call)? {
1016            EngineCallResponse::Error(err) => Err(err),
1017            EngineCallResponse::PipelineData(data) => Ok(data),
1018            _ => Err(ShellError::PluginFailedToDecode {
1019                msg: "Received unexpected response type for EngineCall::CallDecl".into(),
1020            }),
1021        }
1022    }
1023
1024    /// Tell the engine whether to disable garbage collection for this plugin.
1025    ///
1026    /// The garbage collector is enabled by default, but plugins can turn it off (ideally
1027    /// temporarily) as necessary to implement functionality that requires the plugin to stay
1028    /// running for longer than the engine can automatically determine.
1029    ///
1030    /// The user can still stop the plugin if they want to with the `plugin stop` command.
1031    pub fn set_gc_disabled(&self, disabled: bool) -> Result<(), ShellError> {
1032        self.write(PluginOutput::Option(PluginOption::GcDisabled(disabled)))?;
1033        self.flush()
1034    }
1035
1036    /// Write a call response of [`Ordering`], for `partial_cmp`.
1037    pub(crate) fn write_ordering(
1038        &self,
1039        ordering: Option<impl Into<Ordering>>,
1040    ) -> Result<(), ShellError> {
1041        let response = PluginCallResponse::Ordering(ordering.map(|o| o.into()));
1042        self.write(PluginOutput::CallResponse(self.context()?, response))?;
1043        self.flush()
1044    }
1045
1046    pub fn signals(&self) -> &Signals {
1047        &self.state.signals
1048    }
1049}
1050
1051impl Interface for EngineInterface {
1052    type Output = PluginOutput;
1053    type DataContext = ();
1054
1055    fn write(&self, output: PluginOutput) -> Result<(), ShellError> {
1056        log::trace!("to engine: {output:?}");
1057        self.state.writer.write(&output)
1058    }
1059
1060    fn flush(&self) -> Result<(), ShellError> {
1061        self.state.writer.flush()
1062    }
1063
1064    fn stream_id_sequence(&self) -> &Sequence {
1065        &self.state.stream_id_sequence
1066    }
1067
1068    fn stream_manager_handle(&self) -> &StreamManagerHandle {
1069        &self.stream_manager_handle
1070    }
1071
1072    fn prepare_pipeline_data(
1073        &self,
1074        mut data: PipelineData,
1075        _context: &(),
1076    ) -> Result<PipelineData, ShellError> {
1077        // Serialize custom values in the pipeline data
1078        match data {
1079            PipelineData::Value(ref mut value, _) => {
1080                PluginCustomValue::serialize_custom_values_in(value)?;
1081                Ok(data)
1082            }
1083            PipelineData::ListStream(stream, meta) => {
1084                let stream = stream.map(|mut value| {
1085                    let span = value.span();
1086                    PluginCustomValue::serialize_custom_values_in(&mut value)
1087                        .map(|_| value)
1088                        .unwrap_or_else(|err| Value::error(err, span))
1089                });
1090                Ok(PipelineData::list_stream(stream, meta))
1091            }
1092            PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
1093        }
1094    }
1095}
1096
1097/// Keeps the plugin in the foreground as long as it is alive.
1098///
1099/// Use [`.leave()`](Self::leave) to leave the foreground without ignoring the error.
1100pub struct ForegroundGuard(Option<EngineInterface>);
1101
1102impl ForegroundGuard {
1103    // Should be called only once
1104    fn leave_internal(&mut self) -> Result<(), ShellError> {
1105        if let Some(interface) = self.0.take() {
1106            // On Unix, we need to put ourselves back in our own process group
1107            #[cfg(unix)]
1108            {
1109                use nix::unistd::{Pid, setpgid};
1110                // This should always succeed, frankly, but handle the error just in case
1111                setpgid(Pid::from_raw(0), Pid::from_raw(0)).map_err(|err| {
1112                    nu_protocol::shell_error::io::IoError::new_internal(
1113                        std::io::Error::from(err),
1114                        "Could not set pgid",
1115                        nu_protocol::location!(),
1116                    )
1117                })?;
1118            }
1119            interface.leave_foreground()?;
1120        }
1121        Ok(())
1122    }
1123
1124    /// Leave the foreground. In contrast to dropping the guard, this preserves the error (if any).
1125    pub fn leave(mut self) -> Result<(), ShellError> {
1126        let result = self.leave_internal();
1127        std::mem::forget(self);
1128        result
1129    }
1130}
1131
1132impl Drop for ForegroundGuard {
1133    fn drop(&mut self) {
1134        let _ = self.leave_internal();
1135    }
1136}
1137
1138#[cfg(unix)]
1139fn set_pgrp_from_enter_foreground(pgrp: i64) -> Result<(), ShellError> {
1140    use nix::unistd::{Pid, setpgid};
1141    if let Ok(pgrp) = pgrp.try_into() {
1142        setpgid(Pid::from_raw(0), Pid::from_raw(pgrp)).map_err(|err| ShellError::GenericError {
1143            error: "Failed to set process group for foreground".into(),
1144            msg: "".into(),
1145            span: None,
1146            help: Some(err.to_string()),
1147            inner: vec![],
1148        })
1149    } else {
1150        Err(ShellError::NushellFailed {
1151            msg: "Engine returned an invalid process group ID".into(),
1152        })
1153    }
1154}
1155
/// Non-Unix: joining a process group is not supported, so this always returns
/// [`ShellError::NushellFailed`].
#[cfg(not(unix))]
fn set_pgrp_from_enter_foreground(_pgrp: i64) -> Result<(), ShellError> {
    let msg = String::from(
        "EnterForeground asked plugin to join process group, but this is not supported on non UNIX platforms.",
    );
    Err(ShellError::NushellFailed { msg })
}