nu_plugin_engine/interface/
mod.rs

1//! Interface used by the engine to communicate with the plugin.
2
3use nu_plugin_core::{
4    Interface, InterfaceManager, PipelineDataWriter, PluginRead, PluginWrite, StreamManager,
5    StreamManagerHandle,
6    util::{Waitable, WaitableMut, with_custom_values_in},
7};
8use nu_plugin_protocol::{
9    CallInfo, CustomValueOp, EngineCall, EngineCallId, EngineCallResponse, EvaluatedCall, Ordering,
10    PluginCall, PluginCallId, PluginCallResponse, PluginCustomValue, PluginInput, PluginOption,
11    PluginOutput, ProtocolInfo, StreamId, StreamMessage,
12};
13use nu_protocol::{
14    CustomValue, IntoSpanned, PipelineData, PluginMetadata, PluginSignature, ShellError,
15    SignalAction, Signals, Span, Spanned, Value, ast::Operator, casing::Casing, engine::Sequence,
16};
17use nu_utils::SharedCow;
18use std::{
19    collections::{BTreeMap, btree_map},
20    path::Path,
21    sync::{Arc, OnceLock, mpsc},
22};
23
24use crate::{
25    PluginCustomValueWithSource, PluginExecutionContext, PluginGc, PluginSource,
26    process::PluginProcess,
27};
28
29#[cfg(test)]
30mod tests;
31
/// Message delivered over a plugin call's channel (see `PluginCallState::sender`).
#[derive(Debug)]
enum ReceivedPluginCallMessage {
    /// The final response to send
    Response(PluginCallResponse<PipelineData>),

    /// A critical error with the interface
    Error(ShellError),

    /// An engine call that should be evaluated and responded to, but is not the final response
    ///
    /// We send this back to the thread that made the plugin call so we don't block the reader
    /// thread
    EngineCall(EngineCallId, EngineCall<PipelineData>),
}
46
/// Context for plugin call execution
///
/// Newtype around a boxed [`PluginExecutionContext`]. It is passed over the `context_rx`
/// channel of a plugin call's state.
pub(crate) struct Context(Box<dyn PluginExecutionContext>);
49
50impl std::fmt::Debug for Context {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        f.write_str("Context")
53    }
54}
55
56impl std::ops::Deref for Context {
57    type Target = dyn PluginExecutionContext;
58
59    fn deref(&self) -> &Self::Target {
60        &*self.0
61    }
62}
63
/// Internal shared state between the manager and each interface.
///
/// Held behind an `Arc`; the manager uses the strong count of that `Arc` to decide whether any
/// interfaces are still alive.
struct PluginInterfaceState {
    /// The source to be used for custom values coming from / going to the plugin
    source: Arc<PluginSource>,
    /// The plugin process being managed. `None` if no pid was given when the manager was created
    process: Option<PluginProcess>,
    /// Protocol version info, set after `Hello` received
    protocol_info: Waitable<Arc<ProtocolInfo>>,
    /// Sequence for generating plugin call ids
    plugin_call_id_sequence: Sequence,
    /// Sequence for generating stream ids
    stream_id_sequence: Sequence,
    /// Sender to subscribe to a plugin call response
    plugin_call_subscription_sender: mpsc::Sender<(PluginCallId, PluginCallState)>,
    /// An error that should be propagated to further plugin calls. Set at most once, by the
    /// manager's read loop when it encounters an error
    error: OnceLock<ShellError>,
    /// The synchronized output writer
    writer: Box<dyn PluginWrite<PluginInput>>,
}
83
84impl std::fmt::Debug for PluginInterfaceState {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        f.debug_struct("PluginInterfaceState")
87            .field("source", &self.source)
88            .field("protocol_info", &self.protocol_info)
89            .field("plugin_call_id_sequence", &self.plugin_call_id_sequence)
90            .field("stream_id_sequence", &self.stream_id_sequence)
91            .field(
92                "plugin_call_subscription_sender",
93                &self.plugin_call_subscription_sender,
94            )
95            .field("error", &self.error)
96            .finish_non_exhaustive()
97    }
98}
99
/// State that the manager keeps for each plugin call during its lifetime.
///
/// Created by the interface when a plugin call is written, and handed to the manager through the
/// subscription channel.
#[derive(Debug)]
struct PluginCallState {
    /// The sender back to the thread that is waiting for the plugin call response.
    ///
    /// Taken (set to `None`) once the response has been sent; a fallback engine call handler may
    /// install a new sender afterwards.
    sender: Option<mpsc::Sender<ReceivedPluginCallMessage>>,
    /// Don't try to send the plugin call response. This is only used for `Dropped` to avoid an
    /// error
    dont_send_response: bool,
    /// Signals to be used for stream iterators
    signals: Signals,
    /// Channel to receive context on to be used if needed
    context_rx: Option<mpsc::Receiver<Context>>,
    /// Span associated with the call, if any
    span: Option<Span>,
    /// Channel for plugin custom values that should be kept alive for the duration of the plugin
    /// call. The plugin custom values on this channel are never read, we just hold on to it to keep
    /// them in memory so they can be dropped at the end of the call. We hold the sender as well so
    /// we can generate the CurrentCallState.
    keep_plugin_custom_values: (
        mpsc::Sender<PluginCustomValueWithSource>,
        mpsc::Receiver<PluginCustomValueWithSource>,
    ),
    /// Number of streams that still need to be read from the plugin call response. The state is
    /// removed by the manager once this is no longer positive and the response has been handled
    remaining_streams_to_read: i32,
}
125
126impl Drop for PluginCallState {
127    fn drop(&mut self) {
128        // Clear the keep custom values channel, so drop notifications can be sent
129        for value in self.keep_plugin_custom_values.1.try_iter() {
130            log::trace!("Dropping custom value that was kept: {value:?}");
131            drop(value);
132        }
133    }
134}
135
/// Manages reading and dispatching messages for [`PluginInterface`]s.
#[derive(Debug)]
pub struct PluginInterfaceManager {
    /// Shared state
    state: Arc<PluginInterfaceState>,
    /// The writer for protocol info
    protocol_info_mut: WaitableMut<Arc<ProtocolInfo>>,
    /// Manages stream messages and state
    stream_manager: StreamManager,
    /// State related to plugin calls
    plugin_call_states: BTreeMap<PluginCallId, PluginCallState>,
    /// Receiver for plugin call subscriptions
    plugin_call_subscription_receiver: mpsc::Receiver<(PluginCallId, PluginCallState)>,
    /// Maps each stream currently being read to the plugin call it belongs to
    ///
    /// This is necessary so we know when we can remove context for plugin calls
    plugin_call_input_streams: BTreeMap<StreamId, PluginCallId>,
    /// Garbage collector handle, to notify about the state of the plugin
    gc: Option<PluginGc>,
}
156
157impl PluginInterfaceManager {
158    pub fn new(
159        source: Arc<PluginSource>,
160        pid: Option<u32>,
161        writer: impl PluginWrite<PluginInput> + 'static,
162    ) -> PluginInterfaceManager {
163        let (subscription_tx, subscription_rx) = mpsc::channel();
164        let protocol_info_mut = WaitableMut::new();
165
166        PluginInterfaceManager {
167            state: Arc::new(PluginInterfaceState {
168                source,
169                process: pid.map(PluginProcess::new),
170                protocol_info: protocol_info_mut.reader(),
171                plugin_call_id_sequence: Sequence::default(),
172                stream_id_sequence: Sequence::default(),
173                plugin_call_subscription_sender: subscription_tx,
174                error: OnceLock::new(),
175                writer: Box::new(writer),
176            }),
177            protocol_info_mut,
178            stream_manager: StreamManager::new(),
179            plugin_call_states: BTreeMap::new(),
180            plugin_call_subscription_receiver: subscription_rx,
181            plugin_call_input_streams: BTreeMap::new(),
182            gc: None,
183        }
184    }
185
    /// Add a garbage collector to this plugin. The manager will notify the garbage collector about
    /// the state of the plugin so that it can be automatically cleaned up if the plugin is
    /// inactive.
    ///
    /// Replaces whatever was set before; passing `None` detaches the garbage collector.
    pub fn set_garbage_collector(&mut self, gc: Option<PluginGc>) {
        self.gc = gc;
    }
192
193    /// Consume pending messages in the `plugin_call_subscription_receiver`
194    fn receive_plugin_call_subscriptions(&mut self) {
195        while let Ok((id, state)) = self.plugin_call_subscription_receiver.try_recv() {
196            if let btree_map::Entry::Vacant(e) = self.plugin_call_states.entry(id) {
197                e.insert(state);
198            } else {
199                log::warn!("Duplicate plugin call ID ignored: {id}");
200            }
201        }
202    }
203
204    /// Track the start of incoming stream(s)
205    fn recv_stream_started(&mut self, call_id: PluginCallId, stream_id: StreamId) {
206        self.plugin_call_input_streams.insert(stream_id, call_id);
207        // Increment the number of streams on the subscription so context stays alive
208        self.receive_plugin_call_subscriptions();
209        if let Some(state) = self.plugin_call_states.get_mut(&call_id) {
210            state.remaining_streams_to_read += 1;
211        }
212        // Add a lock to the garbage collector for each stream
213        if let Some(ref gc) = self.gc {
214            gc.increment_locks(1);
215        }
216    }
217
218    /// Track the end of an incoming stream
219    fn recv_stream_ended(&mut self, stream_id: StreamId) {
220        if let Some(call_id) = self.plugin_call_input_streams.remove(&stream_id) {
221            if let btree_map::Entry::Occupied(mut e) = self.plugin_call_states.entry(call_id) {
222                e.get_mut().remaining_streams_to_read -= 1;
223                // Remove the subscription if there are no more streams to be read.
224                if e.get().remaining_streams_to_read <= 0 {
225                    e.remove();
226                }
227            }
228            // Streams read from the plugin are tracked with locks on the GC so plugins don't get
229            // stopped if they have active streams
230            if let Some(ref gc) = self.gc {
231                gc.decrement_locks(1);
232            }
233        }
234    }
235
236    /// Find the [`Signals`] struct corresponding to the given plugin call id
237    fn get_signals(&mut self, id: PluginCallId) -> Result<Signals, ShellError> {
238        // Make sure we're up to date
239        self.receive_plugin_call_subscriptions();
240        // Find the subscription and return the context
241        self.plugin_call_states
242            .get(&id)
243            .map(|state| state.signals.clone())
244            .ok_or_else(|| ShellError::PluginFailedToDecode {
245                msg: format!("Unknown plugin call ID: {id}"),
246            })
247    }
248
249    /// Send a [`PluginCallResponse`] to the appropriate sender
250    fn send_plugin_call_response(
251        &mut self,
252        id: PluginCallId,
253        response: PluginCallResponse<PipelineData>,
254    ) -> Result<(), ShellError> {
255        // Ensure we're caught up on the subscriptions made
256        self.receive_plugin_call_subscriptions();
257
258        if let btree_map::Entry::Occupied(mut e) = self.plugin_call_states.entry(id) {
259            // Remove the subscription sender, since this will be the last message.
260            //
261            // We can spawn a new one if we need it for engine calls.
262            if !e.get().dont_send_response
263                && e.get_mut()
264                    .sender
265                    .take()
266                    .and_then(|s| s.send(ReceivedPluginCallMessage::Response(response)).ok())
267                    .is_none()
268            {
269                log::warn!("Received a plugin call response for id={id}, but the caller hung up");
270            }
271            // If there are no registered streams, just remove it
272            if e.get().remaining_streams_to_read <= 0 {
273                e.remove();
274            }
275            Ok(())
276        } else {
277            Err(ShellError::PluginFailedToDecode {
278                msg: format!("Unknown plugin call ID: {id}"),
279            })
280        }
281    }
282
283    /// Spawn a handler for engine calls for a plugin, in case we need to handle engine calls
284    /// after the response has already been received (in which case we have nowhere to send them)
285    fn spawn_engine_call_handler(
286        &mut self,
287        id: PluginCallId,
288    ) -> Result<&mpsc::Sender<ReceivedPluginCallMessage>, ShellError> {
289        let interface = self.get_interface();
290
291        if let Some(state) = self.plugin_call_states.get_mut(&id) {
292            if state.sender.is_none() {
293                let (tx, rx) = mpsc::channel();
294                let context_rx =
295                    state
296                        .context_rx
297                        .take()
298                        .ok_or_else(|| ShellError::NushellFailed {
299                            msg: "Tried to spawn the fallback engine call handler more than once"
300                                .into(),
301                        })?;
302
303                // Generate the state needed to handle engine calls
304                let mut current_call_state = CurrentCallState {
305                    context_tx: None,
306                    keep_plugin_custom_values_tx: Some(state.keep_plugin_custom_values.0.clone()),
307                    entered_foreground: false,
308                    span: state.span,
309                };
310
311                let handler = move || {
312                    // We receive on the thread so that we don't block the reader thread
313                    let mut context = context_rx
314                        .recv()
315                        .ok() // The plugin call won't send context if it's not required.
316                        .map(|c| c.0);
317
318                    for msg in rx {
319                        // This thread only handles engine calls.
320                        match msg {
321                            ReceivedPluginCallMessage::EngineCall(engine_call_id, engine_call) => {
322                                if let Err(err) = interface.handle_engine_call(
323                                    engine_call_id,
324                                    engine_call,
325                                    &mut current_call_state,
326                                    context.as_deref_mut(),
327                                ) {
328                                    log::warn!(
329                                        "Error in plugin post-response engine call handler: \
330                                        {err:?}"
331                                    );
332                                    return;
333                                }
334                            }
335                            other => log::warn!(
336                                "Bad message received in plugin post-response \
337                                engine call handler: {other:?}"
338                            ),
339                        }
340                    }
341                };
342                std::thread::Builder::new()
343                    .name("plugin engine call handler".into())
344                    .spawn(handler)
345                    .expect("failed to spawn thread");
346                state.sender = Some(tx);
347                Ok(state.sender.as_ref().unwrap_or_else(|| unreachable!()))
348            } else {
349                Err(ShellError::NushellFailed {
350                    msg: "Tried to spawn the fallback engine call handler before the plugin call \
351                        response had been received"
352                        .into(),
353                })
354            }
355        } else {
356            Err(ShellError::NushellFailed {
357                msg: format!("Couldn't find plugin ID={id} in subscriptions"),
358            })
359        }
360    }
361
    /// Send an [`EngineCall`] to the appropriate sender
    ///
    /// The call is forwarded to the sender registered for `plugin_call_id`. If that sender has
    /// already been taken, a fallback handler is spawned via `spawn_engine_call_handler` and the
    /// call is sent there instead. If sending fails because the receiving side is gone, an error
    /// [`EngineCallResponse`] is written directly to the plugin.
    ///
    /// # Errors
    ///
    /// Returns [`ShellError::PluginFailedToDecode`] if `plugin_call_id` is not registered, and
    /// propagates errors from spawning the fallback handler or from writing the error response.
    fn send_engine_call(
        &mut self,
        plugin_call_id: PluginCallId,
        engine_call_id: EngineCallId,
        call: EngineCall<PipelineData>,
    ) -> Result<(), ShellError> {
        // Ensure we're caught up on the subscriptions made
        self.receive_plugin_call_subscriptions();

        // Don't remove the sender, as there could be more calls or responses
        if let Some(subscription) = self.plugin_call_states.get(&plugin_call_id) {
            let msg = ReceivedPluginCallMessage::EngineCall(engine_call_id, call);
            // Call if there's an error sending the engine call
            // (takes the manager as an argument rather than capturing `self`)
            let send_error = |this: &Self| {
                log::warn!(
                    "Received an engine call for plugin_call_id={plugin_call_id}, \
                    but the caller hung up"
                );
                // We really have no choice here but to send the response ourselves and hope we
                // don't block
                this.state.writer.write(&PluginInput::EngineCallResponse(
                    engine_call_id,
                    EngineCallResponse::Error(ShellError::GenericError {
                        error: "Caller hung up".to_string(),
                        msg: "Can't make engine call because the original caller hung up"
                            .to_string(),
                        span: None,
                        help: None,
                        inner: vec![],
                    }),
                ))?;
                this.state.writer.flush()
            };
            // Try to send to the sender if it exists
            if let Some(sender) = subscription.sender.as_ref() {
                sender.send(msg).or_else(|_| send_error(self))
            } else {
                // The sender no longer exists. Spawn a specific one just for engine calls
                let sender = self.spawn_engine_call_handler(plugin_call_id)?;
                sender.send(msg).or_else(|_| send_error(self))
            }
        } else {
            Err(ShellError::PluginFailedToDecode {
                msg: format!("Unknown plugin call ID: {plugin_call_id}"),
            })
        }
    }
410
411    /// True if there are no other copies of the state (which would mean there are no interfaces
412    /// and no stream readers/writers)
413    pub fn is_finished(&self) -> bool {
414        Arc::strong_count(&self.state) < 2
415    }
416
417    /// Loop on input from the given reader as long as `is_finished()` is false
418    ///
419    /// Any errors will be propagated to all read streams automatically.
420    pub fn consume_all(
421        &mut self,
422        mut reader: impl PluginRead<PluginOutput>,
423    ) -> Result<(), ShellError> {
424        let mut result = Ok(());
425
426        while let Some(msg) = reader.read().transpose() {
427            if self.is_finished() {
428                break;
429            }
430
431            // We assume an error here is unrecoverable (at least, without restarting the plugin)
432            if let Err(err) = msg.and_then(|msg| self.consume(msg)) {
433                // Put the error in the state so that new calls see it
434                let _ = self.state.error.set(err.clone());
435                // Error to streams
436                let _ = self.stream_manager.broadcast_read_error(err.clone());
437                // Error to call waiters
438                self.receive_plugin_call_subscriptions();
439                for subscription in std::mem::take(&mut self.plugin_call_states).into_values() {
440                    let _ = subscription
441                        .sender
442                        .as_ref()
443                        .map(|s| s.send(ReceivedPluginCallMessage::Error(err.clone())));
444                }
445                result = Err(err);
446                break;
447            }
448        }
449
450        // Tell the GC we are exiting so that the plugin doesn't get stuck open
451        if let Some(ref gc) = self.gc {
452            gc.exited();
453        }
454        result
455    }
456}
457
458impl InterfaceManager for PluginInterfaceManager {
459    type Interface = PluginInterface;
460    type Input = PluginOutput;
461
462    fn get_interface(&self) -> Self::Interface {
463        PluginInterface {
464            state: self.state.clone(),
465            stream_manager_handle: self.stream_manager.get_handle(),
466            gc: self.gc.clone(),
467        }
468    }
469
    /// Handle a single message received from the plugin.
    ///
    /// * `Hello` stores the plugin's protocol info and checks it against the local version.
    /// * Any other message received before `Hello` is an error.
    /// * Stream messages are passed to `consume_stream_message`.
    /// * `Option` messages update settings such as whether garbage collection is disabled.
    /// * `CallResponse` is forwarded to the waiter for that plugin call.
    /// * `EngineCall` is forwarded to the thread handling the originating plugin call.
    ///
    /// # Errors
    ///
    /// Returns an error if the protocol is incompatible, `Hello` was not received first, the
    /// referenced plugin call is unknown, or forwarding / writing a message fails.
    fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> {
        log::trace!("from plugin: {input:?}");

        match input {
            PluginOutput::Hello(info) => {
                let info = Arc::new(info);
                // Publish the info first, so anything waiting on it is released even when the
                // compatibility check below fails
                self.protocol_info_mut.set(info.clone())?;

                let local_info = ProtocolInfo::default();
                if local_info.is_compatible_with(&info)? {
                    Ok(())
                } else {
                    Err(ShellError::PluginFailedToLoad {
                        msg: format!(
                            "Plugin `{}` is compiled for nushell version {}, \
                                which is not compatible with version {}",
                            self.state.source.name(),
                            info.version,
                            local_info.version,
                        ),
                    })
                }
            }
            // This guard applies to every arm below: nothing else is accepted until `Hello`
            _ if !self.state.protocol_info.is_set() => {
                // Must send protocol info first
                Err(ShellError::PluginFailedToLoad {
                    msg: format!(
                        "Failed to receive initial Hello message from `{}`. \
                            This plugin might be too old",
                        self.state.source.name()
                    ),
                })
            }
            // Stream messages
            PluginOutput::Data(..)
            | PluginOutput::End(..)
            | PluginOutput::Drop(..)
            | PluginOutput::Ack(..) => {
                self.consume_stream_message(input.try_into().map_err(|msg| {
                    ShellError::NushellFailed {
                        msg: format!("Failed to convert message {msg:?} to StreamMessage"),
                    }
                })?)
            }
            PluginOutput::Option(option) => match option {
                PluginOption::GcDisabled(disabled) => {
                    // Turn garbage collection off/on.
                    if let Some(ref gc) = self.gc {
                        gc.set_disabled(disabled);
                    }
                    Ok(())
                }
            },
            PluginOutput::CallResponse(id, response) => {
                // Handle reading the pipeline data, if any
                let response = response
                    .map_data(|data| {
                        let signals = self.get_signals(id)?;

                        // Register the stream in the response
                        if let Some(stream_id) = data.stream_id() {
                            self.recv_stream_started(id, stream_id);
                        }

                        self.read_pipeline_data(data, &signals)
                    })
                    .unwrap_or_else(|err| {
                        // If there's an error with initializing this stream, change it to a plugin
                        // error response, but send it anyway
                        PluginCallResponse::Error(err.into())
                    });
                let result = self.send_plugin_call_response(id, response);
                // The lock is only released when the response matched a known plugin call
                if result.is_ok() {
                    // When a call ends, it releases a lock on the GC
                    if let Some(ref gc) = self.gc {
                        gc.decrement_locks(1);
                    }
                }
                result
            }
            PluginOutput::EngineCall { context, id, call } => {
                // `context` is the ID of the plugin call that this engine call was made from
                let call = call
                    // Handle reading the pipeline data, if any
                    .map_data(|input| {
                        let signals = self.get_signals(context)?;
                        self.read_pipeline_data(input, &signals)
                    })
                    // Do anything extra needed for each engine call setup
                    .and_then(|mut engine_call| {
                        match engine_call {
                            EngineCall::EvalClosure {
                                ref mut positional, ..
                            } => {
                                for arg in positional.iter_mut() {
                                    // Add source to any plugin custom values in the arguments
                                    PluginCustomValueWithSource::add_source_in(
                                        arg,
                                        &self.state.source,
                                    )?;
                                }
                                Ok(engine_call)
                            }
                            _ => Ok(engine_call),
                        }
                    });
                match call {
                    Ok(call) => self.send_engine_call(context, id, call),
                    // If there was an error with setting up the call, just write the error
                    Err(err) => self.get_interface().write_engine_call_response(
                        id,
                        EngineCallResponse::Error(err),
                        &CurrentCallState::default(),
                    ),
                }
            }
        }
    }
587
    /// Borrow the [`StreamManager`] owned by this interface manager.
    fn stream_manager(&self) -> &StreamManager {
        &self.stream_manager
    }
591
592    fn prepare_pipeline_data(&self, mut data: PipelineData) -> Result<PipelineData, ShellError> {
593        // Add source to any values
594        match data {
595            PipelineData::Value(ref mut value, _) => {
596                with_custom_values_in(value, |custom_value| {
597                    PluginCustomValueWithSource::add_source(custom_value.item, &self.state.source);
598                    Ok::<_, ShellError>(())
599                })?;
600                Ok(data)
601            }
602            PipelineData::ListStream(stream, meta) => {
603                let source = self.state.source.clone();
604                Ok(PipelineData::list_stream(
605                    stream.map(move |mut value| {
606                        let _ = PluginCustomValueWithSource::add_source_in(&mut value, &source);
607                        value
608                    }),
609                    meta,
610                ))
611            }
612            PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
613        }
614    }
615
616    fn consume_stream_message(&mut self, message: StreamMessage) -> Result<(), ShellError> {
617        // Keep track of streams that end
618        if let StreamMessage::End(id) = message {
619            self.recv_stream_ended(id);
620        }
621        self.stream_manager.handle_message(message)
622    }
623}
624
/// A reference through which a plugin can be interacted with during execution.
///
/// Cloning is cheap in the sense that the shared state is behind an `Arc`; each clone counts as a
/// live reference for `PluginInterfaceManager::is_finished`.
///
/// This is not a public API.
#[derive(Debug, Clone)]
#[doc(hidden)]
pub struct PluginInterface {
    /// Shared state
    state: Arc<PluginInterfaceState>,
    /// Handle to stream manager
    stream_manager_handle: StreamManagerHandle,
    /// Handle to plugin garbage collector
    gc: Option<PluginGc>,
}
638
639impl PluginInterface {
640    /// Get the process ID for the plugin, if known.
641    pub fn pid(&self) -> Option<u32> {
642        self.state.process.as_ref().map(|p| p.pid())
643    }
644
645    /// Get the protocol info for the plugin. Will block to receive `Hello` if not received yet.
646    pub fn protocol_info(&self) -> Result<Arc<ProtocolInfo>, ShellError> {
647        self.state.protocol_info.get().and_then(|info| {
648            info.ok_or_else(|| ShellError::PluginFailedToLoad {
649                msg: format!(
650                    "Failed to get protocol info (`Hello` message) from the `{}` plugin",
651                    self.state.source.identity.name()
652                ),
653            })
654        })
655    }
656
657    /// Write the protocol info. This should be done after initialization
658    pub fn hello(&self) -> Result<(), ShellError> {
659        self.write(PluginInput::Hello(ProtocolInfo::default()))?;
660        self.flush()
661    }
662
663    /// Tell the plugin it should not expect any more plugin calls and should terminate after it has
664    /// finished processing the ones it has already received.
665    ///
666    /// Note that this is automatically called when the last existing `PluginInterface` is dropped.
667    /// You probably do not need to call this manually.
668    pub fn goodbye(&self) -> Result<(), ShellError> {
669        self.write(PluginInput::Goodbye)?;
670        self.flush()
671    }
672
673    /// Send the plugin a signal.
674    pub fn signal(&self, action: SignalAction) -> Result<(), ShellError> {
675        self.write(PluginInput::Signal(action))?;
676        self.flush()
677    }
678
679    /// Write an [`EngineCallResponse`]. Writes the full stream contained in any [`PipelineData`]
680    /// before returning.
681    pub fn write_engine_call_response(
682        &self,
683        id: EngineCallId,
684        response: EngineCallResponse<PipelineData>,
685        state: &CurrentCallState,
686    ) -> Result<(), ShellError> {
687        // Set up any stream if necessary
688        let mut writer = None;
689        let response = response.map_data(|data| {
690            let (data_header, data_writer) = self.init_write_pipeline_data(data, state)?;
691            writer = Some(data_writer);
692            Ok(data_header)
693        })?;
694
695        // Write the response, including the pipeline data header if present
696        self.write(PluginInput::EngineCallResponse(id, response))?;
697        self.flush()?;
698
699        // If we have a stream to write, do it now
700        if let Some(writer) = writer {
701            writer.write_background()?;
702        }
703
704        Ok(())
705    }
706
    /// Write a plugin call message. Returns the writer for the stream.
    ///
    /// This allocates a fresh plugin call ID, builds the per-call [`CurrentCallState`], prepares
    /// the call for serialization, sends a [`PluginCallState`] subscription over
    /// `plugin_call_subscription_sender`, and finally writes and flushes the
    /// [`PluginInput::Call`] message. The input stream (if any) is *not* written here: the
    /// returned [`WritePluginCallResult`] carries the writer so the caller decides when to do it.
    ///
    /// `context` is only used to pick up the signals for the call; when it is `None`, empty
    /// signals are used.
    ///
    /// # Errors
    ///
    /// Returns an error if the call ID sequence fails, if preparing the call or its pipeline data
    /// fails, if the subscription can't be sent (reported as the plugin having closed
    /// unexpectedly), or if writing/flushing the request fails.
    fn write_plugin_call(
        &self,
        mut call: PluginCall<PipelineData>,
        context: Option<&dyn PluginExecutionContext>,
    ) -> Result<WritePluginCallResult, ShellError> {
        let id = self.state.plugin_call_id_sequence.next()?;
        let signals = context
            .map(|c| c.signals().clone())
            .unwrap_or_else(Signals::empty);
        // `tx`/`rx` carry the messages for this call back to the caller; the context channel is
        // used to hand over the execution context if the response turns out to be a stream.
        let (tx, rx) = mpsc::channel();
        let (context_tx, context_rx) = mpsc::channel();
        // Kept as a `(Sender, Receiver)` pair: the sender is cloned into the call state below and
        // the whole pair is moved into the subscription.
        let keep_plugin_custom_values = mpsc::channel();

        // Set up the state that will stay alive during the call.
        let state = CurrentCallState {
            context_tx: Some(context_tx),
            keep_plugin_custom_values_tx: Some(keep_plugin_custom_values.0.clone()),
            entered_foreground: false,
            span: call.span(),
        };

        // Prepare the call with the state.
        state.prepare_plugin_call(&mut call, &self.state.source)?;

        // Convert the call into one with a header and handle the stream, if necessary
        let (call, writer) = match call {
            PluginCall::Metadata => (PluginCall::Metadata, Default::default()),
            PluginCall::Signature => (PluginCall::Signature, Default::default()),
            PluginCall::CustomValueOp(value, op) => {
                (PluginCall::CustomValueOp(value, op), Default::default())
            }
            PluginCall::Run(CallInfo { name, call, input }) => {
                // Only `Run` carries pipeline input, so it is the only variant that can produce a
                // non-default writer.
                let (header, writer) = self.init_write_pipeline_data(input, &state)?;
                (
                    PluginCall::Run(CallInfo {
                        name,
                        call,
                        input: header,
                    }),
                    writer,
                )
            }
        };

        // Don't try to send a response for a Dropped call.
        let dont_send_response =
            matches!(call, PluginCall::CustomValueOp(_, CustomValueOp::Dropped));

        // Register the subscription to the response, and the context
        self.state
            .plugin_call_subscription_sender
            .send((
                id,
                PluginCallState {
                    // No sender at all for a Dropped call, so nothing can be delivered to `rx`.
                    sender: Some(tx).filter(|_| !dont_send_response),
                    dont_send_response,
                    signals,
                    context_rx: Some(context_rx),
                    span: call.span(),
                    keep_plugin_custom_values,
                    remaining_streams_to_read: 0,
                },
            ))
            .map_err(|_| {
                // Attach any error already recorded in the shared state as the inner cause.
                let existing_error = self.state.error.get().cloned();
                ShellError::GenericError {
                    error: format!("Plugin `{}` closed unexpectedly", self.state.source.name()),
                    msg: "can't complete this operation because the plugin is closed".into(),
                    span: call.span(),
                    help: Some(format!(
                        "the plugin may have experienced an error. Try loading the plugin again \
                        with `{}`",
                        self.state.source.identity.use_command(),
                    )),
                    inner: existing_error.into_iter().collect(),
                }
            })?;

        // Starting a plugin call adds a lock on the GC. Locks are not added for streams being read
        // by the plugin, so the plugin would have to explicitly tell us if it expects to stay alive
        // while reading streams in the background after the response ends.
        if let Some(ref gc) = self.gc {
            gc.increment_locks(1);
        }

        // Write request
        self.write(PluginInput::Call(id, call))?;
        self.flush()?;

        Ok(WritePluginCallResult {
            receiver: rx,
            writer,
            state,
        })
    }
803
804    /// Read the channel for plugin call messages and handle them until the response is received.
805    fn receive_plugin_call_response(
806        &self,
807        rx: mpsc::Receiver<ReceivedPluginCallMessage>,
808        mut context: Option<&mut (dyn PluginExecutionContext + '_)>,
809        mut state: CurrentCallState,
810    ) -> Result<PluginCallResponse<PipelineData>, ShellError> {
811        // Handle message from receiver
812        for msg in rx {
813            match msg {
814                ReceivedPluginCallMessage::Response(resp) => {
815                    if state.entered_foreground {
816                        // Make the plugin leave the foreground on return, even if it's a stream
817                        if let Some(context) = context.as_deref_mut()
818                            && let Err(err) =
819                                set_foreground(self.state.process.as_ref(), context, false)
820                        {
821                            log::warn!("Failed to leave foreground state on exit: {err:?}");
822                        }
823                    }
824                    if resp.has_stream() {
825                        // If the response has a stream, we need to register the context
826                        if let Some(context) = context
827                            && let Some(ref context_tx) = state.context_tx
828                        {
829                            let _ = context_tx.send(Context(context.boxed()));
830                        }
831                    }
832                    return Ok(resp);
833                }
834                ReceivedPluginCallMessage::Error(err) => {
835                    return Err(err);
836                }
837                ReceivedPluginCallMessage::EngineCall(engine_call_id, engine_call) => {
838                    self.handle_engine_call(
839                        engine_call_id,
840                        engine_call,
841                        &mut state,
842                        context.as_deref_mut(),
843                    )?;
844                }
845            }
846        }
847        // If we fail to get a response, check for an error in the state first, and return it if
848        // set. This is probably a much more helpful error than 'failed to receive response' alone
849        let existing_error = self.state.error.get().cloned();
850        Err(ShellError::GenericError {
851            error: format!(
852                "Failed to receive response to plugin call from `{}`",
853                self.state.source.identity.name()
854            ),
855            msg: "while waiting for this operation to complete".into(),
856            span: state.span,
857            help: Some(format!(
858                "try restarting the plugin with `{}`",
859                self.state.source.identity.use_command()
860            )),
861            inner: existing_error.into_iter().collect(),
862        })
863    }
864
865    /// Handle an engine call and write the response.
866    fn handle_engine_call(
867        &self,
868        engine_call_id: EngineCallId,
869        engine_call: EngineCall<PipelineData>,
870        state: &mut CurrentCallState,
871        context: Option<&mut (dyn PluginExecutionContext + '_)>,
872    ) -> Result<(), ShellError> {
873        let process = self.state.process.as_ref();
874        let resp = handle_engine_call(engine_call, state, context, process)
875            .unwrap_or_else(EngineCallResponse::Error);
876        // Handle stream
877        let mut writer = None;
878        let resp = resp
879            .map_data(|data| {
880                let (data_header, data_writer) = self.init_write_pipeline_data(data, state)?;
881                writer = Some(data_writer);
882                Ok(data_header)
883            })
884            .unwrap_or_else(|err| {
885                // If we fail to set up the response write, change to an error response here
886                writer = None;
887                EngineCallResponse::Error(err)
888            });
889        // Write the response, then the stream
890        self.write(PluginInput::EngineCallResponse(engine_call_id, resp))?;
891        self.flush()?;
892        if let Some(writer) = writer {
893            writer.write_background()?;
894        }
895        Ok(())
896    }
897
898    /// Perform a plugin call. Input and output streams are handled, and engine calls are handled
899    /// too if there are any before the final response.
900    fn plugin_call(
901        &self,
902        call: PluginCall<PipelineData>,
903        context: Option<&mut dyn PluginExecutionContext>,
904    ) -> Result<PluginCallResponse<PipelineData>, ShellError> {
905        // Check for an error in the state first, and return it if set.
906        if let Some(error) = self.state.error.get() {
907            return Err(ShellError::GenericError {
908                error: format!(
909                    "Failed to send plugin call to `{}`",
910                    self.state.source.identity.name()
911                ),
912                msg: "the plugin encountered an error before this operation could be attempted"
913                    .into(),
914                span: call.span(),
915                help: Some(format!(
916                    "try loading the plugin again with `{}`",
917                    self.state.source.identity.use_command(),
918                )),
919                inner: vec![error.clone()],
920            });
921        }
922
923        let result = self.write_plugin_call(call, context.as_deref())?;
924
925        // Finish writing stream in the background
926        result.writer.write_background()?;
927
928        self.receive_plugin_call_response(result.receiver, context, result.state)
929    }
930
931    /// Get the metadata from the plugin.
932    pub fn get_metadata(&self) -> Result<PluginMetadata, ShellError> {
933        match self.plugin_call(PluginCall::Metadata, None)? {
934            PluginCallResponse::Metadata(meta) => Ok(meta),
935            PluginCallResponse::Error(err) => Err(err.into()),
936            _ => Err(ShellError::PluginFailedToDecode {
937                msg: "Received unexpected response to plugin Metadata call".into(),
938            }),
939        }
940    }
941
942    /// Get the command signatures from the plugin.
943    pub fn get_signature(&self) -> Result<Vec<PluginSignature>, ShellError> {
944        match self.plugin_call(PluginCall::Signature, None)? {
945            PluginCallResponse::Signature(sigs) => Ok(sigs),
946            PluginCallResponse::Error(err) => Err(err.into()),
947            _ => Err(ShellError::PluginFailedToDecode {
948                msg: "Received unexpected response to plugin Signature call".into(),
949            }),
950        }
951    }
952
953    /// Run the plugin with the given call and execution context.
954    pub fn run(
955        &self,
956        call: CallInfo<PipelineData>,
957        context: &mut dyn PluginExecutionContext,
958    ) -> Result<PipelineData, ShellError> {
959        match self.plugin_call(PluginCall::Run(call), Some(context))? {
960            PluginCallResponse::PipelineData(data) => Ok(data),
961            PluginCallResponse::Error(err) => Err(err.into()),
962            _ => Err(ShellError::PluginFailedToDecode {
963                msg: "Received unexpected response to plugin Run call".into(),
964            }),
965        }
966    }
967
968    /// Do a custom value op that expects a value response (i.e. most of them)
969    fn custom_value_op_expecting_value(
970        &self,
971        value: Spanned<PluginCustomValueWithSource>,
972        op: CustomValueOp,
973    ) -> Result<Value, ShellError> {
974        let op_name = op.name();
975        let span = value.span;
976
977        // Check that the value came from the right source
978        value.item.verify_source(span, &self.state.source)?;
979
980        let call = PluginCall::CustomValueOp(value.map(|cv| cv.without_source()), op);
981        match self.plugin_call(call, None)? {
982            PluginCallResponse::PipelineData(out_data) => out_data.into_value(span),
983            PluginCallResponse::Error(err) => Err(err.into()),
984            _ => Err(ShellError::PluginFailedToDecode {
985                msg: format!("Received unexpected response to custom value {op_name}() call"),
986            }),
987        }
988    }
989
990    /// Collapse a custom value to its base value.
991    pub fn custom_value_to_base_value(
992        &self,
993        value: Spanned<PluginCustomValueWithSource>,
994    ) -> Result<Value, ShellError> {
995        self.custom_value_op_expecting_value(value, CustomValueOp::ToBaseValue)
996    }
997
998    /// Follow a numbered cell path on a custom value - e.g. `value.0`.
999    pub fn custom_value_follow_path_int(
1000        &self,
1001        value: Spanned<PluginCustomValueWithSource>,
1002        index: Spanned<usize>,
1003        optional: bool,
1004    ) -> Result<Value, ShellError> {
1005        self.custom_value_op_expecting_value(
1006            value,
1007            CustomValueOp::FollowPathInt { index, optional },
1008        )
1009    }
1010
1011    /// Follow a named cell path on a custom value - e.g. `value.column`.
1012    pub fn custom_value_follow_path_string(
1013        &self,
1014        value: Spanned<PluginCustomValueWithSource>,
1015        column_name: Spanned<String>,
1016        optional: bool,
1017        casing: Casing,
1018    ) -> Result<Value, ShellError> {
1019        self.custom_value_op_expecting_value(
1020            value,
1021            CustomValueOp::FollowPathString {
1022                column_name,
1023                optional,
1024                casing,
1025            },
1026        )
1027    }
1028
1029    /// Invoke comparison logic for custom values.
1030    pub fn custom_value_partial_cmp(
1031        &self,
1032        value: PluginCustomValueWithSource,
1033        other_value: Value,
1034    ) -> Result<Option<Ordering>, ShellError> {
1035        // Check that the value came from the right source
1036        value.verify_source(Span::unknown(), &self.state.source)?;
1037
1038        // Note: the protocol is always designed to have a span with the custom value, but this
1039        // operation doesn't support one.
1040        let call = PluginCall::CustomValueOp(
1041            value.without_source().into_spanned(Span::unknown()),
1042            CustomValueOp::PartialCmp(other_value),
1043        );
1044        match self.plugin_call(call, None)? {
1045            PluginCallResponse::Ordering(ordering) => Ok(ordering),
1046            PluginCallResponse::Error(err) => Err(err.into()),
1047            _ => Err(ShellError::PluginFailedToDecode {
1048                msg: "Received unexpected response to custom value partial_cmp() call".into(),
1049            }),
1050        }
1051    }
1052
1053    /// Invoke functionality for an operator on a custom value.
1054    pub fn custom_value_operation(
1055        &self,
1056        left: Spanned<PluginCustomValueWithSource>,
1057        operator: Spanned<Operator>,
1058        right: Value,
1059    ) -> Result<Value, ShellError> {
1060        self.custom_value_op_expecting_value(left, CustomValueOp::Operation(operator, right))
1061    }
1062
1063    /// Invoke saving operation on a custom value.
1064    pub fn custom_value_save(
1065        &self,
1066        value: Spanned<PluginCustomValueWithSource>,
1067        path: Spanned<&Path>,
1068        save_call_span: Span,
1069    ) -> Result<(), ShellError> {
1070        // Check that the value came from the right source
1071        value.item.verify_source(value.span, &self.state.source)?;
1072
1073        let call = PluginCall::CustomValueOp(
1074            value.map(|cv| cv.without_source()),
1075            CustomValueOp::Save {
1076                path: path.map(ToOwned::to_owned),
1077                save_call_span,
1078            },
1079        );
1080        match self.plugin_call(call, None)? {
1081            PluginCallResponse::Ok => Ok(()),
1082            PluginCallResponse::Error(err) => Err(err.into()),
1083            _ => Err(ShellError::PluginFailedToDecode {
1084                msg: "Received unexpected response to custom value save() call".into(),
1085            }),
1086        }
1087    }
1088
1089    /// Notify the plugin about a dropped custom value.
1090    pub fn custom_value_dropped(&self, value: PluginCustomValue) -> Result<(), ShellError> {
1091        // Make sure we don't block here. This can happen on the receiver thread, which would cause a deadlock. We should not try to receive the response - just let it be discarded.
1092        //
1093        // Note: the protocol is always designed to have a span with the custom value, but this
1094        // operation doesn't support one.
1095        drop(self.write_plugin_call(
1096            PluginCall::CustomValueOp(value.into_spanned(Span::unknown()), CustomValueOp::Dropped),
1097            None,
1098        )?);
1099        Ok(())
1100    }
1101}
1102
1103impl Interface for PluginInterface {
1104    type Output = PluginInput;
1105    type DataContext = CurrentCallState;
1106
1107    fn write(&self, input: PluginInput) -> Result<(), ShellError> {
1108        log::trace!("to plugin: {input:?}");
1109        self.state.writer.write(&input).map_err(|err| {
1110            log::warn!("write() error: {err}");
1111            // If there's an error in the state, return that instead because it's likely more
1112            // descriptive
1113            self.state.error.get().cloned().unwrap_or(err)
1114        })
1115    }
1116
1117    fn flush(&self) -> Result<(), ShellError> {
1118        self.state.writer.flush().map_err(|err| {
1119            log::warn!("flush() error: {err}");
1120            // If there's an error in the state, return that instead because it's likely more
1121            // descriptive
1122            self.state.error.get().cloned().unwrap_or(err)
1123        })
1124    }
1125
1126    fn stream_id_sequence(&self) -> &Sequence {
1127        &self.state.stream_id_sequence
1128    }
1129
1130    fn stream_manager_handle(&self) -> &StreamManagerHandle {
1131        &self.stream_manager_handle
1132    }
1133
1134    fn prepare_pipeline_data(
1135        &self,
1136        data: PipelineData,
1137        state: &CurrentCallState,
1138    ) -> Result<PipelineData, ShellError> {
1139        // Validate the destination of values in the pipeline data
1140        match data {
1141            PipelineData::Value(mut value, meta) => {
1142                state.prepare_value(&mut value, &self.state.source)?;
1143                Ok(PipelineData::value(value, meta))
1144            }
1145            PipelineData::ListStream(stream, meta) => {
1146                let source = self.state.source.clone();
1147                let state = state.clone();
1148                Ok(PipelineData::list_stream(
1149                    stream.map(move |mut value| {
1150                        match state.prepare_value(&mut value, &source) {
1151                            Ok(()) => value,
1152                            // Put the error in the stream instead
1153                            Err(err) => Value::error(err, value.span()),
1154                        }
1155                    }),
1156                    meta,
1157                ))
1158            }
1159            PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
1160        }
1161    }
1162}
1163
1164impl Drop for PluginInterface {
1165    fn drop(&mut self) {
1166        // Automatically send `Goodbye` if there are no more interfaces. In that case there would be
1167        // only two copies of the state, one of which we hold, and one of which the manager holds.
1168        //
1169        // Our copy is about to be dropped, so there would only be one left, the manager. The
1170        // manager will never send any plugin calls, so we should let the plugin know that.
1171        if Arc::strong_count(&self.state) < 3
1172            && let Err(err) = self.goodbye()
1173        {
1174            log::warn!("Error during plugin Goodbye: {err}");
1175        }
1176    }
1177}
1178
/// Return value of [`PluginInterface::write_plugin_call()`].
///
/// Bundles everything the caller needs to finish a plugin call once the request has been written:
/// where to receive messages from, the writer for the input stream, and the per-call state.
/// Marked `#[must_use]` so that discarding it has to be done explicitly.
#[must_use]
struct WritePluginCallResult {
    /// Receiver for plugin call messages related to the written plugin call.
    receiver: mpsc::Receiver<ReceivedPluginCallMessage>,
    /// Writer for the stream, if any.
    writer: PipelineDataWriter<PluginInterface>,
    /// State to be kept for the duration of the plugin call.
    state: CurrentCallState,
}
1189
/// State related to the current plugin call being executed.
///
/// The `Default` value has no senders, has not entered the foreground, and has no span.
#[derive(Default, Clone)]
pub struct CurrentCallState {
    /// Sender for context, which should be sent if the plugin call returned a stream so that
    /// engine calls may continue to be handled.
    context_tx: Option<mpsc::Sender<Context>>,
    /// Sender for a channel that retains plugin custom values that need to stay alive for the
    /// duration of a plugin call.
    keep_plugin_custom_values_tx: Option<mpsc::Sender<PluginCustomValueWithSource>>,
    /// The plugin call entered the foreground: this should be cleaned up automatically when the
    /// plugin call returns.
    entered_foreground: bool,
    /// The span that caused the plugin call.
    span: Option<Span>,
}
1205
1206impl CurrentCallState {
1207    /// Prepare a custom value for write. Verifies custom value origin, and keeps custom values that
1208    /// shouldn't be dropped immediately.
1209    fn prepare_custom_value(
1210        &self,
1211        custom_value: Spanned<&mut Box<dyn CustomValue>>,
1212        source: &PluginSource,
1213    ) -> Result<(), ShellError> {
1214        // Ensure we can use it
1215        PluginCustomValueWithSource::verify_source_of_custom_value(
1216            custom_value.as_deref().map(|cv| &**cv),
1217            source,
1218        )?;
1219
1220        // Check whether we need to keep it
1221        if let Some(keep_tx) = &self.keep_plugin_custom_values_tx
1222            && let Some(custom_value) = custom_value
1223                .item
1224                .as_any()
1225                .downcast_ref::<PluginCustomValueWithSource>()
1226            && custom_value.notify_on_drop()
1227        {
1228            log::trace!("Keeping custom value for drop later: {custom_value:?}");
1229            keep_tx
1230                .send(custom_value.clone())
1231                .map_err(|_| ShellError::NushellFailed {
1232                    msg: "Failed to custom value to keep channel".into(),
1233                })?;
1234        }
1235
1236        // Strip the source from it so it can be serialized
1237        PluginCustomValueWithSource::remove_source(&mut *custom_value.item);
1238
1239        Ok(())
1240    }
1241
1242    /// Prepare a value for write, including all contained custom values.
1243    fn prepare_value(&self, value: &mut Value, source: &PluginSource) -> Result<(), ShellError> {
1244        with_custom_values_in(value, |custom_value| {
1245            self.prepare_custom_value(custom_value, source)
1246        })
1247    }
1248
1249    /// Prepare call arguments for write.
1250    fn prepare_call_args(
1251        &self,
1252        call: &mut EvaluatedCall,
1253        source: &PluginSource,
1254    ) -> Result<(), ShellError> {
1255        for arg in call.positional.iter_mut() {
1256            self.prepare_value(arg, source)?;
1257        }
1258        for arg in call.named.iter_mut().flat_map(|(_, arg)| arg.as_mut()) {
1259            self.prepare_value(arg, source)?;
1260        }
1261        Ok(())
1262    }
1263
1264    /// Prepare a plugin call for write. Does not affect pipeline data, which is handled by
1265    /// `prepare_pipeline_data()` instead.
1266    fn prepare_plugin_call<D>(
1267        &self,
1268        call: &mut PluginCall<D>,
1269        source: &PluginSource,
1270    ) -> Result<(), ShellError> {
1271        match call {
1272            PluginCall::Metadata => Ok(()),
1273            PluginCall::Signature => Ok(()),
1274            PluginCall::Run(CallInfo { call, .. }) => self.prepare_call_args(call, source),
1275            PluginCall::CustomValueOp(_, op) => {
1276                // Handle anything within the op.
1277                match op {
1278                    CustomValueOp::ToBaseValue => Ok(()),
1279                    CustomValueOp::FollowPathInt { .. } => Ok(()),
1280                    CustomValueOp::FollowPathString { .. } => Ok(()),
1281                    CustomValueOp::PartialCmp(value) => self.prepare_value(value, source),
1282                    CustomValueOp::Operation(_, value) => self.prepare_value(value, source),
1283                    CustomValueOp::Save { .. } => Ok(()),
1284                    CustomValueOp::Dropped => Ok(()),
1285                }
1286            }
1287        }
1288    }
1289}
1290
1291/// Handle an engine call.
1292pub(crate) fn handle_engine_call(
1293    call: EngineCall<PipelineData>,
1294    state: &mut CurrentCallState,
1295    context: Option<&mut (dyn PluginExecutionContext + '_)>,
1296    process: Option<&PluginProcess>,
1297) -> Result<EngineCallResponse<PipelineData>, ShellError> {
1298    let call_name = call.name();
1299
1300    let context = context.ok_or_else(|| ShellError::GenericError {
1301        error: "A plugin execution context is required for this engine call".into(),
1302        msg: format!("attempted to call {call_name} outside of a command invocation"),
1303        span: None,
1304        help: Some("this is probably a bug with the plugin".into()),
1305        inner: vec![],
1306    })?;
1307
1308    match call {
1309        EngineCall::GetConfig => {
1310            let config = SharedCow::from(context.get_config()?);
1311            Ok(EngineCallResponse::Config(config))
1312        }
1313        EngineCall::GetPluginConfig => {
1314            let plugin_config = context.get_plugin_config()?;
1315            Ok(plugin_config.map_or_else(EngineCallResponse::empty, EngineCallResponse::value))
1316        }
1317        EngineCall::GetEnvVar(name) => {
1318            let value = context.get_env_var(&name)?;
1319            Ok(value
1320                .cloned()
1321                .map_or_else(EngineCallResponse::empty, EngineCallResponse::value))
1322        }
1323        EngineCall::GetEnvVars => context.get_env_vars().map(EngineCallResponse::ValueMap),
1324        EngineCall::GetCurrentDir => {
1325            let current_dir = context.get_current_dir()?;
1326            Ok(EngineCallResponse::value(Value::string(
1327                current_dir.item,
1328                current_dir.span,
1329            )))
1330        }
1331        EngineCall::AddEnvVar(name, value) => {
1332            context.add_env_var(name, value)?;
1333            Ok(EngineCallResponse::empty())
1334        }
1335        EngineCall::GetHelp => {
1336            let help = context.get_help()?;
1337            Ok(EngineCallResponse::value(Value::string(
1338                help.item, help.span,
1339            )))
1340        }
1341        EngineCall::EnterForeground => {
1342            let resp = set_foreground(process, context, true)?;
1343            state.entered_foreground = true;
1344            Ok(resp)
1345        }
1346        EngineCall::LeaveForeground => {
1347            let resp = set_foreground(process, context, false)?;
1348            state.entered_foreground = false;
1349            Ok(resp)
1350        }
1351        EngineCall::GetSpanContents(span) => {
1352            let contents = context.get_span_contents(span)?;
1353            Ok(EngineCallResponse::value(Value::binary(
1354                contents.item,
1355                contents.span,
1356            )))
1357        }
1358        EngineCall::EvalClosure {
1359            closure,
1360            positional,
1361            input,
1362            redirect_stdout,
1363            redirect_stderr,
1364        } => context
1365            .eval_closure(closure, positional, input, redirect_stdout, redirect_stderr)
1366            .map(EngineCallResponse::PipelineData),
1367        EngineCall::FindDecl(name) => context.find_decl(&name).map(|decl_id| {
1368            if let Some(decl_id) = decl_id {
1369                EngineCallResponse::Identifier(decl_id)
1370            } else {
1371                EngineCallResponse::empty()
1372            }
1373        }),
1374        EngineCall::CallDecl {
1375            decl_id,
1376            call,
1377            input,
1378            redirect_stdout,
1379            redirect_stderr,
1380        } => context
1381            .call_decl(decl_id, call, input, redirect_stdout, redirect_stderr)
1382            .map(EngineCallResponse::PipelineData),
1383    }
1384}
1385
1386/// Implements enter/exit foreground
1387fn set_foreground(
1388    process: Option<&PluginProcess>,
1389    context: &mut dyn PluginExecutionContext,
1390    enter: bool,
1391) -> Result<EngineCallResponse<PipelineData>, ShellError> {
1392    if let Some(process) = process {
1393        if let Some(pipeline_externals_state) = context.pipeline_externals_state() {
1394            if enter {
1395                let pgrp = process.enter_foreground(context.span(), pipeline_externals_state)?;
1396                Ok(pgrp.map_or_else(EngineCallResponse::empty, |id| {
1397                    EngineCallResponse::value(Value::int(id as i64, context.span()))
1398                }))
1399            } else {
1400                process.exit_foreground()?;
1401                Ok(EngineCallResponse::empty())
1402            }
1403        } else {
1404            // This should always be present on a real context
1405            Err(ShellError::NushellFailed {
1406                msg: "missing required pipeline_externals_state from context \
1407                            for entering foreground"
1408                    .into(),
1409            })
1410        }
1411    } else {
1412        Err(ShellError::GenericError {
1413            error: "Can't manage plugin process to enter foreground".into(),
1414            msg: "the process ID for this plugin is unknown".into(),
1415            span: Some(context.span()),
1416            help: Some("the plugin may be running in a test".into()),
1417            inner: vec![],
1418        })
1419    }
1420}