nu_plugin_engine/interface/
mod.rs

1//! Interface used by the engine to communicate with the plugin.
2
3use nu_plugin_core::{
4    Interface, InterfaceManager, PipelineDataWriter, PluginRead, PluginWrite, StreamManager,
5    StreamManagerHandle,
6    util::{Waitable, WaitableMut, with_custom_values_in},
7};
8use nu_plugin_protocol::{
9    CallInfo, CustomValueOp, EngineCall, EngineCallId, EngineCallResponse, EvaluatedCall,
10    GetCompletionInfo, Ordering, PluginCall, PluginCallId, PluginCallResponse, PluginCustomValue,
11    PluginInput, PluginOption, PluginOutput, ProtocolInfo, StreamId, StreamMessage,
12};
13use nu_protocol::{
14    CustomValue, DynamicSuggestion, IntoSpanned, PipelineData, PluginMetadata, PluginSignature,
15    ShellError, SignalAction, Signals, Span, Spanned, Value, ast::Operator, casing::Casing,
16    engine::Sequence,
17};
18use nu_utils::SharedCow;
19use std::{
20    collections::{BTreeMap, btree_map},
21    path::Path,
22    sync::{Arc, OnceLock, mpsc},
23};
24
25use crate::{
26    PluginCustomValueWithSource, PluginExecutionContext, PluginGc, PluginSource,
27    process::PluginProcess,
28};
29
30#[cfg(test)]
31mod tests;
32
/// A message sent over the per-call channel to the thread that made a plugin call.
#[derive(Debug)]
enum ReceivedPluginCallMessage {
    /// The final response to send
    Response(PluginCallResponse<PipelineData>),

    /// A critical error with the interface
    Error(ShellError),

    /// An engine call that should be evaluated and responded to, but is not the final response
    ///
    /// We send this back to the thread that made the plugin call so we don't block the reader
    /// thread
    EngineCall(EngineCallId, EngineCall<PipelineData>),
}
47
/// Context for plugin call execution
///
/// A newtype around a boxed [`PluginExecutionContext`] trait object.
pub(crate) struct Context(Box<dyn PluginExecutionContext>);
50
51impl std::fmt::Debug for Context {
52    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        f.write_str("Context")
54    }
55}
56
57impl std::ops::Deref for Context {
58    type Target = dyn PluginExecutionContext;
59
60    fn deref(&self) -> &Self::Target {
61        &*self.0
62    }
63}
64
/// Internal shared state between the manager and each interface.
///
/// The manager holds this behind an `Arc` and hands a clone of that `Arc` to every
/// [`PluginInterface`] it creates.
struct PluginInterfaceState {
    /// The source to be used for custom values coming from / going to the plugin
    source: Arc<PluginSource>,
    /// The plugin process being managed (`None` when no process ID was supplied on construction)
    process: Option<PluginProcess>,
    /// Protocol version info, set after `Hello` received
    protocol_info: Waitable<Arc<ProtocolInfo>>,
    /// Sequence for generating plugin call ids
    plugin_call_id_sequence: Sequence,
    /// Sequence for generating stream ids
    stream_id_sequence: Sequence,
    /// Sender to subscribe to a plugin call response
    plugin_call_subscription_sender: mpsc::Sender<(PluginCallId, PluginCallState)>,
    /// An error that should be propagated to further plugin calls
    error: OnceLock<ShellError>,
    /// The synchronized output writer
    writer: Box<dyn PluginWrite<PluginInput>>,
}
84
85impl std::fmt::Debug for PluginInterfaceState {
86    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87        f.debug_struct("PluginInterfaceState")
88            .field("source", &self.source)
89            .field("protocol_info", &self.protocol_info)
90            .field("plugin_call_id_sequence", &self.plugin_call_id_sequence)
91            .field("stream_id_sequence", &self.stream_id_sequence)
92            .field(
93                "plugin_call_subscription_sender",
94                &self.plugin_call_subscription_sender,
95            )
96            .field("error", &self.error)
97            .finish_non_exhaustive()
98    }
99}
100
/// State that the manager keeps for each plugin call during its lifetime.
#[derive(Debug)]
struct PluginCallState {
    /// The sender back to the thread that is waiting for the plugin call response
    sender: Option<mpsc::Sender<ReceivedPluginCallMessage>>,
    /// Don't try to send the plugin call response. This is only used for `Dropped` to avoid an
    /// error
    dont_send_response: bool,
    /// Signals to be used for stream iterators
    signals: Signals,
    /// Channel to receive context on to be used if needed
    context_rx: Option<mpsc::Receiver<Context>>,
    /// Span associated with the call, if any
    span: Option<Span>,
    /// Channel for plugin custom values that should be kept alive for the duration of the plugin
    /// call. The plugin custom values on this channel are never read, we just hold on to it to keep
    /// them in memory so they can be dropped at the end of the call. We hold the sender as well so
    /// we can generate the CurrentCallState.
    keep_plugin_custom_values: (
        mpsc::Sender<PluginCustomValueWithSource>,
        mpsc::Receiver<PluginCustomValueWithSource>,
    ),
    /// Number of streams that still need to be read from the plugin call response
    ///
    /// Incremented when a response stream is registered and decremented when it ends; the state
    /// is removed from the manager once the response has been delivered and this is `<= 0`.
    remaining_streams_to_read: i32,
}
126
127impl Drop for PluginCallState {
128    fn drop(&mut self) {
129        // Clear the keep custom values channel, so drop notifications can be sent
130        for value in self.keep_plugin_custom_values.1.try_iter() {
131            log::trace!("Dropping custom value that was kept: {value:?}");
132            drop(value);
133        }
134    }
135}
136
/// Manages reading and dispatching messages for [`PluginInterface`]s.
#[derive(Debug)]
pub struct PluginInterfaceManager {
    /// Shared state
    state: Arc<PluginInterfaceState>,
    /// The writer for protocol info
    protocol_info_mut: WaitableMut<Arc<ProtocolInfo>>,
    /// Manages stream messages and state
    stream_manager: StreamManager,
    /// State related to plugin calls
    plugin_call_states: BTreeMap<PluginCallId, PluginCallState>,
    /// Receiver for plugin call subscriptions
    ///
    /// Drained without blocking into `plugin_call_states` before that map is consulted.
    plugin_call_subscription_receiver: mpsc::Receiver<(PluginCallId, PluginCallState)>,
    /// Tracker for which plugin call streams being read belong to
    ///
    /// This is necessary so we know when we can remove context for plugin calls
    plugin_call_input_streams: BTreeMap<StreamId, PluginCallId>,
    /// Garbage collector handle, to notify about the state of the plugin
    gc: Option<PluginGc>,
}
157
158impl PluginInterfaceManager {
159    pub fn new(
160        source: Arc<PluginSource>,
161        pid: Option<u32>,
162        writer: impl PluginWrite<PluginInput> + 'static,
163    ) -> PluginInterfaceManager {
164        let (subscription_tx, subscription_rx) = mpsc::channel();
165        let protocol_info_mut = WaitableMut::new();
166
167        PluginInterfaceManager {
168            state: Arc::new(PluginInterfaceState {
169                source,
170                process: pid.map(PluginProcess::new),
171                protocol_info: protocol_info_mut.reader(),
172                plugin_call_id_sequence: Sequence::default(),
173                stream_id_sequence: Sequence::default(),
174                plugin_call_subscription_sender: subscription_tx,
175                error: OnceLock::new(),
176                writer: Box::new(writer),
177            }),
178            protocol_info_mut,
179            stream_manager: StreamManager::new(),
180            plugin_call_states: BTreeMap::new(),
181            plugin_call_subscription_receiver: subscription_rx,
182            plugin_call_input_streams: BTreeMap::new(),
183            gc: None,
184        }
185    }
186
    /// Add a garbage collector to this plugin. The manager will notify the garbage collector about
    /// the state of the plugin so that it can be automatically cleaned up if the plugin is
    /// inactive.
    ///
    /// Replaces any previously set collector; passing `None` removes it.
    pub fn set_garbage_collector(&mut self, gc: Option<PluginGc>) {
        self.gc = gc;
    }
193
194    /// Consume pending messages in the `plugin_call_subscription_receiver`
195    fn receive_plugin_call_subscriptions(&mut self) {
196        while let Ok((id, state)) = self.plugin_call_subscription_receiver.try_recv() {
197            if let btree_map::Entry::Vacant(e) = self.plugin_call_states.entry(id) {
198                e.insert(state);
199            } else {
200                log::warn!("Duplicate plugin call ID ignored: {id}");
201            }
202        }
203    }
204
205    /// Track the start of incoming stream(s)
206    fn recv_stream_started(&mut self, call_id: PluginCallId, stream_id: StreamId) {
207        self.plugin_call_input_streams.insert(stream_id, call_id);
208        // Increment the number of streams on the subscription so context stays alive
209        self.receive_plugin_call_subscriptions();
210        if let Some(state) = self.plugin_call_states.get_mut(&call_id) {
211            state.remaining_streams_to_read += 1;
212        }
213        // Add a lock to the garbage collector for each stream
214        if let Some(ref gc) = self.gc {
215            gc.increment_locks(1);
216        }
217    }
218
219    /// Track the end of an incoming stream
220    fn recv_stream_ended(&mut self, stream_id: StreamId) {
221        if let Some(call_id) = self.plugin_call_input_streams.remove(&stream_id) {
222            if let btree_map::Entry::Occupied(mut e) = self.plugin_call_states.entry(call_id) {
223                e.get_mut().remaining_streams_to_read -= 1;
224                // Remove the subscription if there are no more streams to be read.
225                if e.get().remaining_streams_to_read <= 0 {
226                    e.remove();
227                }
228            }
229            // Streams read from the plugin are tracked with locks on the GC so plugins don't get
230            // stopped if they have active streams
231            if let Some(ref gc) = self.gc {
232                gc.decrement_locks(1);
233            }
234        }
235    }
236
237    /// Find the [`Signals`] struct corresponding to the given plugin call id
238    fn get_signals(&mut self, id: PluginCallId) -> Result<Signals, ShellError> {
239        // Make sure we're up to date
240        self.receive_plugin_call_subscriptions();
241        // Find the subscription and return the context
242        self.plugin_call_states
243            .get(&id)
244            .map(|state| state.signals.clone())
245            .ok_or_else(|| ShellError::PluginFailedToDecode {
246                msg: format!("Unknown plugin call ID: {id}"),
247            })
248    }
249
250    /// Send a [`PluginCallResponse`] to the appropriate sender
251    fn send_plugin_call_response(
252        &mut self,
253        id: PluginCallId,
254        response: PluginCallResponse<PipelineData>,
255    ) -> Result<(), ShellError> {
256        // Ensure we're caught up on the subscriptions made
257        self.receive_plugin_call_subscriptions();
258
259        if let btree_map::Entry::Occupied(mut e) = self.plugin_call_states.entry(id) {
260            // Remove the subscription sender, since this will be the last message.
261            //
262            // We can spawn a new one if we need it for engine calls.
263            if !e.get().dont_send_response
264                && e.get_mut()
265                    .sender
266                    .take()
267                    .and_then(|s| s.send(ReceivedPluginCallMessage::Response(response)).ok())
268                    .is_none()
269            {
270                log::warn!("Received a plugin call response for id={id}, but the caller hung up");
271            }
272            // If there are no registered streams, just remove it
273            if e.get().remaining_streams_to_read <= 0 {
274                e.remove();
275            }
276            Ok(())
277        } else {
278            Err(ShellError::PluginFailedToDecode {
279                msg: format!("Unknown plugin call ID: {id}"),
280            })
281        }
282    }
283
284    /// Spawn a handler for engine calls for a plugin, in case we need to handle engine calls
285    /// after the response has already been received (in which case we have nowhere to send them)
286    fn spawn_engine_call_handler(
287        &mut self,
288        id: PluginCallId,
289    ) -> Result<&mpsc::Sender<ReceivedPluginCallMessage>, ShellError> {
290        let interface = self.get_interface();
291
292        if let Some(state) = self.plugin_call_states.get_mut(&id) {
293            if state.sender.is_none() {
294                let (tx, rx) = mpsc::channel();
295                let context_rx =
296                    state
297                        .context_rx
298                        .take()
299                        .ok_or_else(|| ShellError::NushellFailed {
300                            msg: "Tried to spawn the fallback engine call handler more than once"
301                                .into(),
302                        })?;
303
304                // Generate the state needed to handle engine calls
305                let mut current_call_state = CurrentCallState {
306                    context_tx: None,
307                    keep_plugin_custom_values_tx: Some(state.keep_plugin_custom_values.0.clone()),
308                    entered_foreground: false,
309                    span: state.span,
310                };
311
312                let handler = move || {
313                    // We receive on the thread so that we don't block the reader thread
314                    let mut context = context_rx
315                        .recv()
316                        .ok() // The plugin call won't send context if it's not required.
317                        .map(|c| c.0);
318
319                    for msg in rx {
320                        // This thread only handles engine calls.
321                        match msg {
322                            ReceivedPluginCallMessage::EngineCall(engine_call_id, engine_call) => {
323                                if let Err(err) = interface.handle_engine_call(
324                                    engine_call_id,
325                                    engine_call,
326                                    &mut current_call_state,
327                                    context.as_deref_mut(),
328                                ) {
329                                    log::warn!(
330                                        "Error in plugin post-response engine call handler: \
331                                        {err:?}"
332                                    );
333                                    return;
334                                }
335                            }
336                            other => log::warn!(
337                                "Bad message received in plugin post-response \
338                                engine call handler: {other:?}"
339                            ),
340                        }
341                    }
342                };
343                std::thread::Builder::new()
344                    .name("plugin engine call handler".into())
345                    .spawn(handler)
346                    .expect("failed to spawn thread");
347                state.sender = Some(tx);
348                Ok(state.sender.as_ref().unwrap_or_else(|| unreachable!()))
349            } else {
350                Err(ShellError::NushellFailed {
351                    msg: "Tried to spawn the fallback engine call handler before the plugin call \
352                        response had been received"
353                        .into(),
354                })
355            }
356        } else {
357            Err(ShellError::NushellFailed {
358                msg: format!("Couldn't find plugin ID={id} in subscriptions"),
359            })
360        }
361    }
362
    /// Send an [`EngineCall`] to the appropriate sender
    ///
    /// The engine call is forwarded to the thread waiting on plugin call `plugin_call_id`. If
    /// that call no longer has a sender, a fallback handler thread is spawned and used instead.
    /// If the receiving side has hung up, an error response for `engine_call_id` is written to
    /// the plugin directly.
    ///
    /// Fails with [`ShellError::PluginFailedToDecode`] if no state is tracked for
    /// `plugin_call_id`, and otherwise propagates errors from spawning the fallback handler or
    /// from writing the error response.
    fn send_engine_call(
        &mut self,
        plugin_call_id: PluginCallId,
        engine_call_id: EngineCallId,
        call: EngineCall<PipelineData>,
    ) -> Result<(), ShellError> {
        // Ensure we're caught up on the subscriptions made
        self.receive_plugin_call_subscriptions();

        // Don't remove the sender, as there could be more calls or responses
        if let Some(subscription) = self.plugin_call_states.get(&plugin_call_id) {
            let msg = ReceivedPluginCallMessage::EngineCall(engine_call_id, call);
            // Called if there's an error sending the engine call. Takes the manager as an
            // argument instead of capturing `self`.
            let send_error = |this: &Self| {
                log::warn!(
                    "Received an engine call for plugin_call_id={plugin_call_id}, \
                    but the caller hung up"
                );
                // We really have no choice here but to send the response ourselves and hope we
                // don't block
                this.state.writer.write(&PluginInput::EngineCallResponse(
                    engine_call_id,
                    EngineCallResponse::Error(ShellError::GenericError {
                        error: "Caller hung up".to_string(),
                        msg: "Can't make engine call because the original caller hung up"
                            .to_string(),
                        span: None,
                        help: None,
                        inner: vec![],
                    }),
                ))?;
                this.state.writer.flush()
            };
            // Try to send to the sender if it exists
            if let Some(sender) = subscription.sender.as_ref() {
                sender.send(msg).or_else(|_| send_error(self))
            } else {
                // The sender no longer exists. Spawn a specific one just for engine calls
                let sender = self.spawn_engine_call_handler(plugin_call_id)?;
                sender.send(msg).or_else(|_| send_error(self))
            }
        } else {
            Err(ShellError::PluginFailedToDecode {
                msg: format!("Unknown plugin call ID: {plugin_call_id}"),
            })
        }
    }
411
412    /// True if there are no other copies of the state (which would mean there are no interfaces
413    /// and no stream readers/writers)
414    pub fn is_finished(&self) -> bool {
415        Arc::strong_count(&self.state) < 2
416    }
417
418    /// Loop on input from the given reader as long as `is_finished()` is false
419    ///
420    /// Any errors will be propagated to all read streams automatically.
421    pub fn consume_all(
422        &mut self,
423        mut reader: impl PluginRead<PluginOutput>,
424    ) -> Result<(), ShellError> {
425        let mut result = Ok(());
426
427        while let Some(msg) = reader.read().transpose() {
428            if self.is_finished() {
429                break;
430            }
431
432            // We assume an error here is unrecoverable (at least, without restarting the plugin)
433            if let Err(err) = msg.and_then(|msg| self.consume(msg)) {
434                // Put the error in the state so that new calls see it
435                let _ = self.state.error.set(err.clone());
436                // Error to streams
437                let _ = self.stream_manager.broadcast_read_error(err.clone());
438                // Error to call waiters
439                self.receive_plugin_call_subscriptions();
440                for subscription in std::mem::take(&mut self.plugin_call_states).into_values() {
441                    let _ = subscription
442                        .sender
443                        .as_ref()
444                        .map(|s| s.send(ReceivedPluginCallMessage::Error(err.clone())));
445                }
446                result = Err(err);
447                break;
448            }
449        }
450
451        // Tell the GC we are exiting so that the plugin doesn't get stuck open
452        if let Some(ref gc) = self.gc {
453            gc.exited();
454        }
455        result
456    }
457}
458
459impl InterfaceManager for PluginInterfaceManager {
460    type Interface = PluginInterface;
461    type Input = PluginOutput;
462
463    fn get_interface(&self) -> Self::Interface {
464        PluginInterface {
465            state: self.state.clone(),
466            stream_manager_handle: self.stream_manager.get_handle(),
467            gc: self.gc.clone(),
468        }
469    }
470
    /// Handle a single message received from the plugin.
    ///
    /// `Hello` must be the first message: any other message that arrives before the protocol
    /// info has been set is rejected with [`ShellError::PluginFailedToLoad`]. After that, stream
    /// messages go to the stream manager, options are applied to the garbage collector, call
    /// responses are forwarded to the waiting caller, and engine calls are forwarded to the
    /// thread handling the plugin call they belong to.
    fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> {
        log::trace!("from plugin: {input:?}");

        match input {
            PluginOutput::Hello(info) => {
                // The protocol info is published before the compatibility check below
                let info = Arc::new(info);
                self.protocol_info_mut.set(info.clone())?;

                let local_info = ProtocolInfo::default();
                if local_info.is_compatible_with(&info)? {
                    Ok(())
                } else {
                    Err(ShellError::PluginFailedToLoad {
                        msg: format!(
                            "Plugin `{}` is compiled for nushell version {}, \
                                which is not compatible with version {}",
                            self.state.source.name(),
                            info.version,
                            local_info.version,
                        ),
                    })
                }
            }
            _ if !self.state.protocol_info.is_set() => {
                // Must send protocol info first
                Err(ShellError::PluginFailedToLoad {
                    msg: format!(
                        "Failed to receive initial Hello message from `{}`. \
                            This plugin might be too old",
                        self.state.source.name()
                    ),
                })
            }
            // Stream messages
            PluginOutput::Data(..)
            | PluginOutput::End(..)
            | PluginOutput::Drop(..)
            | PluginOutput::Ack(..) => {
                self.consume_stream_message(input.try_into().map_err(|msg| {
                    ShellError::NushellFailed {
                        msg: format!("Failed to convert message {msg:?} to StreamMessage"),
                    }
                })?)
            }
            PluginOutput::Option(option) => match option {
                PluginOption::GcDisabled(disabled) => {
                    // Turn garbage collection off/on.
                    if let Some(ref gc) = self.gc {
                        gc.set_disabled(disabled);
                    }
                    Ok(())
                }
            },
            PluginOutput::CallResponse(id, response) => {
                // Handle reading the pipeline data, if any
                let response = response
                    .map_data(|data| {
                        let signals = self.get_signals(id)?;

                        // Register the stream in the response
                        if let Some(stream_id) = data.stream_id() {
                            self.recv_stream_started(id, stream_id);
                        }

                        self.read_pipeline_data(data, &signals)
                    })
                    .unwrap_or_else(|err| {
                        // If there's an error with initializing this stream, change it to a plugin
                        // error response, but send it anyway
                        PluginCallResponse::Error(err.into())
                    });
                let result = self.send_plugin_call_response(id, response);
                // The lock is only released if the response was accepted for a known call
                if result.is_ok() {
                    // When a call ends, it releases a lock on the GC
                    if let Some(ref gc) = self.gc {
                        gc.decrement_locks(1);
                    }
                }
                result
            }
            PluginOutput::EngineCall { context, id, call } => {
                // `context` is the ID of the plugin call that this engine call belongs to
                let call = call
                    // Handle reading the pipeline data, if any
                    .map_data(|input| {
                        let signals = self.get_signals(context)?;
                        self.read_pipeline_data(input, &signals)
                    })
                    // Do anything extra needed for each engine call setup
                    .and_then(|mut engine_call| {
                        match engine_call {
                            EngineCall::EvalClosure {
                                ref mut positional, ..
                            } => {
                                for arg in positional.iter_mut() {
                                    // Add source to any plugin custom values in the arguments
                                    PluginCustomValueWithSource::add_source_in(
                                        arg,
                                        &self.state.source,
                                    )?;
                                }
                                Ok(engine_call)
                            }
                            _ => Ok(engine_call),
                        }
                    });
                match call {
                    Ok(call) => self.send_engine_call(context, id, call),
                    // If there was an error with setting up the call, just write the error
                    Err(err) => self.get_interface().write_engine_call_response(
                        id,
                        EngineCallResponse::Error(err),
                        &CurrentCallState::default(),
                    ),
                }
            }
        }
    }
588
    /// Borrow the [`StreamManager`] owned by this interface manager.
    fn stream_manager(&self) -> &StreamManager {
        &self.stream_manager
    }
592
593    fn prepare_pipeline_data(&self, mut data: PipelineData) -> Result<PipelineData, ShellError> {
594        // Add source to any values
595        match data {
596            PipelineData::Value(ref mut value, _) => {
597                with_custom_values_in(value, |custom_value| {
598                    PluginCustomValueWithSource::add_source(custom_value.item, &self.state.source);
599                    Ok::<_, ShellError>(())
600                })?;
601                Ok(data)
602            }
603            PipelineData::ListStream(stream, meta) => {
604                let source = self.state.source.clone();
605                Ok(PipelineData::list_stream(
606                    stream.map(move |mut value| {
607                        let _ = PluginCustomValueWithSource::add_source_in(&mut value, &source);
608                        value
609                    }),
610                    meta,
611                ))
612            }
613            PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
614        }
615    }
616
617    fn consume_stream_message(&mut self, message: StreamMessage) -> Result<(), ShellError> {
618        // Keep track of streams that end
619        if let StreamMessage::End(id) = message {
620            self.recv_stream_ended(id);
621        }
622        self.stream_manager.handle_message(message)
623    }
624}
625
/// A reference through which a plugin can be interacted with during execution.
///
/// Cloning is supported; every clone refers to the same shared state.
///
/// This is not a public API.
#[derive(Debug, Clone)]
#[doc(hidden)]
pub struct PluginInterface {
    /// Shared state
    state: Arc<PluginInterfaceState>,
    /// Handle to stream manager
    stream_manager_handle: StreamManagerHandle,
    /// Handle to plugin garbage collector
    gc: Option<PluginGc>,
}
639
640impl PluginInterface {
641    /// Get the process ID for the plugin, if known.
642    pub fn pid(&self) -> Option<u32> {
643        self.state.process.as_ref().map(|p| p.pid())
644    }
645
646    /// Get the protocol info for the plugin. Will block to receive `Hello` if not received yet.
647    pub fn protocol_info(&self) -> Result<Arc<ProtocolInfo>, ShellError> {
648        self.state.protocol_info.get().and_then(|info| {
649            info.ok_or_else(|| ShellError::PluginFailedToLoad {
650                msg: format!(
651                    "Failed to get protocol info (`Hello` message) from the `{}` plugin",
652                    self.state.source.identity.name()
653                ),
654            })
655        })
656    }
657
658    /// Write the protocol info. This should be done after initialization
659    pub fn hello(&self) -> Result<(), ShellError> {
660        self.write(PluginInput::Hello(ProtocolInfo::default()))?;
661        self.flush()
662    }
663
664    /// Tell the plugin it should not expect any more plugin calls and should terminate after it has
665    /// finished processing the ones it has already received.
666    ///
667    /// Note that this is automatically called when the last existing `PluginInterface` is dropped.
668    /// You probably do not need to call this manually.
669    pub fn goodbye(&self) -> Result<(), ShellError> {
670        self.write(PluginInput::Goodbye)?;
671        self.flush()
672    }
673
674    /// Send the plugin a signal.
675    pub fn signal(&self, action: SignalAction) -> Result<(), ShellError> {
676        self.write(PluginInput::Signal(action))?;
677        self.flush()
678    }
679
680    /// Write an [`EngineCallResponse`]. Writes the full stream contained in any [`PipelineData`]
681    /// before returning.
682    pub fn write_engine_call_response(
683        &self,
684        id: EngineCallId,
685        response: EngineCallResponse<PipelineData>,
686        state: &CurrentCallState,
687    ) -> Result<(), ShellError> {
688        // Set up any stream if necessary
689        let mut writer = None;
690        let response = response.map_data(|data| {
691            let (data_header, data_writer) = self.init_write_pipeline_data(data, state)?;
692            writer = Some(data_writer);
693            Ok(data_header)
694        })?;
695
696        // Write the response, including the pipeline data header if present
697        self.write(PluginInput::EngineCallResponse(id, response))?;
698        self.flush()?;
699
700        // If we have a stream to write, do it now
701        if let Some(writer) = writer {
702            writer.write_background()?;
703        }
704
705        Ok(())
706    }
707
708    /// Write a plugin call message. Returns the writer for the stream.
709    fn write_plugin_call(
710        &self,
711        mut call: PluginCall<PipelineData>,
712        context: Option<&dyn PluginExecutionContext>,
713    ) -> Result<WritePluginCallResult, ShellError> {
714        let id = self.state.plugin_call_id_sequence.next()?;
715        let signals = context
716            .map(|c| c.signals().clone())
717            .unwrap_or_else(Signals::empty);
718        let (tx, rx) = mpsc::channel();
719        let (context_tx, context_rx) = mpsc::channel();
720        let keep_plugin_custom_values = mpsc::channel();
721
722        // Set up the state that will stay alive during the call.
723        let state = CurrentCallState {
724            context_tx: Some(context_tx),
725            keep_plugin_custom_values_tx: Some(keep_plugin_custom_values.0.clone()),
726            entered_foreground: false,
727            span: call.span(),
728        };
729
730        // Prepare the call with the state.
731        state.prepare_plugin_call(&mut call, &self.state.source)?;
732
733        // Convert the call into one with a header and handle the stream, if necessary
734        let (call, writer) = match call {
735            PluginCall::Metadata => (PluginCall::Metadata, Default::default()),
736            PluginCall::Signature => (PluginCall::Signature, Default::default()),
737            PluginCall::CustomValueOp(value, op) => {
738                (PluginCall::CustomValueOp(value, op), Default::default())
739            }
740            PluginCall::GetCompletion(flag_name) => {
741                (PluginCall::GetCompletion(flag_name), Default::default())
742            }
743            PluginCall::Run(CallInfo { name, call, input }) => {
744                let (header, writer) = self.init_write_pipeline_data(input, &state)?;
745                (
746                    PluginCall::Run(CallInfo {
747                        name,
748                        call,
749                        input: header,
750                    }),
751                    writer,
752                )
753            }
754        };
755
756        // Don't try to send a response for a Dropped call.
757        let dont_send_response =
758            matches!(call, PluginCall::CustomValueOp(_, CustomValueOp::Dropped));
759
760        // Register the subscription to the response, and the context
761        self.state
762            .plugin_call_subscription_sender
763            .send((
764                id,
765                PluginCallState {
766                    sender: Some(tx).filter(|_| !dont_send_response),
767                    dont_send_response,
768                    signals,
769                    context_rx: Some(context_rx),
770                    span: call.span(),
771                    keep_plugin_custom_values,
772                    remaining_streams_to_read: 0,
773                },
774            ))
775            .map_err(|_| {
776                let existing_error = self.state.error.get().cloned();
777                ShellError::GenericError {
778                    error: format!("Plugin `{}` closed unexpectedly", self.state.source.name()),
779                    msg: "can't complete this operation because the plugin is closed".into(),
780                    span: call.span(),
781                    help: Some(format!(
782                        "the plugin may have experienced an error. Try loading the plugin again \
783                        with `{}`",
784                        self.state.source.identity.use_command(),
785                    )),
786                    inner: existing_error.into_iter().collect(),
787                }
788            })?;
789
790        // Starting a plugin call adds a lock on the GC. Locks are not added for streams being read
791        // by the plugin, so the plugin would have to explicitly tell us if it expects to stay alive
792        // while reading streams in the background after the response ends.
793        if let Some(ref gc) = self.gc {
794            gc.increment_locks(1);
795        }
796
797        // Write request
798        self.write(PluginInput::Call(id, call))?;
799        self.flush()?;
800
801        Ok(WritePluginCallResult {
802            receiver: rx,
803            writer,
804            state,
805        })
806    }
807
808    /// Read the channel for plugin call messages and handle them until the response is received.
809    fn receive_plugin_call_response(
810        &self,
811        rx: mpsc::Receiver<ReceivedPluginCallMessage>,
812        mut context: Option<&mut (dyn PluginExecutionContext + '_)>,
813        mut state: CurrentCallState,
814    ) -> Result<PluginCallResponse<PipelineData>, ShellError> {
815        // Handle message from receiver
816        for msg in rx {
817            match msg {
818                ReceivedPluginCallMessage::Response(resp) => {
819                    if state.entered_foreground {
820                        // Make the plugin leave the foreground on return, even if it's a stream
821                        if let Some(context) = context.as_deref_mut()
822                            && let Err(err) =
823                                set_foreground(self.state.process.as_ref(), context, false)
824                        {
825                            log::warn!("Failed to leave foreground state on exit: {err:?}");
826                        }
827                    }
828                    if resp.has_stream() {
829                        // If the response has a stream, we need to register the context
830                        if let Some(context) = context
831                            && let Some(ref context_tx) = state.context_tx
832                        {
833                            let _ = context_tx.send(Context(context.boxed()));
834                        }
835                    }
836                    return Ok(resp);
837                }
838                ReceivedPluginCallMessage::Error(err) => {
839                    return Err(err);
840                }
841                ReceivedPluginCallMessage::EngineCall(engine_call_id, engine_call) => {
842                    self.handle_engine_call(
843                        engine_call_id,
844                        engine_call,
845                        &mut state,
846                        context.as_deref_mut(),
847                    )?;
848                }
849            }
850        }
851        // If we fail to get a response, check for an error in the state first, and return it if
852        // set. This is probably a much more helpful error than 'failed to receive response' alone
853        let existing_error = self.state.error.get().cloned();
854        Err(ShellError::GenericError {
855            error: format!(
856                "Failed to receive response to plugin call from `{}`",
857                self.state.source.identity.name()
858            ),
859            msg: "while waiting for this operation to complete".into(),
860            span: state.span,
861            help: Some(format!(
862                "try restarting the plugin with `{}`",
863                self.state.source.identity.use_command()
864            )),
865            inner: existing_error.into_iter().collect(),
866        })
867    }
868
869    /// Handle an engine call and write the response.
870    fn handle_engine_call(
871        &self,
872        engine_call_id: EngineCallId,
873        engine_call: EngineCall<PipelineData>,
874        state: &mut CurrentCallState,
875        context: Option<&mut (dyn PluginExecutionContext + '_)>,
876    ) -> Result<(), ShellError> {
877        let process = self.state.process.as_ref();
878        let resp = handle_engine_call(engine_call, state, context, process)
879            .unwrap_or_else(EngineCallResponse::Error);
880        // Handle stream
881        let mut writer = None;
882        let resp = resp
883            .map_data(|data| {
884                let (data_header, data_writer) = self.init_write_pipeline_data(data, state)?;
885                writer = Some(data_writer);
886                Ok(data_header)
887            })
888            .unwrap_or_else(|err| {
889                // If we fail to set up the response write, change to an error response here
890                writer = None;
891                EngineCallResponse::Error(err)
892            });
893        // Write the response, then the stream
894        self.write(PluginInput::EngineCallResponse(engine_call_id, resp))?;
895        self.flush()?;
896        if let Some(writer) = writer {
897            writer.write_background()?;
898        }
899        Ok(())
900    }
901
902    /// Perform a plugin call. Input and output streams are handled, and engine calls are handled
903    /// too if there are any before the final response.
904    fn plugin_call(
905        &self,
906        call: PluginCall<PipelineData>,
907        context: Option<&mut dyn PluginExecutionContext>,
908    ) -> Result<PluginCallResponse<PipelineData>, ShellError> {
909        // Check for an error in the state first, and return it if set.
910        if let Some(error) = self.state.error.get() {
911            return Err(ShellError::GenericError {
912                error: format!(
913                    "Failed to send plugin call to `{}`",
914                    self.state.source.identity.name()
915                ),
916                msg: "the plugin encountered an error before this operation could be attempted"
917                    .into(),
918                span: call.span(),
919                help: Some(format!(
920                    "try loading the plugin again with `{}`",
921                    self.state.source.identity.use_command(),
922                )),
923                inner: vec![error.clone()],
924            });
925        }
926
927        let result = self.write_plugin_call(call, context.as_deref())?;
928
929        // Finish writing stream in the background
930        result.writer.write_background()?;
931
932        self.receive_plugin_call_response(result.receiver, context, result.state)
933    }
934
935    /// Get the metadata from the plugin.
936    pub fn get_metadata(&self) -> Result<PluginMetadata, ShellError> {
937        match self.plugin_call(PluginCall::Metadata, None)? {
938            PluginCallResponse::Metadata(meta) => Ok(meta),
939            PluginCallResponse::Error(err) => Err(err.into()),
940            _ => Err(ShellError::PluginFailedToDecode {
941                msg: "Received unexpected response to plugin Metadata call".into(),
942            }),
943        }
944    }
945
946    /// Get the command signatures from the plugin.
947    pub fn get_signature(&self) -> Result<Vec<PluginSignature>, ShellError> {
948        match self.plugin_call(PluginCall::Signature, None)? {
949            PluginCallResponse::Signature(sigs) => Ok(sigs),
950            PluginCallResponse::Error(err) => Err(err.into()),
951            _ => Err(ShellError::PluginFailedToDecode {
952                msg: "Received unexpected response to plugin Signature call".into(),
953            }),
954        }
955    }
956
957    /// Run the plugin with the given call and execution context.
958    pub fn run(
959        &self,
960        call: CallInfo<PipelineData>,
961        context: &mut dyn PluginExecutionContext,
962    ) -> Result<PipelineData, ShellError> {
963        match self.plugin_call(PluginCall::Run(call), Some(context))? {
964            PluginCallResponse::PipelineData(data) => Ok(data),
965            PluginCallResponse::Error(err) => Err(err.into()),
966            _ => Err(ShellError::PluginFailedToDecode {
967                msg: "Received unexpected response to plugin Run call".into(),
968            }),
969        }
970    }
971
972    /// Get completion items from the plugin.
973    pub fn get_dynamic_completion(
974        &self,
975        info: GetCompletionInfo,
976    ) -> Result<Option<Vec<DynamicSuggestion>>, ShellError> {
977        match self.plugin_call(PluginCall::GetCompletion(info), None)? {
978            PluginCallResponse::CompletionItems(items) => Ok(items),
979            PluginCallResponse::Error(err) => Err(err.into()),
980            _ => Err(ShellError::PluginFailedToDecode {
981                msg: "Received unexpected response to plugin GetCompletion call".into(),
982            }),
983        }
984    }
985
986    /// Do a custom value op that expects a value response (i.e. most of them)
987    fn custom_value_op_expecting_value(
988        &self,
989        value: Spanned<PluginCustomValueWithSource>,
990        op: CustomValueOp,
991    ) -> Result<Value, ShellError> {
992        let op_name = op.name();
993        let span = value.span;
994
995        // Check that the value came from the right source
996        value.item.verify_source(span, &self.state.source)?;
997
998        let call = PluginCall::CustomValueOp(value.map(|cv| cv.without_source()), op);
999        match self.plugin_call(call, None)? {
1000            PluginCallResponse::PipelineData(out_data) => out_data.into_value(span),
1001            PluginCallResponse::Error(err) => Err(err.into()),
1002            _ => Err(ShellError::PluginFailedToDecode {
1003                msg: format!("Received unexpected response to custom value {op_name}() call"),
1004            }),
1005        }
1006    }
1007
1008    /// Collapse a custom value to its base value.
1009    pub fn custom_value_to_base_value(
1010        &self,
1011        value: Spanned<PluginCustomValueWithSource>,
1012    ) -> Result<Value, ShellError> {
1013        self.custom_value_op_expecting_value(value, CustomValueOp::ToBaseValue)
1014    }
1015
1016    /// Follow a numbered cell path on a custom value - e.g. `value.0`.
1017    pub fn custom_value_follow_path_int(
1018        &self,
1019        value: Spanned<PluginCustomValueWithSource>,
1020        index: Spanned<usize>,
1021        optional: bool,
1022    ) -> Result<Value, ShellError> {
1023        self.custom_value_op_expecting_value(
1024            value,
1025            CustomValueOp::FollowPathInt { index, optional },
1026        )
1027    }
1028
1029    /// Follow a named cell path on a custom value - e.g. `value.column`.
1030    pub fn custom_value_follow_path_string(
1031        &self,
1032        value: Spanned<PluginCustomValueWithSource>,
1033        column_name: Spanned<String>,
1034        optional: bool,
1035        casing: Casing,
1036    ) -> Result<Value, ShellError> {
1037        self.custom_value_op_expecting_value(
1038            value,
1039            CustomValueOp::FollowPathString {
1040                column_name,
1041                optional,
1042                casing,
1043            },
1044        )
1045    }
1046
1047    /// Invoke comparison logic for custom values.
1048    pub fn custom_value_partial_cmp(
1049        &self,
1050        value: PluginCustomValueWithSource,
1051        other_value: Value,
1052    ) -> Result<Option<Ordering>, ShellError> {
1053        // Check that the value came from the right source
1054        value.verify_source(Span::unknown(), &self.state.source)?;
1055
1056        // Note: the protocol is always designed to have a span with the custom value, but this
1057        // operation doesn't support one.
1058        let call = PluginCall::CustomValueOp(
1059            value.without_source().into_spanned(Span::unknown()),
1060            CustomValueOp::PartialCmp(other_value),
1061        );
1062        match self.plugin_call(call, None)? {
1063            PluginCallResponse::Ordering(ordering) => Ok(ordering),
1064            PluginCallResponse::Error(err) => Err(err.into()),
1065            _ => Err(ShellError::PluginFailedToDecode {
1066                msg: "Received unexpected response to custom value partial_cmp() call".into(),
1067            }),
1068        }
1069    }
1070
1071    /// Invoke functionality for an operator on a custom value.
1072    pub fn custom_value_operation(
1073        &self,
1074        left: Spanned<PluginCustomValueWithSource>,
1075        operator: Spanned<Operator>,
1076        right: Value,
1077    ) -> Result<Value, ShellError> {
1078        self.custom_value_op_expecting_value(left, CustomValueOp::Operation(operator, right))
1079    }
1080
1081    /// Invoke saving operation on a custom value.
1082    pub fn custom_value_save(
1083        &self,
1084        value: Spanned<PluginCustomValueWithSource>,
1085        path: Spanned<&Path>,
1086        save_call_span: Span,
1087    ) -> Result<(), ShellError> {
1088        // Check that the value came from the right source
1089        value.item.verify_source(value.span, &self.state.source)?;
1090
1091        let call = PluginCall::CustomValueOp(
1092            value.map(|cv| cv.without_source()),
1093            CustomValueOp::Save {
1094                path: path.map(ToOwned::to_owned),
1095                save_call_span,
1096            },
1097        );
1098        match self.plugin_call(call, None)? {
1099            PluginCallResponse::Ok => Ok(()),
1100            PluginCallResponse::Error(err) => Err(err.into()),
1101            _ => Err(ShellError::PluginFailedToDecode {
1102                msg: "Received unexpected response to custom value save() call".into(),
1103            }),
1104        }
1105    }
1106
1107    /// Notify the plugin about a dropped custom value.
1108    pub fn custom_value_dropped(&self, value: PluginCustomValue) -> Result<(), ShellError> {
1109        // Make sure we don't block here. This can happen on the receiver thread, which would cause a deadlock. We should not try to receive the response - just let it be discarded.
1110        //
1111        // Note: the protocol is always designed to have a span with the custom value, but this
1112        // operation doesn't support one.
1113        drop(self.write_plugin_call(
1114            PluginCall::CustomValueOp(value.into_spanned(Span::unknown()), CustomValueOp::Dropped),
1115            None,
1116        )?);
1117        Ok(())
1118    }
1119}
1120
1121impl Interface for PluginInterface {
1122    type Output = PluginInput;
1123    type DataContext = CurrentCallState;
1124
1125    fn write(&self, input: PluginInput) -> Result<(), ShellError> {
1126        log::trace!("to plugin: {input:?}");
1127        self.state.writer.write(&input).map_err(|err| {
1128            log::warn!("write() error: {err}");
1129            // If there's an error in the state, return that instead because it's likely more
1130            // descriptive
1131            self.state.error.get().cloned().unwrap_or(err)
1132        })
1133    }
1134
1135    fn flush(&self) -> Result<(), ShellError> {
1136        self.state.writer.flush().map_err(|err| {
1137            log::warn!("flush() error: {err}");
1138            // If there's an error in the state, return that instead because it's likely more
1139            // descriptive
1140            self.state.error.get().cloned().unwrap_or(err)
1141        })
1142    }
1143
1144    fn stream_id_sequence(&self) -> &Sequence {
1145        &self.state.stream_id_sequence
1146    }
1147
1148    fn stream_manager_handle(&self) -> &StreamManagerHandle {
1149        &self.stream_manager_handle
1150    }
1151
1152    fn prepare_pipeline_data(
1153        &self,
1154        data: PipelineData,
1155        state: &CurrentCallState,
1156    ) -> Result<PipelineData, ShellError> {
1157        // Validate the destination of values in the pipeline data
1158        match data {
1159            PipelineData::Value(mut value, meta) => {
1160                state.prepare_value(&mut value, &self.state.source)?;
1161                Ok(PipelineData::value(value, meta))
1162            }
1163            PipelineData::ListStream(stream, meta) => {
1164                let source = self.state.source.clone();
1165                let state = state.clone();
1166                Ok(PipelineData::list_stream(
1167                    stream.map(move |mut value| {
1168                        match state.prepare_value(&mut value, &source) {
1169                            Ok(()) => value,
1170                            // Put the error in the stream instead
1171                            Err(err) => Value::error(err, value.span()),
1172                        }
1173                    }),
1174                    meta,
1175                ))
1176            }
1177            PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
1178        }
1179    }
1180}
1181
1182impl Drop for PluginInterface {
1183    fn drop(&mut self) {
1184        // Automatically send `Goodbye` if there are no more interfaces. In that case there would be
1185        // only two copies of the state, one of which we hold, and one of which the manager holds.
1186        //
1187        // Our copy is about to be dropped, so there would only be one left, the manager. The
1188        // manager will never send any plugin calls, so we should let the plugin know that.
1189        if Arc::strong_count(&self.state) < 3
1190            && let Err(err) = self.goodbye()
1191        {
1192            log::warn!("Error during plugin Goodbye: {err}");
1193        }
1194    }
1195}
1196
/// Return value of [`PluginInterface::write_plugin_call()`].
///
/// Bundles everything the caller needs to continue with a plugin call after the call message
/// has been written.
#[must_use]
struct WritePluginCallResult {
    /// Receiver for plugin call messages related to the written plugin call.
    receiver: mpsc::Receiver<ReceivedPluginCallMessage>,
    /// Writer for the stream, if any. Writing the call does not run it; the caller decides
    /// whether to write the stream.
    writer: PipelineDataWriter<PluginInterface>,
    /// State to be kept for the duration of the plugin call.
    state: CurrentCallState,
}
1207
/// State related to the current plugin call being executed.
///
/// The default value has no channels, has not entered the foreground, and has no span.
#[derive(Default, Clone)]
pub struct CurrentCallState {
    /// Sender for context, which should be sent if the plugin call returned a stream so that
    /// engine calls may continue to be handled.
    context_tx: Option<mpsc::Sender<Context>>,
    /// Sender for a channel that retains plugin custom values that need to stay alive for the
    /// duration of a plugin call.
    keep_plugin_custom_values_tx: Option<mpsc::Sender<PluginCustomValueWithSource>>,
    /// The plugin call entered the foreground: this should be cleaned up automatically when the
    /// plugin call returns.
    ///
    /// Set by the `EnterForeground` engine call and cleared by `LeaveForeground`.
    entered_foreground: bool,
    /// The span that caused the plugin call.
    ///
    /// Used for the error reported when no response to the call is received.
    span: Option<Span>,
}
1223
1224impl CurrentCallState {
1225    /// Prepare a custom value for write. Verifies custom value origin, and keeps custom values that
1226    /// shouldn't be dropped immediately.
1227    fn prepare_custom_value(
1228        &self,
1229        custom_value: Spanned<&mut Box<dyn CustomValue>>,
1230        source: &PluginSource,
1231    ) -> Result<(), ShellError> {
1232        // Ensure we can use it
1233        PluginCustomValueWithSource::verify_source_of_custom_value(
1234            custom_value.as_deref().map(|cv| &**cv),
1235            source,
1236        )?;
1237
1238        // Check whether we need to keep it
1239        if let Some(keep_tx) = &self.keep_plugin_custom_values_tx
1240            && let Some(custom_value) = custom_value
1241                .item
1242                .as_any()
1243                .downcast_ref::<PluginCustomValueWithSource>()
1244            && custom_value.notify_on_drop()
1245        {
1246            log::trace!("Keeping custom value for drop later: {custom_value:?}");
1247            keep_tx
1248                .send(custom_value.clone())
1249                .map_err(|_| ShellError::NushellFailed {
1250                    msg: "Failed to custom value to keep channel".into(),
1251                })?;
1252        }
1253
1254        // Strip the source from it so it can be serialized
1255        PluginCustomValueWithSource::remove_source(&mut *custom_value.item);
1256
1257        Ok(())
1258    }
1259
1260    /// Prepare a value for write, including all contained custom values.
1261    fn prepare_value(&self, value: &mut Value, source: &PluginSource) -> Result<(), ShellError> {
1262        with_custom_values_in(value, |custom_value| {
1263            self.prepare_custom_value(custom_value, source)
1264        })
1265    }
1266
1267    /// Prepare call arguments for write.
1268    fn prepare_call_args(
1269        &self,
1270        call: &mut EvaluatedCall,
1271        source: &PluginSource,
1272    ) -> Result<(), ShellError> {
1273        for arg in call.positional.iter_mut() {
1274            self.prepare_value(arg, source)?;
1275        }
1276        for arg in call.named.iter_mut().flat_map(|(_, arg)| arg.as_mut()) {
1277            self.prepare_value(arg, source)?;
1278        }
1279        Ok(())
1280    }
1281
1282    /// Prepare a plugin call for write. Does not affect pipeline data, which is handled by
1283    /// `prepare_pipeline_data()` instead.
1284    fn prepare_plugin_call<D>(
1285        &self,
1286        call: &mut PluginCall<D>,
1287        source: &PluginSource,
1288    ) -> Result<(), ShellError> {
1289        match call {
1290            PluginCall::Metadata => Ok(()),
1291            PluginCall::Signature => Ok(()),
1292            PluginCall::GetCompletion(_) => Ok(()),
1293            PluginCall::Run(CallInfo { call, .. }) => self.prepare_call_args(call, source),
1294            PluginCall::CustomValueOp(_, op) => {
1295                // Handle anything within the op.
1296                match op {
1297                    CustomValueOp::ToBaseValue => Ok(()),
1298                    CustomValueOp::FollowPathInt { .. } => Ok(()),
1299                    CustomValueOp::FollowPathString { .. } => Ok(()),
1300                    CustomValueOp::PartialCmp(value) => self.prepare_value(value, source),
1301                    CustomValueOp::Operation(_, value) => self.prepare_value(value, source),
1302                    CustomValueOp::Save { .. } => Ok(()),
1303                    CustomValueOp::Dropped => Ok(()),
1304                }
1305            }
1306        }
1307    }
1308}
1309
1310/// Handle an engine call.
1311pub(crate) fn handle_engine_call(
1312    call: EngineCall<PipelineData>,
1313    state: &mut CurrentCallState,
1314    context: Option<&mut (dyn PluginExecutionContext + '_)>,
1315    process: Option<&PluginProcess>,
1316) -> Result<EngineCallResponse<PipelineData>, ShellError> {
1317    let call_name = call.name();
1318
1319    let context = context.ok_or_else(|| ShellError::GenericError {
1320        error: "A plugin execution context is required for this engine call".into(),
1321        msg: format!("attempted to call {call_name} outside of a command invocation"),
1322        span: None,
1323        help: Some("this is probably a bug with the plugin".into()),
1324        inner: vec![],
1325    })?;
1326
1327    match call {
1328        EngineCall::GetConfig => {
1329            let config = SharedCow::from(context.get_config()?);
1330            Ok(EngineCallResponse::Config(config))
1331        }
1332        EngineCall::GetPluginConfig => {
1333            let plugin_config = context.get_plugin_config()?;
1334            Ok(plugin_config.map_or_else(EngineCallResponse::empty, EngineCallResponse::value))
1335        }
1336        EngineCall::GetEnvVar(name) => {
1337            let value = context.get_env_var(&name)?;
1338            Ok(value
1339                .cloned()
1340                .map_or_else(EngineCallResponse::empty, EngineCallResponse::value))
1341        }
1342        EngineCall::GetEnvVars => context.get_env_vars().map(EngineCallResponse::ValueMap),
1343        EngineCall::GetCurrentDir => {
1344            let current_dir = context.get_current_dir()?;
1345            Ok(EngineCallResponse::value(Value::string(
1346                current_dir.item,
1347                current_dir.span,
1348            )))
1349        }
1350        EngineCall::AddEnvVar(name, value) => {
1351            context.add_env_var(name, value)?;
1352            Ok(EngineCallResponse::empty())
1353        }
1354        EngineCall::GetHelp => {
1355            let help = context.get_help()?;
1356            Ok(EngineCallResponse::value(Value::string(
1357                help.item, help.span,
1358            )))
1359        }
1360        EngineCall::EnterForeground => {
1361            let resp = set_foreground(process, context, true)?;
1362            state.entered_foreground = true;
1363            Ok(resp)
1364        }
1365        EngineCall::LeaveForeground => {
1366            let resp = set_foreground(process, context, false)?;
1367            state.entered_foreground = false;
1368            Ok(resp)
1369        }
1370        EngineCall::GetSpanContents(span) => {
1371            let contents = context.get_span_contents(span)?;
1372            Ok(EngineCallResponse::value(Value::binary(
1373                contents.item,
1374                contents.span,
1375            )))
1376        }
1377        EngineCall::EvalClosure {
1378            closure,
1379            positional,
1380            input,
1381            redirect_stdout,
1382            redirect_stderr,
1383        } => context
1384            .eval_closure(closure, positional, input, redirect_stdout, redirect_stderr)
1385            .map(EngineCallResponse::PipelineData),
1386        EngineCall::FindDecl(name) => context.find_decl(&name).map(|decl_id| {
1387            if let Some(decl_id) = decl_id {
1388                EngineCallResponse::Identifier(decl_id)
1389            } else {
1390                EngineCallResponse::empty()
1391            }
1392        }),
1393        EngineCall::CallDecl {
1394            decl_id,
1395            call,
1396            input,
1397            redirect_stdout,
1398            redirect_stderr,
1399        } => context
1400            .call_decl(decl_id, call, input, redirect_stdout, redirect_stderr)
1401            .map(EngineCallResponse::PipelineData),
1402    }
1403}
1404
1405/// Implements enter/exit foreground
1406fn set_foreground(
1407    process: Option<&PluginProcess>,
1408    context: &mut dyn PluginExecutionContext,
1409    enter: bool,
1410) -> Result<EngineCallResponse<PipelineData>, ShellError> {
1411    if let Some(process) = process {
1412        if let Some(pipeline_externals_state) = context.pipeline_externals_state() {
1413            if enter {
1414                let pgrp = process.enter_foreground(context.span(), pipeline_externals_state)?;
1415                Ok(pgrp.map_or_else(EngineCallResponse::empty, |id| {
1416                    EngineCallResponse::value(Value::int(id as i64, context.span()))
1417                }))
1418            } else {
1419                process.exit_foreground()?;
1420                Ok(EngineCallResponse::empty())
1421            }
1422        } else {
1423            // This should always be present on a real context
1424            Err(ShellError::NushellFailed {
1425                msg: "missing required pipeline_externals_state from context \
1426                            for entering foreground"
1427                    .into(),
1428            })
1429        }
1430    } else {
1431        Err(ShellError::GenericError {
1432            error: "Can't manage plugin process to enter foreground".into(),
1433            msg: "the process ID for this plugin is unknown".into(),
1434            span: Some(context.span()),
1435            help: Some("the plugin may be running in a test".into()),
1436            inner: vec![],
1437        })
1438    }
1439}