Skip to main content

nu_plugin_engine/interface/
mod.rs

1//! Interface used by the engine to communicate with the plugin.
2
3use nu_plugin_core::{
4    Interface, InterfaceManager, PipelineDataWriter, PluginRead, PluginWrite, StreamManager,
5    StreamManagerHandle,
6    util::{Waitable, WaitableMut, with_custom_values_in},
7};
8use nu_plugin_protocol::{
9    CallInfo, CustomValueOp, EngineCall, EngineCallId, EngineCallResponse, EvaluatedCall,
10    GetCompletionInfo, Ordering, PluginCall, PluginCallId, PluginCallResponse, PluginCustomValue,
11    PluginInput, PluginOption, PluginOutput, ProtocolInfo, StreamId, StreamMessage,
12};
13use nu_protocol::{
14    CustomValue, DynamicSuggestion, IntoSpanned, PipelineData, PluginMetadata, PluginSignature,
15    ShellError, SignalAction, Signals, Span, Spanned, Value, ast::Operator, casing::Casing,
16    engine::Sequence, shell_error::generic::GenericError,
17};
18use nu_utils::SharedCow;
19use std::{
20    collections::{BTreeMap, btree_map},
21    path::Path,
22    sync::{Arc, OnceLock, mpsc},
23};
24
25use crate::{
26    PluginCustomValueWithSource, PluginExecutionContext, PluginGc, PluginSource,
27    process::PluginProcess,
28};
29
30#[cfg(test)]
31mod tests;
32
/// Messages the manager sends over the channel registered for a plugin call.
#[derive(Debug)]
enum ReceivedPluginCallMessage {
    /// The final response to send
    Response(PluginCallResponse<PipelineData>),

    /// A critical error with the interface
    Error(ShellError),

    /// An engine call that should be evaluated and responded to, but is not the final response
    ///
    /// We send this back to the thread that made the plugin call so we don't block the reader
    /// thread
    EngineCall(EngineCallId, EngineCall<PipelineData>),
}
47
/// Context for plugin call execution
///
/// Newtype around a boxed [`PluginExecutionContext`] trait object.
pub(crate) struct Context(Box<dyn PluginExecutionContext>);
50
51impl std::fmt::Debug for Context {
52    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        f.write_str("Context")
54    }
55}
56
57impl std::ops::Deref for Context {
58    type Target = dyn PluginExecutionContext;
59
60    fn deref(&self) -> &Self::Target {
61        &*self.0
62    }
63}
64
/// Internal shared state between the manager and each interface.
///
/// The manager creates one instance and shares it through an `Arc` with every interface it
/// hands out.
struct PluginInterfaceState {
    /// The source to be used for custom values coming from / going to the plugin
    source: Arc<PluginSource>,
    /// The plugin process being managed (only present when the manager was given a PID)
    process: Option<PluginProcess>,
    /// Protocol version info, set after `Hello` received
    protocol_info: Waitable<Arc<ProtocolInfo>>,
    /// Sequence for generating plugin call ids
    plugin_call_id_sequence: Sequence,
    /// Sequence for generating stream ids
    stream_id_sequence: Sequence,
    /// Sender to subscribe to a plugin call response
    plugin_call_subscription_sender: mpsc::Sender<(PluginCallId, PluginCallState)>,
    /// An error that should be propagated to further plugin calls
    ///
    /// The manager stores the first error it hits while consuming plugin output here.
    error: OnceLock<ShellError>,
    /// The synchronized output writer
    writer: Box<dyn PluginWrite<PluginInput>>,
}
84
85impl std::fmt::Debug for PluginInterfaceState {
86    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87        f.debug_struct("PluginInterfaceState")
88            .field("source", &self.source)
89            .field("protocol_info", &self.protocol_info)
90            .field("plugin_call_id_sequence", &self.plugin_call_id_sequence)
91            .field("stream_id_sequence", &self.stream_id_sequence)
92            .field(
93                "plugin_call_subscription_sender",
94                &self.plugin_call_subscription_sender,
95            )
96            .field("error", &self.error)
97            .finish_non_exhaustive()
98    }
99}
100
/// State that the manager keeps for each plugin call during its lifetime.
///
/// Dropping the state drains `keep_plugin_custom_values`, releasing every custom value that was
/// kept alive for the call.
#[derive(Debug)]
struct PluginCallState {
    /// The sender back to the thread that is waiting for the plugin call response
    ///
    /// Taken out when the final response is delivered; a fallback handler thread may install a
    /// new one afterwards to serve late engine calls.
    sender: Option<mpsc::Sender<ReceivedPluginCallMessage>>,
    /// Don't try to send the plugin call response. This is only used for `Dropped` to avoid an
    /// error
    dont_send_response: bool,
    /// Signals to be used for stream iterators
    signals: Signals,
    /// Channel to receive context on to be used if needed
    ///
    /// Taken by the fallback engine call handler, which can therefore only be spawned once.
    context_rx: Option<mpsc::Receiver<Context>>,
    /// Span associated with the call, if any
    span: Option<Span>,
    /// Channel for plugin custom values that should be kept alive for the duration of the plugin
    /// call. The plugin custom values on this channel are never read, we just hold on to it to keep
    /// them in memory so they can be dropped at the end of the call. We hold the sender as well so
    /// we can generate the CurrentCallState.
    keep_plugin_custom_values: (
        mpsc::Sender<PluginCustomValueWithSource>,
        mpsc::Receiver<PluginCustomValueWithSource>,
    ),
    /// Number of streams that still need to be read from the plugin call response
    ///
    /// Incremented when a response stream starts and decremented when it ends; the state is
    /// removed once the response has been sent and this is zero or less.
    remaining_streams_to_read: i32,
}
126
127impl Drop for PluginCallState {
128    fn drop(&mut self) {
129        // Clear the keep custom values channel, so drop notifications can be sent
130        for value in self.keep_plugin_custom_values.1.try_iter() {
131            log::trace!("Dropping custom value that was kept: {value:?}");
132            drop(value);
133        }
134    }
135}
136
/// Manages reading and dispatching messages for [`PluginInterface`]s.
///
/// The manager owns the receiving side: it consumes messages coming from the plugin and routes
/// them to the stream manager or to the per-call channels registered by interfaces.
#[derive(Debug)]
pub struct PluginInterfaceManager {
    /// Shared state
    state: Arc<PluginInterfaceState>,
    /// The writer for protocol info
    protocol_info_mut: WaitableMut<Arc<ProtocolInfo>>,
    /// Manages stream messages and state
    stream_manager: StreamManager,
    /// State related to plugin calls
    plugin_call_states: BTreeMap<PluginCallId, PluginCallState>,
    /// Receiver for plugin call subscriptions
    ///
    /// Drained lazily into `plugin_call_states` whenever call state is about to be looked up.
    plugin_call_subscription_receiver: mpsc::Receiver<(PluginCallId, PluginCallState)>,
    /// Tracker for which plugin call streams being read belong to
    ///
    /// This is necessary so we know when we can remove context for plugin calls
    plugin_call_input_streams: BTreeMap<StreamId, PluginCallId>,
    /// Garbage collector handle, to notify about the state of the plugin
    gc: Option<PluginGc>,
}
157
158impl PluginInterfaceManager {
159    pub fn new(
160        source: Arc<PluginSource>,
161        pid: Option<u32>,
162        writer: impl PluginWrite<PluginInput> + 'static,
163    ) -> PluginInterfaceManager {
164        let (subscription_tx, subscription_rx) = mpsc::channel();
165        let protocol_info_mut = WaitableMut::new();
166
167        PluginInterfaceManager {
168            state: Arc::new(PluginInterfaceState {
169                source,
170                process: pid.map(PluginProcess::new),
171                protocol_info: protocol_info_mut.reader(),
172                plugin_call_id_sequence: Sequence::default(),
173                stream_id_sequence: Sequence::default(),
174                plugin_call_subscription_sender: subscription_tx,
175                error: OnceLock::new(),
176                writer: Box::new(writer),
177            }),
178            protocol_info_mut,
179            stream_manager: StreamManager::new(),
180            plugin_call_states: BTreeMap::new(),
181            plugin_call_subscription_receiver: subscription_rx,
182            plugin_call_input_streams: BTreeMap::new(),
183            gc: None,
184        }
185    }
186
    /// Add a garbage collector to this plugin. The manager will notify the garbage collector about
    /// the state of the plugin so that it can be automatically cleaned up if the plugin is
    /// inactive.
    ///
    /// Any previously set collector handle is replaced; passing `None` detaches it.
    pub fn set_garbage_collector(&mut self, gc: Option<PluginGc>) {
        self.gc = gc;
    }
193
194    /// Consume pending messages in the `plugin_call_subscription_receiver`
195    fn receive_plugin_call_subscriptions(&mut self) {
196        while let Ok((id, state)) = self.plugin_call_subscription_receiver.try_recv() {
197            if let btree_map::Entry::Vacant(e) = self.plugin_call_states.entry(id) {
198                e.insert(state);
199            } else {
200                log::warn!("Duplicate plugin call ID ignored: {id}");
201            }
202        }
203    }
204
205    /// Track the start of incoming stream(s)
206    fn recv_stream_started(&mut self, call_id: PluginCallId, stream_id: StreamId) {
207        self.plugin_call_input_streams.insert(stream_id, call_id);
208        // Increment the number of streams on the subscription so context stays alive
209        self.receive_plugin_call_subscriptions();
210        if let Some(state) = self.plugin_call_states.get_mut(&call_id) {
211            state.remaining_streams_to_read += 1;
212        }
213        // Add a lock to the garbage collector for each stream
214        if let Some(ref gc) = self.gc {
215            gc.increment_locks(1);
216        }
217    }
218
219    /// Track the end of an incoming stream
220    fn recv_stream_ended(&mut self, stream_id: StreamId) {
221        if let Some(call_id) = self.plugin_call_input_streams.remove(&stream_id) {
222            if let btree_map::Entry::Occupied(mut e) = self.plugin_call_states.entry(call_id) {
223                e.get_mut().remaining_streams_to_read -= 1;
224                // Remove the subscription if there are no more streams to be read.
225                if e.get().remaining_streams_to_read <= 0 {
226                    e.remove();
227                }
228            }
229            // Streams read from the plugin are tracked with locks on the GC so plugins don't get
230            // stopped if they have active streams
231            if let Some(ref gc) = self.gc {
232                gc.decrement_locks(1);
233            }
234        }
235    }
236
237    /// Find the [`Signals`] struct corresponding to the given plugin call id
238    fn get_signals(&mut self, id: PluginCallId) -> Result<Signals, ShellError> {
239        // Make sure we're up to date
240        self.receive_plugin_call_subscriptions();
241        // Find the subscription and return the context
242        self.plugin_call_states
243            .get(&id)
244            .map(|state| state.signals.clone())
245            .ok_or_else(|| ShellError::PluginFailedToDecode {
246                msg: format!("Unknown plugin call ID: {id}"),
247            })
248    }
249
250    /// Send a [`PluginCallResponse`] to the appropriate sender
251    fn send_plugin_call_response(
252        &mut self,
253        id: PluginCallId,
254        response: PluginCallResponse<PipelineData>,
255    ) -> Result<(), ShellError> {
256        // Ensure we're caught up on the subscriptions made
257        self.receive_plugin_call_subscriptions();
258
259        if let btree_map::Entry::Occupied(mut e) = self.plugin_call_states.entry(id) {
260            // Remove the subscription sender, since this will be the last message.
261            //
262            // We can spawn a new one if we need it for engine calls.
263            if !e.get().dont_send_response
264                && e.get_mut()
265                    .sender
266                    .take()
267                    .and_then(|s| s.send(ReceivedPluginCallMessage::Response(response)).ok())
268                    .is_none()
269            {
270                log::warn!("Received a plugin call response for id={id}, but the caller hung up");
271            }
272            // If there are no registered streams, just remove it
273            if e.get().remaining_streams_to_read <= 0 {
274                e.remove();
275            }
276            Ok(())
277        } else {
278            Err(ShellError::PluginFailedToDecode {
279                msg: format!("Unknown plugin call ID: {id}"),
280            })
281        }
282    }
283
    /// Spawn a handler for engine calls for a plugin, in case we need to handle engine calls
    /// after the response has already been received (in which case we have nowhere to send them)
    ///
    /// On success the newly created sender is stored in the call state and a reference to it is
    /// returned.
    ///
    /// # Errors
    ///
    /// Returns [`ShellError::NushellFailed`] if there is no state for `id`, if the call state
    /// still has a sender (the response has not been received yet), or if the context receiver
    /// was already taken by an earlier handler.
    fn spawn_engine_call_handler(
        &mut self,
        id: PluginCallId,
    ) -> Result<&mpsc::Sender<ReceivedPluginCallMessage>, ShellError> {
        // Obtained up front, before the call state is mutably borrowed below
        let interface = self.get_interface();

        if let Some(state) = self.plugin_call_states.get_mut(&id) {
            if state.sender.is_none() {
                let (tx, rx) = mpsc::channel();
                // Taking the receiver out of the state is what limits this to a single handler
                let context_rx =
                    state
                        .context_rx
                        .take()
                        .ok_or_else(|| ShellError::NushellFailed {
                            msg: "Tried to spawn the fallback engine call handler more than once"
                                .into(),
                        })?;

                // Generate the state needed to handle engine calls
                let mut current_call_state = CurrentCallState {
                    context_tx: None,
                    keep_plugin_custom_values_tx: Some(state.keep_plugin_custom_values.0.clone()),
                    entered_foreground: false,
                    span: state.span,
                };

                let handler = move || {
                    // We receive on the thread so that we don't block the reader thread
                    let mut context = context_rx
                        .recv()
                        .ok() // The plugin call won't send context if it's not required.
                        .map(|c| c.0);

                    // Runs until every sender for `rx` is gone or an engine call fails
                    for msg in rx {
                        // This thread only handles engine calls.
                        match msg {
                            ReceivedPluginCallMessage::EngineCall(engine_call_id, engine_call) => {
                                if let Err(err) = interface.handle_engine_call(
                                    engine_call_id,
                                    engine_call,
                                    &mut current_call_state,
                                    context.as_deref_mut(),
                                ) {
                                    log::warn!(
                                        "Error in plugin post-response engine call handler: \
                                        {err:?}"
                                    );
                                    return;
                                }
                            }
                            other => log::warn!(
                                "Bad message received in plugin post-response \
                                engine call handler: {other:?}"
                            ),
                        }
                    }
                };
                std::thread::Builder::new()
                    .name("plugin engine call handler".into())
                    .spawn(handler)
                    .expect("failed to spawn thread");
                // Install the sender so later engine calls for this call reach the new thread
                state.sender = Some(tx);
                Ok(state.sender.as_ref().unwrap_or_else(|| unreachable!()))
            } else {
                Err(ShellError::NushellFailed {
                    msg: "Tried to spawn the fallback engine call handler before the plugin call \
                        response had been received"
                        .into(),
                })
            }
        } else {
            Err(ShellError::NushellFailed {
                msg: format!("Couldn't find plugin ID={id} in subscriptions"),
            })
        }
    }
362
    /// Send an [`EngineCall`] to the appropriate sender
    ///
    /// The call is forwarded to the sender registered for `plugin_call_id`. If that call no
    /// longer has a sender, a fallback handler thread is spawned and the call is sent there
    /// instead. If sending fails because the receiving side is gone, an error response for
    /// `engine_call_id` is written straight to the plugin.
    ///
    /// # Errors
    ///
    /// Returns [`ShellError::PluginFailedToDecode`] if `plugin_call_id` is unknown, and
    /// propagates errors from spawning the fallback handler or from writing the error response.
    fn send_engine_call(
        &mut self,
        plugin_call_id: PluginCallId,
        engine_call_id: EngineCallId,
        call: EngineCall<PipelineData>,
    ) -> Result<(), ShellError> {
        // Ensure we're caught up on the subscriptions made
        self.receive_plugin_call_subscriptions();

        // Don't remove the sender, as there could be more calls or responses
        if let Some(subscription) = self.plugin_call_states.get(&plugin_call_id) {
            let msg = ReceivedPluginCallMessage::EngineCall(engine_call_id, call);
            // Call if there's an error sending the engine call
            let send_error = |this: &Self| {
                log::warn!(
                    "Received an engine call for plugin_call_id={plugin_call_id}, \
                    but the caller hung up"
                );
                // We really have no choice here but to send the response ourselves and hope we
                // don't block
                this.state.writer.write(&PluginInput::EngineCallResponse(
                    engine_call_id,
                    EngineCallResponse::Error(ShellError::Generic(GenericError::new_internal(
                        "Caller hung up",
                        "Can't make engine call because the original caller hung up",
                    ))),
                ))?;
                this.state.writer.flush()
            };
            // Try to send to the sender if it exists
            if let Some(sender) = subscription.sender.as_ref() {
                sender.send(msg).or_else(|_| send_error(self))
            } else {
                // The sender no longer exists. Spawn a specific one just for engine calls
                let sender = self.spawn_engine_call_handler(plugin_call_id)?;
                sender.send(msg).or_else(|_| send_error(self))
            }
        } else {
            Err(ShellError::PluginFailedToDecode {
                msg: format!("Unknown plugin call ID: {plugin_call_id}"),
            })
        }
    }
407
408    /// True if there are no other copies of the state (which would mean there are no interfaces
409    /// and no stream readers/writers)
410    pub fn is_finished(&self) -> bool {
411        Arc::strong_count(&self.state) < 2
412    }
413
414    /// Loop on input from the given reader as long as `is_finished()` is false
415    ///
416    /// Any errors will be propagated to all read streams automatically.
417    pub fn consume_all(
418        &mut self,
419        mut reader: impl PluginRead<PluginOutput>,
420    ) -> Result<(), ShellError> {
421        let mut result = Ok(());
422
423        while let Some(msg) = reader.read().transpose() {
424            if self.is_finished() {
425                break;
426            }
427
428            // We assume an error here is unrecoverable (at least, without restarting the plugin)
429            if let Err(err) = msg.and_then(|msg| self.consume(msg)) {
430                // Put the error in the state so that new calls see it
431                let _ = self.state.error.set(err.clone());
432                // Error to streams
433                let _ = self.stream_manager.broadcast_read_error(err.clone());
434                // Error to call waiters
435                self.receive_plugin_call_subscriptions();
436                for subscription in std::mem::take(&mut self.plugin_call_states).into_values() {
437                    let _ = subscription
438                        .sender
439                        .as_ref()
440                        .map(|s| s.send(ReceivedPluginCallMessage::Error(err.clone())));
441                }
442                result = Err(err);
443                break;
444            }
445        }
446
447        // Tell the GC we are exiting so that the plugin doesn't get stuck open
448        if let Some(ref gc) = self.gc {
449            gc.exited();
450        }
451        result
452    }
453}
454
455impl InterfaceManager for PluginInterfaceManager {
456    type Interface = PluginInterface;
457    type Input = PluginOutput;
458
459    fn get_interface(&self) -> Self::Interface {
460        PluginInterface {
461            state: self.state.clone(),
462            stream_manager_handle: self.stream_manager.get_handle(),
463            gc: self.gc.clone(),
464        }
465    }
466
    /// Handle a single message received from the plugin.
    ///
    /// `Hello` records the protocol info and checks compatibility; any other message arriving
    /// before `Hello` is rejected. Stream messages go to the stream manager, options are applied
    /// to the garbage collector, call responses are delivered to the waiting caller, and engine
    /// calls are forwarded to the thread responsible for the originating plugin call.
    fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> {
        log::trace!("from plugin: {input:?}");

        match input {
            PluginOutput::Hello(info) => {
                let info = Arc::new(info);
                // The info is stored before the compatibility check, so it is available to
                // readers even when the check fails
                self.protocol_info_mut.set(info.clone())?;

                let local_info = ProtocolInfo::default();
                if local_info.is_compatible_with(&info)? {
                    Ok(())
                } else {
                    Err(ShellError::PluginFailedToLoad {
                        msg: format!(
                            "Plugin `{}` is compiled for nushell version {}, \
                                which is not compatible with version {}",
                            self.state.source.name(),
                            info.version,
                            local_info.version,
                        ),
                    })
                }
            }
            // This guard has to come before every other arm so that nothing is processed until
            // `Hello` has been seen
            _ if !self.state.protocol_info.is_set() => {
                // Must send protocol info first
                Err(ShellError::PluginFailedToLoad {
                    msg: format!(
                        "Failed to receive initial Hello message from `{}`. \
                            This plugin might be too old",
                        self.state.source.name()
                    ),
                })
            }
            // Stream messages
            PluginOutput::Data(..)
            | PluginOutput::End(..)
            | PluginOutput::Drop(..)
            | PluginOutput::Ack(..) => {
                self.consume_stream_message(input.try_into().map_err(|msg| {
                    ShellError::NushellFailed {
                        msg: format!("Failed to convert message {msg:?} to StreamMessage"),
                    }
                })?)
            }
            PluginOutput::Option(option) => match option {
                PluginOption::GcDisabled(disabled) => {
                    // Turn garbage collection off/on.
                    if let Some(ref gc) = self.gc {
                        gc.set_disabled(disabled);
                    }
                    Ok(())
                }
            },
            PluginOutput::CallResponse(id, response) => {
                // Handle reading the pipeline data, if any
                let response = response
                    .map_data(|data| {
                        let signals = self.get_signals(id)?;

                        // Register the stream in the response
                        if let Some(stream_id) = data.stream_id() {
                            self.recv_stream_started(id, stream_id);
                        }

                        self.read_pipeline_data(data, &signals)
                    })
                    .unwrap_or_else(|err| {
                        // If there's an error with initializing this stream, change it to a plugin
                        // error response, but send it anyway
                        PluginCallResponse::Error(err)
                    });
                let result = self.send_plugin_call_response(id, response);
                if result.is_ok() {
                    // When a call ends, it releases a lock on the GC
                    if let Some(ref gc) = self.gc {
                        gc.decrement_locks(1);
                    }
                }
                result
            }
            PluginOutput::EngineCall { context, id, call } => {
                // `context` is the ID of the plugin call that this engine call belongs to
                let call = call
                    // Handle reading the pipeline data, if any
                    .map_data(|input| {
                        let signals = self.get_signals(context)?;
                        self.read_pipeline_data(input, &signals)
                    })
                    // Do anything extra needed for each engine call setup
                    .and_then(|mut engine_call| {
                        match engine_call {
                            EngineCall::EvalClosure {
                                ref mut positional, ..
                            } => {
                                for arg in positional.iter_mut() {
                                    // Add source to any plugin custom values in the arguments
                                    PluginCustomValueWithSource::add_source_in(
                                        arg,
                                        &self.state.source,
                                    )?;
                                }
                                Ok(engine_call)
                            }
                            _ => Ok(engine_call),
                        }
                    });
                match call {
                    Ok(call) => self.send_engine_call(context, id, call),
                    // If there was an error with setting up the call, just write the error
                    Err(err) => self.get_interface().write_engine_call_response(
                        id,
                        EngineCallResponse::Error(err),
                        &CurrentCallState::default(),
                    ),
                }
            }
        }
    }
584
    /// Borrow the [`StreamManager`] owned by this interface manager.
    fn stream_manager(&self) -> &StreamManager {
        &self.stream_manager
    }
588
589    fn prepare_pipeline_data(&self, mut data: PipelineData) -> Result<PipelineData, ShellError> {
590        // Add source to any values
591        match data {
592            PipelineData::Value(ref mut value, _) => {
593                with_custom_values_in(value, |custom_value| {
594                    PluginCustomValueWithSource::add_source(custom_value.item, &self.state.source);
595                    Ok::<_, ShellError>(())
596                })?;
597                Ok(data)
598            }
599            PipelineData::ListStream(stream, meta) => {
600                let source = self.state.source.clone();
601                Ok(PipelineData::list_stream(
602                    stream.map(move |mut value| {
603                        let _ = PluginCustomValueWithSource::add_source_in(&mut value, &source);
604                        value
605                    }),
606                    meta,
607                ))
608            }
609            PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
610        }
611    }
612
613    fn consume_stream_message(&mut self, message: StreamMessage) -> Result<(), ShellError> {
614        // Keep track of streams that end
615        if let StreamMessage::End(id) = message {
616            self.recv_stream_ended(id);
617        }
618        self.stream_manager.handle_message(message)
619    }
620}
621
/// A reference through which a plugin can be interacted with during execution.
///
/// Cloning is cheap in the sense that clones share the same underlying state through an `Arc`.
///
/// This is not a public API.
#[derive(Debug, Clone)]
#[doc(hidden)]
pub struct PluginInterface {
    /// Shared state
    state: Arc<PluginInterfaceState>,
    /// Handle to stream manager
    stream_manager_handle: StreamManagerHandle,
    /// Handle to plugin garbage collector
    gc: Option<PluginGc>,
}
635
636impl PluginInterface {
637    /// Get the process ID for the plugin, if known.
638    pub fn pid(&self) -> Option<u32> {
639        self.state.process.as_ref().map(|p| p.pid())
640    }
641
642    /// Get the protocol info for the plugin. Will block to receive `Hello` if not received yet.
643    pub fn protocol_info(&self) -> Result<Arc<ProtocolInfo>, ShellError> {
644        self.state.protocol_info.get().and_then(|info| {
645            info.ok_or_else(|| ShellError::PluginFailedToLoad {
646                msg: format!(
647                    "Failed to get protocol info (`Hello` message) from the `{}` plugin",
648                    self.state.source.identity.name()
649                ),
650            })
651        })
652    }
653
654    /// Write the protocol info. This should be done after initialization
655    pub fn hello(&self) -> Result<(), ShellError> {
656        self.write(PluginInput::Hello(ProtocolInfo::default()))?;
657        self.flush()
658    }
659
660    /// Tell the plugin it should not expect any more plugin calls and should terminate after it has
661    /// finished processing the ones it has already received.
662    ///
663    /// Note that this is automatically called when the last existing `PluginInterface` is dropped.
664    /// You probably do not need to call this manually.
665    pub fn goodbye(&self) -> Result<(), ShellError> {
666        self.write(PluginInput::Goodbye)?;
667        self.flush()
668    }
669
670    /// Send the plugin a signal.
671    pub fn signal(&self, action: SignalAction) -> Result<(), ShellError> {
672        self.write(PluginInput::Signal(action))?;
673        self.flush()
674    }
675
676    /// Write an [`EngineCallResponse`]. Writes the full stream contained in any [`PipelineData`]
677    /// before returning.
678    pub fn write_engine_call_response(
679        &self,
680        id: EngineCallId,
681        response: EngineCallResponse<PipelineData>,
682        state: &CurrentCallState,
683    ) -> Result<(), ShellError> {
684        // Set up any stream if necessary
685        let mut writer = None;
686        let response = response.map_data(|data| {
687            let (data_header, data_writer) = self.init_write_pipeline_data(data, state)?;
688            writer = Some(data_writer);
689            Ok(data_header)
690        })?;
691
692        // Write the response, including the pipeline data header if present
693        self.write(PluginInput::EngineCallResponse(id, response))?;
694        self.flush()?;
695
696        // If we have a stream to write, do it now
697        if let Some(writer) = writer {
698            writer.write_background()?;
699        }
700
701        Ok(())
702    }
703
704    /// Write a plugin call message. Returns the writer for the stream.
705    fn write_plugin_call(
706        &self,
707        mut call: PluginCall<PipelineData>,
708        context: Option<&dyn PluginExecutionContext>,
709    ) -> Result<WritePluginCallResult, ShellError> {
710        let id = self.state.plugin_call_id_sequence.next()?;
711        let signals = context
712            .map(|c| c.signals().clone())
713            .unwrap_or_else(Signals::empty);
714        let (tx, rx) = mpsc::channel();
715        let (context_tx, context_rx) = mpsc::channel();
716        let keep_plugin_custom_values = mpsc::channel();
717
718        // Set up the state that will stay alive during the call.
719        let state = CurrentCallState {
720            context_tx: Some(context_tx),
721            keep_plugin_custom_values_tx: Some(keep_plugin_custom_values.0.clone()),
722            entered_foreground: false,
723            span: call.span(),
724        };
725
726        // Prepare the call with the state.
727        state.prepare_plugin_call(&mut call, &self.state.source)?;
728
729        // Convert the call into one with a header and handle the stream, if necessary
730        let (call, writer) = match call {
731            PluginCall::Metadata => (PluginCall::Metadata, Default::default()),
732            PluginCall::Signature => (PluginCall::Signature, Default::default()),
733            PluginCall::CustomValueOp(value, op) => {
734                (PluginCall::CustomValueOp(value, op), Default::default())
735            }
736            PluginCall::GetCompletion(flag_name) => {
737                (PluginCall::GetCompletion(flag_name), Default::default())
738            }
739            PluginCall::Run(CallInfo { name, call, input }) => {
740                let (header, writer) = self.init_write_pipeline_data(input, &state)?;
741                (
742                    PluginCall::Run(CallInfo {
743                        name,
744                        call,
745                        input: header,
746                    }),
747                    writer,
748                )
749            }
750        };
751
752        // Don't try to send a response for a Dropped call.
753        let dont_send_response =
754            matches!(call, PluginCall::CustomValueOp(_, CustomValueOp::Dropped));
755
756        // Register the subscription to the response, and the context
757        self.state
758            .plugin_call_subscription_sender
759            .send((
760                id,
761                PluginCallState {
762                    sender: Some(tx).filter(|_| !dont_send_response),
763                    dont_send_response,
764                    signals,
765                    context_rx: Some(context_rx),
766                    span: call.span(),
767                    keep_plugin_custom_values,
768                    remaining_streams_to_read: 0,
769                },
770            ))
771            .map_err(|_| {
772                let existing_error = self.state.error.get().cloned();
773                let help = format!(
774                    "the plugin may have experienced an error. Try loading the plugin again \
775                        with `{}`",
776                    self.state.source.identity.use_command(),
777                );
778                let error = if let Some(span) = call.span() {
779                    GenericError::new(
780                        format!("Plugin `{}` closed unexpectedly", self.state.source.name()),
781                        "can't complete this operation because the plugin is closed",
782                        span,
783                    )
784                    .with_help(help)
785                    .with_inner(existing_error.into_iter())
786                } else {
787                    GenericError::new_internal(
788                        format!("Plugin `{}` closed unexpectedly", self.state.source.name()),
789                        "can't complete this operation because the plugin is closed",
790                    )
791                    .with_help(help)
792                    .with_inner(existing_error.into_iter())
793                };
794                ShellError::Generic(error)
795            })?;
796
797        // Starting a plugin call adds a lock on the GC. Locks are not added for streams being read
798        // by the plugin, so the plugin would have to explicitly tell us if it expects to stay alive
799        // while reading streams in the background after the response ends.
800        if let Some(ref gc) = self.gc {
801            gc.increment_locks(1);
802        }
803
804        // Write request
805        self.write(PluginInput::Call(id, call))?;
806        self.flush()?;
807
808        Ok(WritePluginCallResult {
809            receiver: rx,
810            writer,
811            state,
812        })
813    }
814
815    /// Read the channel for plugin call messages and handle them until the response is received.
816    fn receive_plugin_call_response(
817        &self,
818        rx: mpsc::Receiver<ReceivedPluginCallMessage>,
819        mut context: Option<&mut (dyn PluginExecutionContext + '_)>,
820        mut state: CurrentCallState,
821    ) -> Result<PluginCallResponse<PipelineData>, ShellError> {
822        // Handle message from receiver
823        for msg in rx {
824            match msg {
825                ReceivedPluginCallMessage::Response(resp) => {
826                    if state.entered_foreground {
827                        // Make the plugin leave the foreground on return, even if it's a stream
828                        if let Some(context) = context.as_deref_mut()
829                            && let Err(err) =
830                                set_foreground(self.state.process.as_ref(), context, false)
831                        {
832                            log::warn!("Failed to leave foreground state on exit: {err:?}");
833                        }
834                    }
835                    if resp.has_stream() {
836                        // If the response has a stream, we need to register the context
837                        if let Some(context) = context
838                            && let Some(ref context_tx) = state.context_tx
839                        {
840                            let _ = context_tx.send(Context(context.boxed()));
841                        }
842                    }
843                    return Ok(resp);
844                }
845                ReceivedPluginCallMessage::Error(err) => {
846                    return Err(err);
847                }
848                ReceivedPluginCallMessage::EngineCall(engine_call_id, engine_call) => {
849                    self.handle_engine_call(
850                        engine_call_id,
851                        engine_call,
852                        &mut state,
853                        context.as_deref_mut(),
854                    )?;
855                }
856            }
857        }
858        // If we fail to get a response, check for an error in the state first, and return it if
859        // set. This is probably a much more helpful error than 'failed to receive response' alone
860        let existing_error = self.state.error.get().cloned();
861        {
862            let help = format!(
863                "try restarting the plugin with `{}`",
864                self.state.source.identity.use_command()
865            );
866            let error = if let Some(span) = state.span {
867                GenericError::new(
868                    format!(
869                        "Failed to receive response to plugin call from `{}`",
870                        self.state.source.identity.name()
871                    ),
872                    "while waiting for this operation to complete",
873                    span,
874                )
875                .with_help(help)
876                .with_inner(existing_error)
877            } else {
878                GenericError::new_internal(
879                    format!(
880                        "Failed to receive response to plugin call from `{}`",
881                        self.state.source.identity.name()
882                    ),
883                    "while waiting for this operation to complete",
884                )
885                .with_help(help)
886                .with_inner(existing_error)
887            };
888            Err(ShellError::Generic(error))
889        }
890    }
891
892    /// Handle an engine call and write the response.
893    fn handle_engine_call(
894        &self,
895        engine_call_id: EngineCallId,
896        engine_call: EngineCall<PipelineData>,
897        state: &mut CurrentCallState,
898        context: Option<&mut (dyn PluginExecutionContext + '_)>,
899    ) -> Result<(), ShellError> {
900        let process = self.state.process.as_ref();
901        let resp = handle_engine_call(engine_call, state, context, process)
902            .unwrap_or_else(EngineCallResponse::Error);
903        // Handle stream
904        let mut writer = None;
905        let resp = resp
906            .map_data(|data| {
907                let (data_header, data_writer) = self.init_write_pipeline_data(data, state)?;
908                writer = Some(data_writer);
909                Ok(data_header)
910            })
911            .unwrap_or_else(|err| {
912                // If we fail to set up the response write, change to an error response here
913                writer = None;
914                EngineCallResponse::Error(err)
915            });
916        // Write the response, then the stream
917        self.write(PluginInput::EngineCallResponse(engine_call_id, resp))?;
918        self.flush()?;
919        if let Some(writer) = writer {
920            writer.write_background()?;
921        }
922        Ok(())
923    }
924
925    /// Perform a plugin call. Input and output streams are handled, and engine calls are handled
926    /// too if there are any before the final response.
927    fn plugin_call(
928        &self,
929        call: PluginCall<PipelineData>,
930        context: Option<&mut dyn PluginExecutionContext>,
931    ) -> Result<PluginCallResponse<PipelineData>, ShellError> {
932        // Check for an error in the state first, and return it if set.
933        if let Some(error) = self.state.error.get() {
934            let help = format!(
935                "try loading the plugin again with `{}`",
936                self.state.source.identity.use_command(),
937            );
938            let error = if let Some(span) = call.span() {
939                GenericError::new(
940                    format!(
941                        "Failed to send plugin call to `{}`",
942                        self.state.source.identity.name()
943                    ),
944                    "the plugin encountered an error before this operation could be attempted",
945                    span,
946                )
947                .with_help(help)
948                .with_inner([error.clone()])
949            } else {
950                GenericError::new_internal(
951                    format!(
952                        "Failed to send plugin call to `{}`",
953                        self.state.source.identity.name()
954                    ),
955                    "the plugin encountered an error before this operation could be attempted",
956                )
957                .with_help(help)
958                .with_inner([error.clone()])
959            };
960            return Err(ShellError::Generic(error));
961        }
962
963        let result = self.write_plugin_call(call, context.as_deref())?;
964
965        // Finish writing stream in the background
966        result.writer.write_background()?;
967
968        self.receive_plugin_call_response(result.receiver, context, result.state)
969    }
970
971    /// Get the metadata from the plugin.
972    pub fn get_metadata(&self) -> Result<PluginMetadata, ShellError> {
973        match self.plugin_call(PluginCall::Metadata, None)? {
974            PluginCallResponse::Metadata(meta) => Ok(meta),
975            PluginCallResponse::Error(err) => Err(err),
976            _ => Err(ShellError::PluginFailedToDecode {
977                msg: "Received unexpected response to plugin Metadata call".into(),
978            }),
979        }
980    }
981
982    /// Get the command signatures from the plugin.
983    pub fn get_signature(&self) -> Result<Vec<PluginSignature>, ShellError> {
984        match self.plugin_call(PluginCall::Signature, None)? {
985            PluginCallResponse::Signature(sigs) => Ok(sigs),
986            PluginCallResponse::Error(err) => Err(err),
987            _ => Err(ShellError::PluginFailedToDecode {
988                msg: "Received unexpected response to plugin Signature call".into(),
989            }),
990        }
991    }
992
993    /// Run the plugin with the given call and execution context.
994    pub fn run(
995        &self,
996        call: CallInfo<PipelineData>,
997        context: &mut dyn PluginExecutionContext,
998    ) -> Result<PipelineData, ShellError> {
999        match self.plugin_call(PluginCall::Run(call), Some(context))? {
1000            PluginCallResponse::PipelineData(data) => Ok(data),
1001            PluginCallResponse::Error(err) => Err(err),
1002            _ => Err(ShellError::PluginFailedToDecode {
1003                msg: "Received unexpected response to plugin Run call".into(),
1004            }),
1005        }
1006    }
1007
1008    /// Get completion items from the plugin.
1009    pub fn get_dynamic_completion(
1010        &self,
1011        info: GetCompletionInfo,
1012    ) -> Result<Option<Vec<DynamicSuggestion>>, ShellError> {
1013        match self.plugin_call(PluginCall::GetCompletion(info), None)? {
1014            PluginCallResponse::CompletionItems(items) => Ok(items),
1015            PluginCallResponse::Error(err) => Err(err),
1016            _ => Err(ShellError::PluginFailedToDecode {
1017                msg: "Received unexpected response to plugin GetCompletion call".into(),
1018            }),
1019        }
1020    }
1021
1022    /// Do a custom value op that expects a value response (i.e. most of them)
1023    fn custom_value_op_expecting_value(
1024        &self,
1025        value: Spanned<PluginCustomValueWithSource>,
1026        op: CustomValueOp,
1027    ) -> Result<Value, ShellError> {
1028        let op_name = op.name();
1029        let span = value.span;
1030
1031        // Check that the value came from the right source
1032        value.item.verify_source(span, &self.state.source)?;
1033
1034        let call = PluginCall::CustomValueOp(value.map(|cv| cv.without_source()), op);
1035        match self.plugin_call(call, None)? {
1036            PluginCallResponse::PipelineData(out_data) => out_data.into_value(span),
1037            PluginCallResponse::Error(err) => Err(err),
1038            _ => Err(ShellError::PluginFailedToDecode {
1039                msg: format!("Received unexpected response to custom value {op_name}() call"),
1040            }),
1041        }
1042    }
1043
1044    /// Collapse a custom value to its base value.
1045    pub fn custom_value_to_base_value(
1046        &self,
1047        value: Spanned<PluginCustomValueWithSource>,
1048    ) -> Result<Value, ShellError> {
1049        self.custom_value_op_expecting_value(value, CustomValueOp::ToBaseValue)
1050    }
1051
1052    /// Follow a numbered cell path on a custom value - e.g. `value.0`.
1053    pub fn custom_value_follow_path_int(
1054        &self,
1055        value: Spanned<PluginCustomValueWithSource>,
1056        index: Spanned<usize>,
1057        optional: bool,
1058    ) -> Result<Value, ShellError> {
1059        self.custom_value_op_expecting_value(
1060            value,
1061            CustomValueOp::FollowPathInt { index, optional },
1062        )
1063    }
1064
1065    /// Follow a named cell path on a custom value - e.g. `value.column`.
1066    pub fn custom_value_follow_path_string(
1067        &self,
1068        value: Spanned<PluginCustomValueWithSource>,
1069        column_name: Spanned<String>,
1070        optional: bool,
1071        casing: Casing,
1072    ) -> Result<Value, ShellError> {
1073        self.custom_value_op_expecting_value(
1074            value,
1075            CustomValueOp::FollowPathString {
1076                column_name,
1077                optional,
1078                casing,
1079            },
1080        )
1081    }
1082
1083    /// Invoke comparison logic for custom values.
1084    pub fn custom_value_partial_cmp(
1085        &self,
1086        value: PluginCustomValueWithSource,
1087        other_value: Value,
1088        span: Span,
1089    ) -> Result<Option<Ordering>, ShellError> {
1090        // Check that the value came from the right source
1091        value.verify_source(span, &self.state.source)?;
1092
1093        let call = PluginCall::CustomValueOp(
1094            value.without_source().into_spanned(span),
1095            CustomValueOp::PartialCmp(other_value),
1096        );
1097        match self.plugin_call(call, None)? {
1098            PluginCallResponse::Ordering(ordering) => Ok(ordering),
1099            PluginCallResponse::Error(err) => Err(err),
1100            _ => Err(ShellError::PluginFailedToDecode {
1101                msg: "Received unexpected response to custom value partial_cmp() call".into(),
1102            }),
1103        }
1104    }
1105
1106    /// Invoke functionality for an operator on a custom value.
1107    pub fn custom_value_operation(
1108        &self,
1109        left: Spanned<PluginCustomValueWithSource>,
1110        operator: Spanned<Operator>,
1111        right: Value,
1112    ) -> Result<Value, ShellError> {
1113        self.custom_value_op_expecting_value(left, CustomValueOp::Operation(operator, right))
1114    }
1115
1116    /// Invoke saving operation on a custom value.
1117    pub fn custom_value_save(
1118        &self,
1119        value: Spanned<PluginCustomValueWithSource>,
1120        path: Spanned<&Path>,
1121        save_call_span: Span,
1122    ) -> Result<(), ShellError> {
1123        // Check that the value came from the right source
1124        value.item.verify_source(value.span, &self.state.source)?;
1125
1126        let call = PluginCall::CustomValueOp(
1127            value.map(|cv| cv.without_source()),
1128            CustomValueOp::Save {
1129                path: path.map(ToOwned::to_owned),
1130                save_call_span,
1131            },
1132        );
1133        match self.plugin_call(call, None)? {
1134            PluginCallResponse::Ok => Ok(()),
1135            PluginCallResponse::Error(err) => Err(err),
1136            _ => Err(ShellError::PluginFailedToDecode {
1137                msg: "Received unexpected response to custom value save() call".into(),
1138            }),
1139        }
1140    }
1141
1142    /// Notify the plugin about a dropped custom value.
1143    pub fn custom_value_dropped(&self, value: PluginCustomValue) -> Result<(), ShellError> {
1144        // Make sure we don't block here. This can happen on the receiver thread, which would cause
1145        // a deadlock. We should not try to receive the response - just let it be discarded.
1146        //
1147        // Note: Span::unknown() is used here because this is called from Drop, which has no span
1148        // context available.
1149        drop(self.write_plugin_call(
1150            PluginCall::CustomValueOp(value.into_spanned(Span::unknown()), CustomValueOp::Dropped),
1151            None,
1152        )?);
1153        Ok(())
1154    }
1155}
1156
1157impl Interface for PluginInterface {
1158    type Output = PluginInput;
1159    type DataContext = CurrentCallState;
1160
1161    fn write(&self, input: PluginInput) -> Result<(), ShellError> {
1162        log::trace!("to plugin: {input:?}");
1163        self.state.writer.write(&input).map_err(|err| {
1164            log::warn!("write() error: {err}");
1165            // If there's an error in the state, return that instead because it's likely more
1166            // descriptive
1167            self.state.error.get().cloned().unwrap_or(err)
1168        })
1169    }
1170
1171    fn flush(&self) -> Result<(), ShellError> {
1172        self.state.writer.flush().map_err(|err| {
1173            log::warn!("flush() error: {err}");
1174            // If there's an error in the state, return that instead because it's likely more
1175            // descriptive
1176            self.state.error.get().cloned().unwrap_or(err)
1177        })
1178    }
1179
1180    fn stream_id_sequence(&self) -> &Sequence {
1181        &self.state.stream_id_sequence
1182    }
1183
1184    fn stream_manager_handle(&self) -> &StreamManagerHandle {
1185        &self.stream_manager_handle
1186    }
1187
1188    fn prepare_pipeline_data(
1189        &self,
1190        data: PipelineData,
1191        state: &CurrentCallState,
1192    ) -> Result<PipelineData, ShellError> {
1193        // Validate the destination of values in the pipeline data
1194        match data {
1195            PipelineData::Value(mut value, meta) => {
1196                state.prepare_value(&mut value, &self.state.source)?;
1197                Ok(PipelineData::value(value, meta))
1198            }
1199            PipelineData::ListStream(stream, meta) => {
1200                let source = self.state.source.clone();
1201                let state = state.clone();
1202                Ok(PipelineData::list_stream(
1203                    stream.map(move |mut value| {
1204                        match state.prepare_value(&mut value, &source) {
1205                            Ok(()) => value,
1206                            // Put the error in the stream instead
1207                            Err(err) => Value::error(err, value.span()),
1208                        }
1209                    }),
1210                    meta,
1211                ))
1212            }
1213            PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
1214        }
1215    }
1216}
1217
1218impl Drop for PluginInterface {
1219    fn drop(&mut self) {
1220        // Automatically send `Goodbye` if there are no more interfaces. In that case there would be
1221        // only two copies of the state, one of which we hold, and one of which the manager holds.
1222        //
1223        // Our copy is about to be dropped, so there would only be one left, the manager. The
1224        // manager will never send any plugin calls, so we should let the plugin know that.
1225        if Arc::strong_count(&self.state) < 3
1226            && let Err(err) = self.goodbye()
1227        {
1228            log::warn!("Error during plugin Goodbye: {err}");
1229        }
1230    }
1231}
1232
1233/// Return value of [`PluginInterface::write_plugin_call()`].
1234#[must_use]
1235struct WritePluginCallResult {
1236    /// Receiver for plugin call messages related to the written plugin call.
1237    receiver: mpsc::Receiver<ReceivedPluginCallMessage>,
1238    /// Writer for the stream, if any.
1239    writer: PipelineDataWriter<PluginInterface>,
1240    /// State to be kept for the duration of the plugin call.
1241    state: CurrentCallState,
1242}
1243
1244/// State related to the current plugin call being executed.
1245#[derive(Default, Clone)]
1246pub struct CurrentCallState {
1247    /// Sender for context, which should be sent if the plugin call returned a stream so that
1248    /// engine calls may continue to be handled.
1249    context_tx: Option<mpsc::Sender<Context>>,
1250    /// Sender for a channel that retains plugin custom values that need to stay alive for the
1251    /// duration of a plugin call.
1252    keep_plugin_custom_values_tx: Option<mpsc::Sender<PluginCustomValueWithSource>>,
1253    /// The plugin call entered the foreground: this should be cleaned up automatically when the
1254    /// plugin call returns.
1255    entered_foreground: bool,
1256    /// The span that caused the plugin call.
1257    span: Option<Span>,
1258}
1259
1260impl CurrentCallState {
1261    /// Prepare a custom value for write. Verifies custom value origin, and keeps custom values that
1262    /// shouldn't be dropped immediately.
1263    fn prepare_custom_value(
1264        &self,
1265        custom_value: Spanned<&mut Box<dyn CustomValue>>,
1266        source: &PluginSource,
1267    ) -> Result<(), ShellError> {
1268        // Ensure we can use it
1269        PluginCustomValueWithSource::verify_source_of_custom_value(
1270            custom_value.as_deref().map(|cv| &**cv),
1271            source,
1272        )?;
1273
1274        // Check whether we need to keep it
1275        if let Some(keep_tx) = &self.keep_plugin_custom_values_tx
1276            && let Some(custom_value) = custom_value
1277                .item
1278                .as_any()
1279                .downcast_ref::<PluginCustomValueWithSource>()
1280            && custom_value.notify_on_drop()
1281        {
1282            log::trace!("Keeping custom value for drop later: {custom_value:?}");
1283            keep_tx
1284                .send(custom_value.clone())
1285                .map_err(|_| ShellError::NushellFailed {
1286                    msg: "Failed to custom value to keep channel".into(),
1287                })?;
1288        }
1289
1290        // Strip the source from it so it can be serialized
1291        PluginCustomValueWithSource::remove_source(&mut *custom_value.item);
1292
1293        Ok(())
1294    }
1295
1296    /// Prepare a value for write, including all contained custom values.
1297    fn prepare_value(&self, value: &mut Value, source: &PluginSource) -> Result<(), ShellError> {
1298        with_custom_values_in(value, |custom_value| {
1299            self.prepare_custom_value(custom_value, source)
1300        })
1301    }
1302
1303    /// Prepare call arguments for write.
1304    fn prepare_call_args(
1305        &self,
1306        call: &mut EvaluatedCall,
1307        source: &PluginSource,
1308    ) -> Result<(), ShellError> {
1309        for arg in call.positional.iter_mut() {
1310            self.prepare_value(arg, source)?;
1311        }
1312        for arg in call.named.iter_mut().flat_map(|(_, arg)| arg.as_mut()) {
1313            self.prepare_value(arg, source)?;
1314        }
1315        Ok(())
1316    }
1317
1318    /// Prepare a plugin call for write. Does not affect pipeline data, which is handled by
1319    /// `prepare_pipeline_data()` instead.
1320    fn prepare_plugin_call<D>(
1321        &self,
1322        call: &mut PluginCall<D>,
1323        source: &PluginSource,
1324    ) -> Result<(), ShellError> {
1325        match call {
1326            PluginCall::Metadata => Ok(()),
1327            PluginCall::Signature => Ok(()),
1328            PluginCall::GetCompletion(_) => Ok(()),
1329            PluginCall::Run(CallInfo { call, .. }) => self.prepare_call_args(call, source),
1330            PluginCall::CustomValueOp(_, op) => {
1331                // Handle anything within the op.
1332                match op {
1333                    CustomValueOp::ToBaseValue => Ok(()),
1334                    CustomValueOp::FollowPathInt { .. } => Ok(()),
1335                    CustomValueOp::FollowPathString { .. } => Ok(()),
1336                    CustomValueOp::PartialCmp(value) => self.prepare_value(value, source),
1337                    CustomValueOp::Operation(_, value) => self.prepare_value(value, source),
1338                    CustomValueOp::Save { .. } => Ok(()),
1339                    CustomValueOp::Dropped => Ok(()),
1340                }
1341            }
1342        }
1343    }
1344}
1345
1346/// Handle an engine call.
1347pub(crate) fn handle_engine_call(
1348    call: EngineCall<PipelineData>,
1349    state: &mut CurrentCallState,
1350    context: Option<&mut (dyn PluginExecutionContext + '_)>,
1351    process: Option<&PluginProcess>,
1352) -> Result<EngineCallResponse<PipelineData>, ShellError> {
1353    let call_name = call.name();
1354
1355    let context = context.ok_or_else(|| {
1356        ShellError::Generic(
1357            GenericError::new_internal(
1358                "A plugin execution context is required for this engine call",
1359                format!("attempted to call {call_name} outside of a command invocation"),
1360            )
1361            .with_help("this is probably a bug with the plugin"),
1362        )
1363    })?;
1364
1365    match call {
1366        EngineCall::GetConfig => {
1367            let config = SharedCow::from(context.get_config()?);
1368            Ok(EngineCallResponse::Config(config))
1369        }
1370        EngineCall::GetPluginConfig => {
1371            let plugin_config = context.get_plugin_config()?;
1372            Ok(plugin_config.map_or_else(EngineCallResponse::empty, EngineCallResponse::value))
1373        }
1374        EngineCall::GetEnvVar(name) => {
1375            let value = context.get_env_var(&name)?;
1376            Ok(value
1377                .cloned()
1378                .map_or_else(EngineCallResponse::empty, EngineCallResponse::value))
1379        }
1380        EngineCall::GetEnvVars => context.get_env_vars().map(EngineCallResponse::ValueMap),
1381        EngineCall::GetCurrentDir => {
1382            let current_dir = context.get_current_dir()?;
1383            Ok(EngineCallResponse::value(Value::string(
1384                current_dir.item,
1385                current_dir.span,
1386            )))
1387        }
1388        EngineCall::AddEnvVar(name, value) => {
1389            context.add_env_var(name, value)?;
1390            Ok(EngineCallResponse::empty())
1391        }
1392        EngineCall::GetHelp => {
1393            let help = context.get_help()?;
1394            Ok(EngineCallResponse::value(Value::string(
1395                help.item, help.span,
1396            )))
1397        }
1398        EngineCall::EnterForeground => {
1399            let resp = set_foreground(process, context, true)?;
1400            state.entered_foreground = true;
1401            Ok(resp)
1402        }
1403        EngineCall::LeaveForeground => {
1404            let resp = set_foreground(process, context, false)?;
1405            state.entered_foreground = false;
1406            Ok(resp)
1407        }
1408        EngineCall::GetSpanContents(span) => {
1409            let contents = context.get_span_contents(span)?;
1410            Ok(EngineCallResponse::value(Value::binary(
1411                contents.item,
1412                contents.span,
1413            )))
1414        }
1415        EngineCall::EvalClosure {
1416            closure,
1417            positional,
1418            input,
1419            redirect_stdout,
1420            redirect_stderr,
1421        } => context
1422            .eval_closure(closure, positional, input, redirect_stdout, redirect_stderr)
1423            .map(EngineCallResponse::PipelineData),
1424        EngineCall::FindDecl(name) => context.find_decl(&name).map(|decl_id| {
1425            if let Some(decl_id) = decl_id {
1426                EngineCallResponse::Identifier(decl_id)
1427            } else {
1428                EngineCallResponse::empty()
1429            }
1430        }),
1431        EngineCall::GetBlockIR(block_id) => context
1432            .get_block_ir(block_id)
1433            .map(|ir| EngineCallResponse::IrBlock(Box::new(ir))),
1434        EngineCall::CallDecl {
1435            decl_id,
1436            call,
1437            input,
1438            redirect_stdout,
1439            redirect_stderr,
1440        } => context
1441            .call_decl(decl_id, call, input, redirect_stdout, redirect_stderr)
1442            .map(EngineCallResponse::PipelineData),
1443    }
1444}
1445
1446/// Implements enter/exit foreground
1447fn set_foreground(
1448    process: Option<&PluginProcess>,
1449    context: &mut dyn PluginExecutionContext,
1450    enter: bool,
1451) -> Result<EngineCallResponse<PipelineData>, ShellError> {
1452    if let Some(process) = process {
1453        if let Some(pipeline_externals_state) = context.pipeline_externals_state() {
1454            if enter {
1455                let pgrp = process.enter_foreground(context.span(), pipeline_externals_state)?;
1456                Ok(pgrp.map_or_else(EngineCallResponse::empty, |id| {
1457                    EngineCallResponse::value(Value::int(id as i64, context.span()))
1458                }))
1459            } else {
1460                process.exit_foreground()?;
1461                Ok(EngineCallResponse::empty())
1462            }
1463        } else {
1464            // This should always be present on a real context
1465            Err(ShellError::NushellFailed {
1466                msg: "missing required pipeline_externals_state from context \
1467                            for entering foreground"
1468                    .into(),
1469            })
1470        }
1471    } else {
1472        Err(ShellError::Generic(
1473            GenericError::new(
1474                "Can't manage plugin process to enter foreground",
1475                "the process ID for this plugin is unknown",
1476                context.span(),
1477            )
1478            .with_help("the plugin may be running in a test"),
1479        ))
1480    }
1481}