nu_plugin_engine/interface/
mod.rs

1//! Interface used by the engine to communicate with the plugin.
2
3use nu_plugin_core::{
4    Interface, InterfaceManager, PipelineDataWriter, PluginRead, PluginWrite, StreamManager,
5    StreamManagerHandle,
6    util::{Waitable, WaitableMut, with_custom_values_in},
7};
8use nu_plugin_protocol::{
9    CallInfo, CustomValueOp, EngineCall, EngineCallId, EngineCallResponse, EvaluatedCall, Ordering,
10    PluginCall, PluginCallId, PluginCallResponse, PluginCustomValue, PluginInput, PluginOption,
11    PluginOutput, ProtocolInfo, StreamId, StreamMessage,
12};
13use nu_protocol::{
14    CustomValue, IntoSpanned, PipelineData, PluginMetadata, PluginSignature, ShellError,
15    SignalAction, Signals, Span, Spanned, Value, ast::Operator, engine::Sequence,
16};
17use nu_utils::SharedCow;
18use std::{
19    collections::{BTreeMap, btree_map},
20    sync::{Arc, OnceLock, mpsc},
21};
22
23use crate::{
24    PluginCustomValueWithSource, PluginExecutionContext, PluginGc, PluginSource,
25    process::PluginProcess,
26};
27
28#[cfg(test)]
29mod tests;
30
/// Messages delivered over the per-call channel to the thread that is waiting on a plugin call.
#[derive(Debug)]
enum ReceivedPluginCallMessage {
    /// The final response to send
    Response(PluginCallResponse<PipelineData>),

    /// A critical error with the interface
    Error(ShellError),

    /// An engine call that should be evaluated and responded to, but is not the final response
    ///
    /// We send this back to the thread that made the plugin call so we don't block the reader
    /// thread
    EngineCall(EngineCallId, EngineCall<PipelineData>),
}
45
/// Context for plugin call execution
///
/// Newtype around a boxed [`PluginExecutionContext`] trait object.
pub(crate) struct Context(Box<dyn PluginExecutionContext>);
48
49impl std::fmt::Debug for Context {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        f.write_str("Context")
52    }
53}
54
55impl std::ops::Deref for Context {
56    type Target = dyn PluginExecutionContext;
57
58    fn deref(&self) -> &Self::Target {
59        &*self.0
60    }
61}
62
/// Internal shared state between the manager and each interface.
///
/// Held behind an `Arc`: the manager owns one reference and every interface it hands out clones
/// it.
struct PluginInterfaceState {
    /// The source to be used for custom values coming from / going to the plugin
    source: Arc<PluginSource>,
    /// The plugin process being managed (`None` if the manager was created without a pid)
    process: Option<PluginProcess>,
    /// Protocol version info, set after `Hello` received
    protocol_info: Waitable<Arc<ProtocolInfo>>,
    /// Sequence for generating plugin call ids
    plugin_call_id_sequence: Sequence,
    /// Sequence for generating stream ids
    stream_id_sequence: Sequence,
    /// Sender to subscribe to a plugin call response
    plugin_call_subscription_sender: mpsc::Sender<(PluginCallId, PluginCallState)>,
    /// An error that should be propagated to further plugin calls
    ///
    /// Written at most once (it is a `OnceLock`), by the manager when reading from the plugin
    /// fails.
    error: OnceLock<ShellError>,
    /// The synchronized output writer
    writer: Box<dyn PluginWrite<PluginInput>>,
}
82
83impl std::fmt::Debug for PluginInterfaceState {
84    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85        f.debug_struct("PluginInterfaceState")
86            .field("source", &self.source)
87            .field("protocol_info", &self.protocol_info)
88            .field("plugin_call_id_sequence", &self.plugin_call_id_sequence)
89            .field("stream_id_sequence", &self.stream_id_sequence)
90            .field(
91                "plugin_call_subscription_sender",
92                &self.plugin_call_subscription_sender,
93            )
94            .field("error", &self.error)
95            .finish_non_exhaustive()
96    }
97}
98
/// State that the manager keeps for each plugin call during its lifetime.
///
/// Created by the interface when a call is written, and handed to the manager over the
/// subscription channel, keyed by the plugin call id.
#[derive(Debug)]
struct PluginCallState {
    /// The sender back to the thread that is waiting for the plugin call response
    ///
    /// Taken (left as `None`) once the final response has been delivered.
    sender: Option<mpsc::Sender<ReceivedPluginCallMessage>>,
    /// Don't try to send the plugin call response. This is only used for `Dropped` to avoid an
    /// error
    dont_send_response: bool,
    /// Signals to be used for stream iterators
    signals: Signals,
    /// Channel to receive context on to be used if needed
    context_rx: Option<mpsc::Receiver<Context>>,
    /// Span associated with the call, if any
    span: Option<Span>,
    /// Channel for plugin custom values that should be kept alive for the duration of the plugin
    /// call. The plugin custom values on this channel are never read, we just hold on to it to keep
    /// them in memory so they can be dropped at the end of the call. We hold the sender as well so
    /// we can generate the CurrentCallState.
    keep_plugin_custom_values: (
        mpsc::Sender<PluginCustomValueWithSource>,
        mpsc::Receiver<PluginCustomValueWithSource>,
    ),
    /// Number of streams that still need to be read from the plugin call response
    ///
    /// Incremented when a response stream is registered and decremented when it ends; the state
    /// is removed from the manager once this is no longer positive.
    remaining_streams_to_read: i32,
}
124
125impl Drop for PluginCallState {
126    fn drop(&mut self) {
127        // Clear the keep custom values channel, so drop notifications can be sent
128        for value in self.keep_plugin_custom_values.1.try_iter() {
129            log::trace!("Dropping custom value that was kept: {value:?}");
130            drop(value);
131        }
132    }
133}
134
/// Manages reading and dispatching messages for [`PluginInterface`]s.
///
/// The manager owns the receiving side: it consumes messages coming from the plugin, routes
/// stream messages to the stream manager, and forwards call responses and engine calls to the
/// thread that made the corresponding plugin call.
#[derive(Debug)]
pub struct PluginInterfaceManager {
    /// Shared state
    state: Arc<PluginInterfaceState>,
    /// The writer for protocol info
    protocol_info_mut: WaitableMut<Arc<ProtocolInfo>>,
    /// Manages stream messages and state
    stream_manager: StreamManager,
    /// State related to plugin calls
    plugin_call_states: BTreeMap<PluginCallId, PluginCallState>,
    /// Receiver for plugin call subscriptions
    plugin_call_subscription_receiver: mpsc::Receiver<(PluginCallId, PluginCallState)>,
    /// Tracker for which plugin call streams being read belong to
    ///
    /// This is necessary so we know when we can remove context for plugin calls
    plugin_call_input_streams: BTreeMap<StreamId, PluginCallId>,
    /// Garbage collector handle, to notify about the state of the plugin
    gc: Option<PluginGc>,
}
155
156impl PluginInterfaceManager {
157    pub fn new(
158        source: Arc<PluginSource>,
159        pid: Option<u32>,
160        writer: impl PluginWrite<PluginInput> + 'static,
161    ) -> PluginInterfaceManager {
162        let (subscription_tx, subscription_rx) = mpsc::channel();
163        let protocol_info_mut = WaitableMut::new();
164
165        PluginInterfaceManager {
166            state: Arc::new(PluginInterfaceState {
167                source,
168                process: pid.map(PluginProcess::new),
169                protocol_info: protocol_info_mut.reader(),
170                plugin_call_id_sequence: Sequence::default(),
171                stream_id_sequence: Sequence::default(),
172                plugin_call_subscription_sender: subscription_tx,
173                error: OnceLock::new(),
174                writer: Box::new(writer),
175            }),
176            protocol_info_mut,
177            stream_manager: StreamManager::new(),
178            plugin_call_states: BTreeMap::new(),
179            plugin_call_subscription_receiver: subscription_rx,
180            plugin_call_input_streams: BTreeMap::new(),
181            gc: None,
182        }
183    }
184
    /// Add a garbage collector to this plugin. The manager will notify the garbage collector about
    /// the state of the plugin so that it can be automatically cleaned up if the plugin is
    /// inactive.
    ///
    /// Passing `None` removes any previously set garbage collector.
    pub fn set_garbage_collector(&mut self, gc: Option<PluginGc>) {
        self.gc = gc;
    }
191
192    /// Consume pending messages in the `plugin_call_subscription_receiver`
193    fn receive_plugin_call_subscriptions(&mut self) {
194        while let Ok((id, state)) = self.plugin_call_subscription_receiver.try_recv() {
195            if let btree_map::Entry::Vacant(e) = self.plugin_call_states.entry(id) {
196                e.insert(state);
197            } else {
198                log::warn!("Duplicate plugin call ID ignored: {id}");
199            }
200        }
201    }
202
203    /// Track the start of incoming stream(s)
204    fn recv_stream_started(&mut self, call_id: PluginCallId, stream_id: StreamId) {
205        self.plugin_call_input_streams.insert(stream_id, call_id);
206        // Increment the number of streams on the subscription so context stays alive
207        self.receive_plugin_call_subscriptions();
208        if let Some(state) = self.plugin_call_states.get_mut(&call_id) {
209            state.remaining_streams_to_read += 1;
210        }
211        // Add a lock to the garbage collector for each stream
212        if let Some(ref gc) = self.gc {
213            gc.increment_locks(1);
214        }
215    }
216
217    /// Track the end of an incoming stream
218    fn recv_stream_ended(&mut self, stream_id: StreamId) {
219        if let Some(call_id) = self.plugin_call_input_streams.remove(&stream_id) {
220            if let btree_map::Entry::Occupied(mut e) = self.plugin_call_states.entry(call_id) {
221                e.get_mut().remaining_streams_to_read -= 1;
222                // Remove the subscription if there are no more streams to be read.
223                if e.get().remaining_streams_to_read <= 0 {
224                    e.remove();
225                }
226            }
227            // Streams read from the plugin are tracked with locks on the GC so plugins don't get
228            // stopped if they have active streams
229            if let Some(ref gc) = self.gc {
230                gc.decrement_locks(1);
231            }
232        }
233    }
234
235    /// Find the [`Signals`] struct corresponding to the given plugin call id
236    fn get_signals(&mut self, id: PluginCallId) -> Result<Signals, ShellError> {
237        // Make sure we're up to date
238        self.receive_plugin_call_subscriptions();
239        // Find the subscription and return the context
240        self.plugin_call_states
241            .get(&id)
242            .map(|state| state.signals.clone())
243            .ok_or_else(|| ShellError::PluginFailedToDecode {
244                msg: format!("Unknown plugin call ID: {id}"),
245            })
246    }
247
248    /// Send a [`PluginCallResponse`] to the appropriate sender
249    fn send_plugin_call_response(
250        &mut self,
251        id: PluginCallId,
252        response: PluginCallResponse<PipelineData>,
253    ) -> Result<(), ShellError> {
254        // Ensure we're caught up on the subscriptions made
255        self.receive_plugin_call_subscriptions();
256
257        if let btree_map::Entry::Occupied(mut e) = self.plugin_call_states.entry(id) {
258            // Remove the subscription sender, since this will be the last message.
259            //
260            // We can spawn a new one if we need it for engine calls.
261            if !e.get().dont_send_response
262                && e.get_mut()
263                    .sender
264                    .take()
265                    .and_then(|s| s.send(ReceivedPluginCallMessage::Response(response)).ok())
266                    .is_none()
267            {
268                log::warn!("Received a plugin call response for id={id}, but the caller hung up");
269            }
270            // If there are no registered streams, just remove it
271            if e.get().remaining_streams_to_read <= 0 {
272                e.remove();
273            }
274            Ok(())
275        } else {
276            Err(ShellError::PluginFailedToDecode {
277                msg: format!("Unknown plugin call ID: {id}"),
278            })
279        }
280    }
281
282    /// Spawn a handler for engine calls for a plugin, in case we need to handle engine calls
283    /// after the response has already been received (in which case we have nowhere to send them)
284    fn spawn_engine_call_handler(
285        &mut self,
286        id: PluginCallId,
287    ) -> Result<&mpsc::Sender<ReceivedPluginCallMessage>, ShellError> {
288        let interface = self.get_interface();
289
290        if let Some(state) = self.plugin_call_states.get_mut(&id) {
291            if state.sender.is_none() {
292                let (tx, rx) = mpsc::channel();
293                let context_rx =
294                    state
295                        .context_rx
296                        .take()
297                        .ok_or_else(|| ShellError::NushellFailed {
298                            msg: "Tried to spawn the fallback engine call handler more than once"
299                                .into(),
300                        })?;
301
302                // Generate the state needed to handle engine calls
303                let mut current_call_state = CurrentCallState {
304                    context_tx: None,
305                    keep_plugin_custom_values_tx: Some(state.keep_plugin_custom_values.0.clone()),
306                    entered_foreground: false,
307                    span: state.span,
308                };
309
310                let handler = move || {
311                    // We receive on the thread so that we don't block the reader thread
312                    let mut context = context_rx
313                        .recv()
314                        .ok() // The plugin call won't send context if it's not required.
315                        .map(|c| c.0);
316
317                    for msg in rx {
318                        // This thread only handles engine calls.
319                        match msg {
320                            ReceivedPluginCallMessage::EngineCall(engine_call_id, engine_call) => {
321                                if let Err(err) = interface.handle_engine_call(
322                                    engine_call_id,
323                                    engine_call,
324                                    &mut current_call_state,
325                                    context.as_deref_mut(),
326                                ) {
327                                    log::warn!(
328                                        "Error in plugin post-response engine call handler: \
329                                        {err:?}"
330                                    );
331                                    return;
332                                }
333                            }
334                            other => log::warn!(
335                                "Bad message received in plugin post-response \
336                                engine call handler: {other:?}"
337                            ),
338                        }
339                    }
340                };
341                std::thread::Builder::new()
342                    .name("plugin engine call handler".into())
343                    .spawn(handler)
344                    .expect("failed to spawn thread");
345                state.sender = Some(tx);
346                Ok(state.sender.as_ref().unwrap_or_else(|| unreachable!()))
347            } else {
348                Err(ShellError::NushellFailed {
349                    msg: "Tried to spawn the fallback engine call handler before the plugin call \
350                        response had been received"
351                        .into(),
352                })
353            }
354        } else {
355            Err(ShellError::NushellFailed {
356                msg: format!("Couldn't find plugin ID={id} in subscriptions"),
357            })
358        }
359    }
360
361    /// Send an [`EngineCall`] to the appropriate sender
362    fn send_engine_call(
363        &mut self,
364        plugin_call_id: PluginCallId,
365        engine_call_id: EngineCallId,
366        call: EngineCall<PipelineData>,
367    ) -> Result<(), ShellError> {
368        // Ensure we're caught up on the subscriptions made
369        self.receive_plugin_call_subscriptions();
370
371        // Don't remove the sender, as there could be more calls or responses
372        if let Some(subscription) = self.plugin_call_states.get(&plugin_call_id) {
373            let msg = ReceivedPluginCallMessage::EngineCall(engine_call_id, call);
374            // Call if there's an error sending the engine call
375            let send_error = |this: &Self| {
376                log::warn!(
377                    "Received an engine call for plugin_call_id={plugin_call_id}, \
378                    but the caller hung up"
379                );
380                // We really have no choice here but to send the response ourselves and hope we
381                // don't block
382                this.state.writer.write(&PluginInput::EngineCallResponse(
383                    engine_call_id,
384                    EngineCallResponse::Error(ShellError::GenericError {
385                        error: "Caller hung up".to_string(),
386                        msg: "Can't make engine call because the original caller hung up"
387                            .to_string(),
388                        span: None,
389                        help: None,
390                        inner: vec![],
391                    }),
392                ))?;
393                this.state.writer.flush()
394            };
395            // Try to send to the sender if it exists
396            if let Some(sender) = subscription.sender.as_ref() {
397                sender.send(msg).or_else(|_| send_error(self))
398            } else {
399                // The sender no longer exists. Spawn a specific one just for engine calls
400                let sender = self.spawn_engine_call_handler(plugin_call_id)?;
401                sender.send(msg).or_else(|_| send_error(self))
402            }
403        } else {
404            Err(ShellError::PluginFailedToDecode {
405                msg: format!("Unknown plugin call ID: {plugin_call_id}"),
406            })
407        }
408    }
409
410    /// True if there are no other copies of the state (which would mean there are no interfaces
411    /// and no stream readers/writers)
412    pub fn is_finished(&self) -> bool {
413        Arc::strong_count(&self.state) < 2
414    }
415
416    /// Loop on input from the given reader as long as `is_finished()` is false
417    ///
418    /// Any errors will be propagated to all read streams automatically.
419    pub fn consume_all(
420        &mut self,
421        mut reader: impl PluginRead<PluginOutput>,
422    ) -> Result<(), ShellError> {
423        let mut result = Ok(());
424
425        while let Some(msg) = reader.read().transpose() {
426            if self.is_finished() {
427                break;
428            }
429
430            // We assume an error here is unrecoverable (at least, without restarting the plugin)
431            if let Err(err) = msg.and_then(|msg| self.consume(msg)) {
432                // Put the error in the state so that new calls see it
433                let _ = self.state.error.set(err.clone());
434                // Error to streams
435                let _ = self.stream_manager.broadcast_read_error(err.clone());
436                // Error to call waiters
437                self.receive_plugin_call_subscriptions();
438                for subscription in std::mem::take(&mut self.plugin_call_states).into_values() {
439                    let _ = subscription
440                        .sender
441                        .as_ref()
442                        .map(|s| s.send(ReceivedPluginCallMessage::Error(err.clone())));
443                }
444                result = Err(err);
445                break;
446            }
447        }
448
449        // Tell the GC we are exiting so that the plugin doesn't get stuck open
450        if let Some(ref gc) = self.gc {
451            gc.exited();
452        }
453        result
454    }
455}
456
457impl InterfaceManager for PluginInterfaceManager {
458    type Interface = PluginInterface;
459    type Input = PluginOutput;
460
461    fn get_interface(&self) -> Self::Interface {
462        PluginInterface {
463            state: self.state.clone(),
464            stream_manager_handle: self.stream_manager.get_handle(),
465            gc: self.gc.clone(),
466        }
467    }
468
    /// Handle a single message received from the plugin.
    ///
    /// `Hello` must be the first message: it stores the plugin's protocol info and checks it for
    /// compatibility. Any other message received before that is rejected. After that, stream
    /// messages go to the stream manager, options are applied to the garbage collector, call
    /// responses are forwarded to the waiting caller, and engine calls are forwarded to the
    /// thread handling the originating plugin call.
    fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> {
        log::trace!("from plugin: {input:?}");

        match input {
            PluginOutput::Hello(info) => {
                let info = Arc::new(info);
                // Stored before the compatibility check, so it is set even for incompatible
                // plugins
                self.protocol_info_mut.set(info.clone())?;

                let local_info = ProtocolInfo::default();
                if local_info.is_compatible_with(&info)? {
                    Ok(())
                } else {
                    Err(ShellError::PluginFailedToLoad {
                        msg: format!(
                            "Plugin `{}` is compiled for nushell version {}, \
                                which is not compatible with version {}",
                            self.state.source.name(),
                            info.version,
                            local_info.version,
                        ),
                    })
                }
            }
            // This guard catches every non-`Hello` message until protocol info has been set
            _ if !self.state.protocol_info.is_set() => {
                // Must send protocol info first
                Err(ShellError::PluginFailedToLoad {
                    msg: format!(
                        "Failed to receive initial Hello message from `{}`. \
                            This plugin might be too old",
                        self.state.source.name()
                    ),
                })
            }
            // Stream messages
            PluginOutput::Data(..)
            | PluginOutput::End(..)
            | PluginOutput::Drop(..)
            | PluginOutput::Ack(..) => {
                self.consume_stream_message(input.try_into().map_err(|msg| {
                    ShellError::NushellFailed {
                        msg: format!("Failed to convert message {msg:?} to StreamMessage"),
                    }
                })?)
            }
            PluginOutput::Option(option) => match option {
                PluginOption::GcDisabled(disabled) => {
                    // Turn garbage collection off/on.
                    if let Some(ref gc) = self.gc {
                        gc.set_disabled(disabled);
                    }
                    Ok(())
                }
            },
            PluginOutput::CallResponse(id, response) => {
                // Handle reading the pipeline data, if any
                let response = response
                    .map_data(|data| {
                        let signals = self.get_signals(id)?;

                        // Register the stream in the response
                        if let Some(stream_id) = data.stream_id() {
                            self.recv_stream_started(id, stream_id);
                        }

                        self.read_pipeline_data(data, &signals)
                    })
                    .unwrap_or_else(|err| {
                        // If there's an error with initializing this stream, change it to a plugin
                        // error response, but send it anyway
                        PluginCallResponse::Error(err.into())
                    });
                let result = self.send_plugin_call_response(id, response);
                if result.is_ok() {
                    // When a call ends, it releases a lock on the GC
                    if let Some(ref gc) = self.gc {
                        gc.decrement_locks(1);
                    }
                }
                result
            }
            PluginOutput::EngineCall { context, id, call } => {
                // `context` is the ID of the plugin call the engine call was made from, and `id`
                // is the ID of the engine call itself
                let call = call
                    // Handle reading the pipeline data, if any
                    .map_data(|input| {
                        let signals = self.get_signals(context)?;
                        self.read_pipeline_data(input, &signals)
                    })
                    // Do anything extra needed for each engine call setup
                    .and_then(|mut engine_call| {
                        match engine_call {
                            EngineCall::EvalClosure {
                                ref mut positional, ..
                            } => {
                                for arg in positional.iter_mut() {
                                    // Add source to any plugin custom values in the arguments
                                    PluginCustomValueWithSource::add_source_in(
                                        arg,
                                        &self.state.source,
                                    )?;
                                }
                                Ok(engine_call)
                            }
                            _ => Ok(engine_call),
                        }
                    });
                match call {
                    Ok(call) => self.send_engine_call(context, id, call),
                    // If there was an error with setting up the call, just write the error
                    Err(err) => self.get_interface().write_engine_call_response(
                        id,
                        EngineCallResponse::Error(err),
                        &CurrentCallState::default(),
                    ),
                }
            }
        }
    }
586
    /// Borrow the [`StreamManager`] owned by this interface manager.
    fn stream_manager(&self) -> &StreamManager {
        &self.stream_manager
    }
590
591    fn prepare_pipeline_data(&self, mut data: PipelineData) -> Result<PipelineData, ShellError> {
592        // Add source to any values
593        match data {
594            PipelineData::Value(ref mut value, _) => {
595                with_custom_values_in(value, |custom_value| {
596                    PluginCustomValueWithSource::add_source(custom_value.item, &self.state.source);
597                    Ok::<_, ShellError>(())
598                })?;
599                Ok(data)
600            }
601            PipelineData::ListStream(stream, meta) => {
602                let source = self.state.source.clone();
603                Ok(PipelineData::list_stream(
604                    stream.map(move |mut value| {
605                        let _ = PluginCustomValueWithSource::add_source_in(&mut value, &source);
606                        value
607                    }),
608                    meta,
609                ))
610            }
611            PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
612        }
613    }
614
615    fn consume_stream_message(&mut self, message: StreamMessage) -> Result<(), ShellError> {
616        // Keep track of streams that end
617        if let StreamMessage::End(id) = message {
618            self.recv_stream_ended(id);
619        }
620        self.stream_manager.handle_message(message)
621    }
622}
623
/// A reference through which a plugin can be interacted with during execution.
///
/// This is not a public API.
///
/// Cloning is cheap with respect to the shared state: every clone points at the same
/// `Arc<PluginInterfaceState>`.
#[derive(Debug, Clone)]
#[doc(hidden)]
pub struct PluginInterface {
    /// Shared state
    state: Arc<PluginInterfaceState>,
    /// Handle to stream manager
    stream_manager_handle: StreamManagerHandle,
    /// Handle to plugin garbage collector
    gc: Option<PluginGc>,
}
637
638impl PluginInterface {
639    /// Get the process ID for the plugin, if known.
640    pub fn pid(&self) -> Option<u32> {
641        self.state.process.as_ref().map(|p| p.pid())
642    }
643
644    /// Get the protocol info for the plugin. Will block to receive `Hello` if not received yet.
645    pub fn protocol_info(&self) -> Result<Arc<ProtocolInfo>, ShellError> {
646        self.state.protocol_info.get().and_then(|info| {
647            info.ok_or_else(|| ShellError::PluginFailedToLoad {
648                msg: format!(
649                    "Failed to get protocol info (`Hello` message) from the `{}` plugin",
650                    self.state.source.identity.name()
651                ),
652            })
653        })
654    }
655
656    /// Write the protocol info. This should be done after initialization
657    pub fn hello(&self) -> Result<(), ShellError> {
658        self.write(PluginInput::Hello(ProtocolInfo::default()))?;
659        self.flush()
660    }
661
662    /// Tell the plugin it should not expect any more plugin calls and should terminate after it has
663    /// finished processing the ones it has already received.
664    ///
665    /// Note that this is automatically called when the last existing `PluginInterface` is dropped.
666    /// You probably do not need to call this manually.
667    pub fn goodbye(&self) -> Result<(), ShellError> {
668        self.write(PluginInput::Goodbye)?;
669        self.flush()
670    }
671
672    /// Send the plugin a signal.
673    pub fn signal(&self, action: SignalAction) -> Result<(), ShellError> {
674        self.write(PluginInput::Signal(action))?;
675        self.flush()
676    }
677
678    /// Write an [`EngineCallResponse`]. Writes the full stream contained in any [`PipelineData`]
679    /// before returning.
680    pub fn write_engine_call_response(
681        &self,
682        id: EngineCallId,
683        response: EngineCallResponse<PipelineData>,
684        state: &CurrentCallState,
685    ) -> Result<(), ShellError> {
686        // Set up any stream if necessary
687        let mut writer = None;
688        let response = response.map_data(|data| {
689            let (data_header, data_writer) = self.init_write_pipeline_data(data, state)?;
690            writer = Some(data_writer);
691            Ok(data_header)
692        })?;
693
694        // Write the response, including the pipeline data header if present
695        self.write(PluginInput::EngineCallResponse(id, response))?;
696        self.flush()?;
697
698        // If we have a stream to write, do it now
699        if let Some(writer) = writer {
700            writer.write_background()?;
701        }
702
703        Ok(())
704    }
705
706    /// Write a plugin call message. Returns the writer for the stream.
707    fn write_plugin_call(
708        &self,
709        mut call: PluginCall<PipelineData>,
710        context: Option<&dyn PluginExecutionContext>,
711    ) -> Result<WritePluginCallResult, ShellError> {
712        let id = self.state.plugin_call_id_sequence.next()?;
713        let signals = context
714            .map(|c| c.signals().clone())
715            .unwrap_or_else(Signals::empty);
716        let (tx, rx) = mpsc::channel();
717        let (context_tx, context_rx) = mpsc::channel();
718        let keep_plugin_custom_values = mpsc::channel();
719
720        // Set up the state that will stay alive during the call.
721        let state = CurrentCallState {
722            context_tx: Some(context_tx),
723            keep_plugin_custom_values_tx: Some(keep_plugin_custom_values.0.clone()),
724            entered_foreground: false,
725            span: call.span(),
726        };
727
728        // Prepare the call with the state.
729        state.prepare_plugin_call(&mut call, &self.state.source)?;
730
731        // Convert the call into one with a header and handle the stream, if necessary
732        let (call, writer) = match call {
733            PluginCall::Metadata => (PluginCall::Metadata, Default::default()),
734            PluginCall::Signature => (PluginCall::Signature, Default::default()),
735            PluginCall::CustomValueOp(value, op) => {
736                (PluginCall::CustomValueOp(value, op), Default::default())
737            }
738            PluginCall::Run(CallInfo { name, call, input }) => {
739                let (header, writer) = self.init_write_pipeline_data(input, &state)?;
740                (
741                    PluginCall::Run(CallInfo {
742                        name,
743                        call,
744                        input: header,
745                    }),
746                    writer,
747                )
748            }
749        };
750
751        // Don't try to send a response for a Dropped call.
752        let dont_send_response =
753            matches!(call, PluginCall::CustomValueOp(_, CustomValueOp::Dropped));
754
755        // Register the subscription to the response, and the context
756        self.state
757            .plugin_call_subscription_sender
758            .send((
759                id,
760                PluginCallState {
761                    sender: Some(tx).filter(|_| !dont_send_response),
762                    dont_send_response,
763                    signals,
764                    context_rx: Some(context_rx),
765                    span: call.span(),
766                    keep_plugin_custom_values,
767                    remaining_streams_to_read: 0,
768                },
769            ))
770            .map_err(|_| {
771                let existing_error = self.state.error.get().cloned();
772                ShellError::GenericError {
773                    error: format!("Plugin `{}` closed unexpectedly", self.state.source.name()),
774                    msg: "can't complete this operation because the plugin is closed".into(),
775                    span: call.span(),
776                    help: Some(format!(
777                        "the plugin may have experienced an error. Try loading the plugin again \
778                        with `{}`",
779                        self.state.source.identity.use_command(),
780                    )),
781                    inner: existing_error.into_iter().collect(),
782                }
783            })?;
784
785        // Starting a plugin call adds a lock on the GC. Locks are not added for streams being read
786        // by the plugin, so the plugin would have to explicitly tell us if it expects to stay alive
787        // while reading streams in the background after the response ends.
788        if let Some(ref gc) = self.gc {
789            gc.increment_locks(1);
790        }
791
792        // Write request
793        self.write(PluginInput::Call(id, call))?;
794        self.flush()?;
795
796        Ok(WritePluginCallResult {
797            receiver: rx,
798            writer,
799            state,
800        })
801    }
802
803    /// Read the channel for plugin call messages and handle them until the response is received.
804    fn receive_plugin_call_response(
805        &self,
806        rx: mpsc::Receiver<ReceivedPluginCallMessage>,
807        mut context: Option<&mut (dyn PluginExecutionContext + '_)>,
808        mut state: CurrentCallState,
809    ) -> Result<PluginCallResponse<PipelineData>, ShellError> {
810        // Handle message from receiver
811        for msg in rx {
812            match msg {
813                ReceivedPluginCallMessage::Response(resp) => {
814                    if state.entered_foreground {
815                        // Make the plugin leave the foreground on return, even if it's a stream
816                        if let Some(context) = context.as_deref_mut() {
817                            if let Err(err) =
818                                set_foreground(self.state.process.as_ref(), context, false)
819                            {
820                                log::warn!("Failed to leave foreground state on exit: {err:?}");
821                            }
822                        }
823                    }
824                    if resp.has_stream() {
825                        // If the response has a stream, we need to register the context
826                        if let Some(context) = context {
827                            if let Some(ref context_tx) = state.context_tx {
828                                let _ = context_tx.send(Context(context.boxed()));
829                            }
830                        }
831                    }
832                    return Ok(resp);
833                }
834                ReceivedPluginCallMessage::Error(err) => {
835                    return Err(err);
836                }
837                ReceivedPluginCallMessage::EngineCall(engine_call_id, engine_call) => {
838                    self.handle_engine_call(
839                        engine_call_id,
840                        engine_call,
841                        &mut state,
842                        context.as_deref_mut(),
843                    )?;
844                }
845            }
846        }
847        // If we fail to get a response, check for an error in the state first, and return it if
848        // set. This is probably a much more helpful error than 'failed to receive response' alone
849        let existing_error = self.state.error.get().cloned();
850        Err(ShellError::GenericError {
851            error: format!(
852                "Failed to receive response to plugin call from `{}`",
853                self.state.source.identity.name()
854            ),
855            msg: "while waiting for this operation to complete".into(),
856            span: state.span,
857            help: Some(format!(
858                "try restarting the plugin with `{}`",
859                self.state.source.identity.use_command()
860            )),
861            inner: existing_error.into_iter().collect(),
862        })
863    }
864
865    /// Handle an engine call and write the response.
866    fn handle_engine_call(
867        &self,
868        engine_call_id: EngineCallId,
869        engine_call: EngineCall<PipelineData>,
870        state: &mut CurrentCallState,
871        context: Option<&mut (dyn PluginExecutionContext + '_)>,
872    ) -> Result<(), ShellError> {
873        let process = self.state.process.as_ref();
874        let resp = handle_engine_call(engine_call, state, context, process)
875            .unwrap_or_else(EngineCallResponse::Error);
876        // Handle stream
877        let mut writer = None;
878        let resp = resp
879            .map_data(|data| {
880                let (data_header, data_writer) = self.init_write_pipeline_data(data, state)?;
881                writer = Some(data_writer);
882                Ok(data_header)
883            })
884            .unwrap_or_else(|err| {
885                // If we fail to set up the response write, change to an error response here
886                writer = None;
887                EngineCallResponse::Error(err)
888            });
889        // Write the response, then the stream
890        self.write(PluginInput::EngineCallResponse(engine_call_id, resp))?;
891        self.flush()?;
892        if let Some(writer) = writer {
893            writer.write_background()?;
894        }
895        Ok(())
896    }
897
898    /// Perform a plugin call. Input and output streams are handled, and engine calls are handled
899    /// too if there are any before the final response.
900    fn plugin_call(
901        &self,
902        call: PluginCall<PipelineData>,
903        context: Option<&mut dyn PluginExecutionContext>,
904    ) -> Result<PluginCallResponse<PipelineData>, ShellError> {
905        // Check for an error in the state first, and return it if set.
906        if let Some(error) = self.state.error.get() {
907            return Err(ShellError::GenericError {
908                error: format!(
909                    "Failed to send plugin call to `{}`",
910                    self.state.source.identity.name()
911                ),
912                msg: "the plugin encountered an error before this operation could be attempted"
913                    .into(),
914                span: call.span(),
915                help: Some(format!(
916                    "try loading the plugin again with `{}`",
917                    self.state.source.identity.use_command(),
918                )),
919                inner: vec![error.clone()],
920            });
921        }
922
923        let result = self.write_plugin_call(call, context.as_deref())?;
924
925        // Finish writing stream in the background
926        result.writer.write_background()?;
927
928        self.receive_plugin_call_response(result.receiver, context, result.state)
929    }
930
931    /// Get the metadata from the plugin.
932    pub fn get_metadata(&self) -> Result<PluginMetadata, ShellError> {
933        match self.plugin_call(PluginCall::Metadata, None)? {
934            PluginCallResponse::Metadata(meta) => Ok(meta),
935            PluginCallResponse::Error(err) => Err(err.into()),
936            _ => Err(ShellError::PluginFailedToDecode {
937                msg: "Received unexpected response to plugin Metadata call".into(),
938            }),
939        }
940    }
941
942    /// Get the command signatures from the plugin.
943    pub fn get_signature(&self) -> Result<Vec<PluginSignature>, ShellError> {
944        match self.plugin_call(PluginCall::Signature, None)? {
945            PluginCallResponse::Signature(sigs) => Ok(sigs),
946            PluginCallResponse::Error(err) => Err(err.into()),
947            _ => Err(ShellError::PluginFailedToDecode {
948                msg: "Received unexpected response to plugin Signature call".into(),
949            }),
950        }
951    }
952
953    /// Run the plugin with the given call and execution context.
954    pub fn run(
955        &self,
956        call: CallInfo<PipelineData>,
957        context: &mut dyn PluginExecutionContext,
958    ) -> Result<PipelineData, ShellError> {
959        match self.plugin_call(PluginCall::Run(call), Some(context))? {
960            PluginCallResponse::PipelineData(data) => Ok(data),
961            PluginCallResponse::Error(err) => Err(err.into()),
962            _ => Err(ShellError::PluginFailedToDecode {
963                msg: "Received unexpected response to plugin Run call".into(),
964            }),
965        }
966    }
967
968    /// Do a custom value op that expects a value response (i.e. most of them)
969    fn custom_value_op_expecting_value(
970        &self,
971        value: Spanned<PluginCustomValueWithSource>,
972        op: CustomValueOp,
973    ) -> Result<Value, ShellError> {
974        let op_name = op.name();
975        let span = value.span;
976
977        // Check that the value came from the right source
978        value.item.verify_source(span, &self.state.source)?;
979
980        let call = PluginCall::CustomValueOp(value.map(|cv| cv.without_source()), op);
981        match self.plugin_call(call, None)? {
982            PluginCallResponse::PipelineData(out_data) => out_data.into_value(span),
983            PluginCallResponse::Error(err) => Err(err.into()),
984            _ => Err(ShellError::PluginFailedToDecode {
985                msg: format!("Received unexpected response to custom value {op_name}() call"),
986            }),
987        }
988    }
989
990    /// Collapse a custom value to its base value.
991    pub fn custom_value_to_base_value(
992        &self,
993        value: Spanned<PluginCustomValueWithSource>,
994    ) -> Result<Value, ShellError> {
995        self.custom_value_op_expecting_value(value, CustomValueOp::ToBaseValue)
996    }
997
998    /// Follow a numbered cell path on a custom value - e.g. `value.0`.
999    pub fn custom_value_follow_path_int(
1000        &self,
1001        value: Spanned<PluginCustomValueWithSource>,
1002        index: Spanned<usize>,
1003    ) -> Result<Value, ShellError> {
1004        self.custom_value_op_expecting_value(value, CustomValueOp::FollowPathInt(index))
1005    }
1006
1007    /// Follow a named cell path on a custom value - e.g. `value.column`.
1008    pub fn custom_value_follow_path_string(
1009        &self,
1010        value: Spanned<PluginCustomValueWithSource>,
1011        column_name: Spanned<String>,
1012    ) -> Result<Value, ShellError> {
1013        self.custom_value_op_expecting_value(value, CustomValueOp::FollowPathString(column_name))
1014    }
1015
1016    /// Invoke comparison logic for custom values.
1017    pub fn custom_value_partial_cmp(
1018        &self,
1019        value: PluginCustomValueWithSource,
1020        other_value: Value,
1021    ) -> Result<Option<Ordering>, ShellError> {
1022        // Check that the value came from the right source
1023        value.verify_source(Span::unknown(), &self.state.source)?;
1024
1025        // Note: the protocol is always designed to have a span with the custom value, but this
1026        // operation doesn't support one.
1027        let call = PluginCall::CustomValueOp(
1028            value.without_source().into_spanned(Span::unknown()),
1029            CustomValueOp::PartialCmp(other_value),
1030        );
1031        match self.plugin_call(call, None)? {
1032            PluginCallResponse::Ordering(ordering) => Ok(ordering),
1033            PluginCallResponse::Error(err) => Err(err.into()),
1034            _ => Err(ShellError::PluginFailedToDecode {
1035                msg: "Received unexpected response to custom value partial_cmp() call".into(),
1036            }),
1037        }
1038    }
1039
1040    /// Invoke functionality for an operator on a custom value.
1041    pub fn custom_value_operation(
1042        &self,
1043        left: Spanned<PluginCustomValueWithSource>,
1044        operator: Spanned<Operator>,
1045        right: Value,
1046    ) -> Result<Value, ShellError> {
1047        self.custom_value_op_expecting_value(left, CustomValueOp::Operation(operator, right))
1048    }
1049
1050    /// Notify the plugin about a dropped custom value.
1051    pub fn custom_value_dropped(&self, value: PluginCustomValue) -> Result<(), ShellError> {
1052        // Make sure we don't block here. This can happen on the receiver thread, which would cause a deadlock. We should not try to receive the response - just let it be discarded.
1053        //
1054        // Note: the protocol is always designed to have a span with the custom value, but this
1055        // operation doesn't support one.
1056        drop(self.write_plugin_call(
1057            PluginCall::CustomValueOp(value.into_spanned(Span::unknown()), CustomValueOp::Dropped),
1058            None,
1059        )?);
1060        Ok(())
1061    }
1062}
1063
1064impl Interface for PluginInterface {
1065    type Output = PluginInput;
1066    type DataContext = CurrentCallState;
1067
1068    fn write(&self, input: PluginInput) -> Result<(), ShellError> {
1069        log::trace!("to plugin: {input:?}");
1070        self.state.writer.write(&input).map_err(|err| {
1071            log::warn!("write() error: {err}");
1072            // If there's an error in the state, return that instead because it's likely more
1073            // descriptive
1074            self.state.error.get().cloned().unwrap_or(err)
1075        })
1076    }
1077
1078    fn flush(&self) -> Result<(), ShellError> {
1079        self.state.writer.flush().map_err(|err| {
1080            log::warn!("flush() error: {err}");
1081            // If there's an error in the state, return that instead because it's likely more
1082            // descriptive
1083            self.state.error.get().cloned().unwrap_or(err)
1084        })
1085    }
1086
1087    fn stream_id_sequence(&self) -> &Sequence {
1088        &self.state.stream_id_sequence
1089    }
1090
1091    fn stream_manager_handle(&self) -> &StreamManagerHandle {
1092        &self.stream_manager_handle
1093    }
1094
1095    fn prepare_pipeline_data(
1096        &self,
1097        data: PipelineData,
1098        state: &CurrentCallState,
1099    ) -> Result<PipelineData, ShellError> {
1100        // Validate the destination of values in the pipeline data
1101        match data {
1102            PipelineData::Value(mut value, meta) => {
1103                state.prepare_value(&mut value, &self.state.source)?;
1104                Ok(PipelineData::value(value, meta))
1105            }
1106            PipelineData::ListStream(stream, meta) => {
1107                let source = self.state.source.clone();
1108                let state = state.clone();
1109                Ok(PipelineData::list_stream(
1110                    stream.map(move |mut value| {
1111                        match state.prepare_value(&mut value, &source) {
1112                            Ok(()) => value,
1113                            // Put the error in the stream instead
1114                            Err(err) => Value::error(err, value.span()),
1115                        }
1116                    }),
1117                    meta,
1118                ))
1119            }
1120            PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
1121        }
1122    }
1123}
1124
1125impl Drop for PluginInterface {
1126    fn drop(&mut self) {
1127        // Automatically send `Goodbye` if there are no more interfaces. In that case there would be
1128        // only two copies of the state, one of which we hold, and one of which the manager holds.
1129        //
1130        // Our copy is about to be dropped, so there would only be one left, the manager. The
1131        // manager will never send any plugin calls, so we should let the plugin know that.
1132        if Arc::strong_count(&self.state) < 3 {
1133            if let Err(err) = self.goodbye() {
1134                log::warn!("Error during plugin Goodbye: {err}");
1135            }
1136        }
1137    }
1138}
1139
/// Return value of [`PluginInterface::write_plugin_call()`].
///
/// Bundles everything the caller needs to finish a plugin call after the request has been
/// written. Marked `#[must_use]` so that discarding it has to be done explicitly.
#[must_use]
struct WritePluginCallResult {
    /// Receiver for plugin call messages related to the written plugin call.
    receiver: mpsc::Receiver<ReceivedPluginCallMessage>,
    /// Writer for the stream, if any.
    writer: PipelineDataWriter<PluginInterface>,
    /// State to be kept for the duration of the plugin call.
    state: CurrentCallState,
}
1150
/// State related to the current plugin call being executed.
///
/// The default value has no senders, has not entered the foreground, and has no span.
#[derive(Default, Clone)]
pub struct CurrentCallState {
    /// Sender for context, which should be sent if the plugin call returned a stream so that
    /// engine calls may continue to be handled.
    context_tx: Option<mpsc::Sender<Context>>,
    /// Sender for a channel that retains plugin custom values that need to stay alive for the
    /// duration of a plugin call.
    keep_plugin_custom_values_tx: Option<mpsc::Sender<PluginCustomValueWithSource>>,
    /// The plugin call entered the foreground: this should be cleaned up automatically when the
    /// plugin call returns.
    entered_foreground: bool,
    /// The span that caused the plugin call.
    span: Option<Span>,
}
1166
1167impl CurrentCallState {
1168    /// Prepare a custom value for write. Verifies custom value origin, and keeps custom values that
1169    /// shouldn't be dropped immediately.
1170    fn prepare_custom_value(
1171        &self,
1172        custom_value: Spanned<&mut Box<dyn CustomValue>>,
1173        source: &PluginSource,
1174    ) -> Result<(), ShellError> {
1175        // Ensure we can use it
1176        PluginCustomValueWithSource::verify_source_of_custom_value(
1177            custom_value.as_deref().map(|cv| &**cv),
1178            source,
1179        )?;
1180
1181        // Check whether we need to keep it
1182        if let Some(keep_tx) = &self.keep_plugin_custom_values_tx {
1183            if let Some(custom_value) = custom_value
1184                .item
1185                .as_any()
1186                .downcast_ref::<PluginCustomValueWithSource>()
1187            {
1188                if custom_value.notify_on_drop() {
1189                    log::trace!("Keeping custom value for drop later: {custom_value:?}");
1190                    keep_tx
1191                        .send(custom_value.clone())
1192                        .map_err(|_| ShellError::NushellFailed {
1193                            msg: "Failed to custom value to keep channel".into(),
1194                        })?;
1195                }
1196            }
1197        }
1198
1199        // Strip the source from it so it can be serialized
1200        PluginCustomValueWithSource::remove_source(&mut *custom_value.item);
1201
1202        Ok(())
1203    }
1204
1205    /// Prepare a value for write, including all contained custom values.
1206    fn prepare_value(&self, value: &mut Value, source: &PluginSource) -> Result<(), ShellError> {
1207        with_custom_values_in(value, |custom_value| {
1208            self.prepare_custom_value(custom_value, source)
1209        })
1210    }
1211
1212    /// Prepare call arguments for write.
1213    fn prepare_call_args(
1214        &self,
1215        call: &mut EvaluatedCall,
1216        source: &PluginSource,
1217    ) -> Result<(), ShellError> {
1218        for arg in call.positional.iter_mut() {
1219            self.prepare_value(arg, source)?;
1220        }
1221        for arg in call.named.iter_mut().flat_map(|(_, arg)| arg.as_mut()) {
1222            self.prepare_value(arg, source)?;
1223        }
1224        Ok(())
1225    }
1226
1227    /// Prepare a plugin call for write. Does not affect pipeline data, which is handled by
1228    /// `prepare_pipeline_data()` instead.
1229    fn prepare_plugin_call<D>(
1230        &self,
1231        call: &mut PluginCall<D>,
1232        source: &PluginSource,
1233    ) -> Result<(), ShellError> {
1234        match call {
1235            PluginCall::Metadata => Ok(()),
1236            PluginCall::Signature => Ok(()),
1237            PluginCall::Run(CallInfo { call, .. }) => self.prepare_call_args(call, source),
1238            PluginCall::CustomValueOp(_, op) => {
1239                // Handle anything within the op.
1240                match op {
1241                    CustomValueOp::ToBaseValue => Ok(()),
1242                    CustomValueOp::FollowPathInt(_) => Ok(()),
1243                    CustomValueOp::FollowPathString(_) => Ok(()),
1244                    CustomValueOp::PartialCmp(value) => self.prepare_value(value, source),
1245                    CustomValueOp::Operation(_, value) => self.prepare_value(value, source),
1246                    CustomValueOp::Dropped => Ok(()),
1247                }
1248            }
1249        }
1250    }
1251}
1252
1253/// Handle an engine call.
1254pub(crate) fn handle_engine_call(
1255    call: EngineCall<PipelineData>,
1256    state: &mut CurrentCallState,
1257    context: Option<&mut (dyn PluginExecutionContext + '_)>,
1258    process: Option<&PluginProcess>,
1259) -> Result<EngineCallResponse<PipelineData>, ShellError> {
1260    let call_name = call.name();
1261
1262    let context = context.ok_or_else(|| ShellError::GenericError {
1263        error: "A plugin execution context is required for this engine call".into(),
1264        msg: format!("attempted to call {call_name} outside of a command invocation"),
1265        span: None,
1266        help: Some("this is probably a bug with the plugin".into()),
1267        inner: vec![],
1268    })?;
1269
1270    match call {
1271        EngineCall::GetConfig => {
1272            let config = SharedCow::from(context.get_config()?);
1273            Ok(EngineCallResponse::Config(config))
1274        }
1275        EngineCall::GetPluginConfig => {
1276            let plugin_config = context.get_plugin_config()?;
1277            Ok(plugin_config.map_or_else(EngineCallResponse::empty, EngineCallResponse::value))
1278        }
1279        EngineCall::GetEnvVar(name) => {
1280            let value = context.get_env_var(&name)?;
1281            Ok(value
1282                .cloned()
1283                .map_or_else(EngineCallResponse::empty, EngineCallResponse::value))
1284        }
1285        EngineCall::GetEnvVars => context.get_env_vars().map(EngineCallResponse::ValueMap),
1286        EngineCall::GetCurrentDir => {
1287            let current_dir = context.get_current_dir()?;
1288            Ok(EngineCallResponse::value(Value::string(
1289                current_dir.item,
1290                current_dir.span,
1291            )))
1292        }
1293        EngineCall::AddEnvVar(name, value) => {
1294            context.add_env_var(name, value)?;
1295            Ok(EngineCallResponse::empty())
1296        }
1297        EngineCall::GetHelp => {
1298            let help = context.get_help()?;
1299            Ok(EngineCallResponse::value(Value::string(
1300                help.item, help.span,
1301            )))
1302        }
1303        EngineCall::EnterForeground => {
1304            let resp = set_foreground(process, context, true)?;
1305            state.entered_foreground = true;
1306            Ok(resp)
1307        }
1308        EngineCall::LeaveForeground => {
1309            let resp = set_foreground(process, context, false)?;
1310            state.entered_foreground = false;
1311            Ok(resp)
1312        }
1313        EngineCall::GetSpanContents(span) => {
1314            let contents = context.get_span_contents(span)?;
1315            Ok(EngineCallResponse::value(Value::binary(
1316                contents.item,
1317                contents.span,
1318            )))
1319        }
1320        EngineCall::EvalClosure {
1321            closure,
1322            positional,
1323            input,
1324            redirect_stdout,
1325            redirect_stderr,
1326        } => context
1327            .eval_closure(closure, positional, input, redirect_stdout, redirect_stderr)
1328            .map(EngineCallResponse::PipelineData),
1329        EngineCall::FindDecl(name) => context.find_decl(&name).map(|decl_id| {
1330            if let Some(decl_id) = decl_id {
1331                EngineCallResponse::Identifier(decl_id)
1332            } else {
1333                EngineCallResponse::empty()
1334            }
1335        }),
1336        EngineCall::CallDecl {
1337            decl_id,
1338            call,
1339            input,
1340            redirect_stdout,
1341            redirect_stderr,
1342        } => context
1343            .call_decl(decl_id, call, input, redirect_stdout, redirect_stderr)
1344            .map(EngineCallResponse::PipelineData),
1345    }
1346}
1347
1348/// Implements enter/exit foreground
1349fn set_foreground(
1350    process: Option<&PluginProcess>,
1351    context: &mut dyn PluginExecutionContext,
1352    enter: bool,
1353) -> Result<EngineCallResponse<PipelineData>, ShellError> {
1354    if let Some(process) = process {
1355        if let Some(pipeline_externals_state) = context.pipeline_externals_state() {
1356            if enter {
1357                let pgrp = process.enter_foreground(context.span(), pipeline_externals_state)?;
1358                Ok(pgrp.map_or_else(EngineCallResponse::empty, |id| {
1359                    EngineCallResponse::value(Value::int(id as i64, context.span()))
1360                }))
1361            } else {
1362                process.exit_foreground()?;
1363                Ok(EngineCallResponse::empty())
1364            }
1365        } else {
1366            // This should always be present on a real context
1367            Err(ShellError::NushellFailed {
1368                msg: "missing required pipeline_externals_state from context \
1369                            for entering foreground"
1370                    .into(),
1371            })
1372        }
1373    } else {
1374        Err(ShellError::GenericError {
1375            error: "Can't manage plugin process to enter foreground".into(),
1376            msg: "the process ID for this plugin is unknown".into(),
1377            span: Some(context.span()),
1378            help: Some("the plugin may be running in a test".into()),
1379            inner: vec![],
1380        })
1381    }
1382}