1use nu_plugin_core::{
4 Interface, InterfaceManager, PipelineDataWriter, PluginRead, PluginWrite, StreamManager,
5 StreamManagerHandle,
6 util::{Waitable, WaitableMut, with_custom_values_in},
7};
8use nu_plugin_protocol::{
9 CallInfo, CustomValueOp, EngineCall, EngineCallId, EngineCallResponse, EvaluatedCall,
10 GetCompletionInfo, Ordering, PluginCall, PluginCallId, PluginCallResponse, PluginCustomValue,
11 PluginInput, PluginOption, PluginOutput, ProtocolInfo, StreamId, StreamMessage,
12};
13use nu_protocol::{
14 CustomValue, DynamicSuggestion, IntoSpanned, PipelineData, PluginMetadata, PluginSignature,
15 ShellError, SignalAction, Signals, Span, Spanned, Value, ast::Operator, casing::Casing,
16 engine::Sequence, shell_error::generic::GenericError,
17};
18use nu_utils::SharedCow;
19use std::{
20 collections::{BTreeMap, btree_map},
21 path::Path,
22 sync::{Arc, OnceLock, mpsc},
23};
24
25use crate::{
26 PluginCustomValueWithSource, PluginExecutionContext, PluginGc, PluginSource,
27 process::PluginProcess,
28};
29
30#[cfg(test)]
31mod tests;
32
/// A message routed by the manager to whoever is handling a particular plugin call.
#[derive(Debug)]
enum ReceivedPluginCallMessage {
    /// The plugin's response to the call.
    Response(PluginCallResponse<PipelineData>),

    /// An error delivered instead of a response, e.g. when reading from the plugin failed.
    Error(ShellError),

    /// An engine call made by the plugin with this plugin call as its context. Carries the
    /// engine call ID that the eventual response must be addressed to.
    EngineCall(EngineCallId, EngineCall<PipelineData>),
}
47
/// Newtype around a boxed [`PluginExecutionContext`], used to pass an owned context through a
/// channel.
pub(crate) struct Context(Box<dyn PluginExecutionContext>);
50
51impl std::fmt::Debug for Context {
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 f.write_str("Context")
54 }
55}
56
57impl std::ops::Deref for Context {
58 type Target = dyn PluginExecutionContext;
59
60 fn deref(&self) -> &Self::Target {
61 &*self.0
62 }
63}
64
/// State shared (behind an `Arc`) between the interface manager and every interface handle.
struct PluginInterfaceState {
    /// The source of the plugin; attached to / verified against custom values and used to name
    /// the plugin in error messages.
    source: Arc<PluginSource>,
    /// Handle to the plugin process, present only when a PID was supplied at construction.
    process: Option<PluginProcess>,
    /// Read side of the protocol info, which is set once the plugin's `Hello` is consumed.
    protocol_info: Waitable<Arc<ProtocolInfo>>,
    /// Sequence used to allocate IDs for outgoing plugin calls.
    plugin_call_id_sequence: Sequence,
    /// Sequence used to allocate stream IDs.
    stream_id_sequence: Sequence,
    /// Channel on which newly written plugin calls register their state with the manager.
    plugin_call_subscription_sender: mpsc::Sender<(PluginCallId, PluginCallState)>,
    /// The error that stopped message consumption, if any. Set at most once.
    error: OnceLock<ShellError>,
    /// Writer for messages going to the plugin.
    writer: Box<dyn PluginWrite<PluginInput>>,
}
84
85impl std::fmt::Debug for PluginInterfaceState {
86 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87 f.debug_struct("PluginInterfaceState")
88 .field("source", &self.source)
89 .field("protocol_info", &self.protocol_info)
90 .field("plugin_call_id_sequence", &self.plugin_call_id_sequence)
91 .field("stream_id_sequence", &self.stream_id_sequence)
92 .field(
93 "plugin_call_subscription_sender",
94 &self.plugin_call_subscription_sender,
95 )
96 .field("error", &self.error)
97 .finish_non_exhaustive()
98 }
99}
100
/// State the manager keeps for a single in-flight plugin call.
#[derive(Debug)]
struct PluginCallState {
    /// Channel to the handler of this call. Taken (set to `None`) once the response has been
    /// delivered, and never set for calls that don't expect a response.
    sender: Option<mpsc::Sender<ReceivedPluginCallMessage>>,
    /// When true, a response received for this call is not forwarded to anyone.
    dont_send_response: bool,
    /// Signals taken from the caller's context (or empty if there was none); used when reading
    /// pipeline data that belongs to this call.
    signals: Signals,
    /// Receiver for the execution context, consumed by the fallback engine call handler thread.
    context_rx: Option<mpsc::Receiver<Context>>,
    /// Span of the plugin call, if it has one.
    span: Option<Span>,
    /// Channel holding plugin custom values that must stay alive for the duration of the call.
    /// The sender half is cloned into [`CurrentCallState`]; the receiver half is drained when
    /// this state is dropped.
    keep_plugin_custom_values: (
        mpsc::Sender<PluginCustomValueWithSource>,
        mpsc::Receiver<PluginCustomValueWithSource>,
    ),
    /// Number of response streams that were registered for this call and have not ended yet.
    remaining_streams_to_read: i32,
}
126
127impl Drop for PluginCallState {
128 fn drop(&mut self) {
129 for value in self.keep_plugin_custom_values.1.try_iter() {
131 log::trace!("Dropping custom value that was kept: {value:?}");
132 drop(value);
133 }
134 }
135}
136
/// Consumes messages coming from a plugin and routes them to the waiting plugin calls and
/// streams.
#[derive(Debug)]
pub struct PluginInterfaceManager {
    /// State shared with every [`PluginInterface`] created from this manager.
    state: Arc<PluginInterfaceState>,
    /// Write side of the protocol info; set when `Hello` is received.
    protocol_info_mut: WaitableMut<Arc<ProtocolInfo>>,
    /// Manages the streams read from the plugin.
    stream_manager: StreamManager,
    /// Per-call state for plugin calls that are still being tracked, keyed by call ID.
    plugin_call_states: BTreeMap<PluginCallId, PluginCallState>,
    /// Receives registrations for newly written plugin calls.
    plugin_call_subscription_receiver: mpsc::Receiver<(PluginCallId, PluginCallState)>,
    /// Maps each response stream to the plugin call it belongs to, so the call's stream count
    /// can be decremented when the stream ends.
    plugin_call_input_streams: BTreeMap<StreamId, PluginCallId>,
    /// Garbage collector to notify about locks and exit, if one has been set.
    gc: Option<PluginGc>,
}
157
158impl PluginInterfaceManager {
159 pub fn new(
160 source: Arc<PluginSource>,
161 pid: Option<u32>,
162 writer: impl PluginWrite<PluginInput> + 'static,
163 ) -> PluginInterfaceManager {
164 let (subscription_tx, subscription_rx) = mpsc::channel();
165 let protocol_info_mut = WaitableMut::new();
166
167 PluginInterfaceManager {
168 state: Arc::new(PluginInterfaceState {
169 source,
170 process: pid.map(PluginProcess::new),
171 protocol_info: protocol_info_mut.reader(),
172 plugin_call_id_sequence: Sequence::default(),
173 stream_id_sequence: Sequence::default(),
174 plugin_call_subscription_sender: subscription_tx,
175 error: OnceLock::new(),
176 writer: Box::new(writer),
177 }),
178 protocol_info_mut,
179 stream_manager: StreamManager::new(),
180 plugin_call_states: BTreeMap::new(),
181 plugin_call_subscription_receiver: subscription_rx,
182 plugin_call_input_streams: BTreeMap::new(),
183 gc: None,
184 }
185 }
186
    /// Set (or clear, with `None`) the garbage collector that this manager notifies.
    ///
    /// Interfaces obtained from `get_interface` clone the value stored at the time they are
    /// created.
    pub fn set_garbage_collector(&mut self, gc: Option<PluginGc>) {
        self.gc = gc;
    }
193
194 fn receive_plugin_call_subscriptions(&mut self) {
196 while let Ok((id, state)) = self.plugin_call_subscription_receiver.try_recv() {
197 if let btree_map::Entry::Vacant(e) = self.plugin_call_states.entry(id) {
198 e.insert(state);
199 } else {
200 log::warn!("Duplicate plugin call ID ignored: {id}");
201 }
202 }
203 }
204
205 fn recv_stream_started(&mut self, call_id: PluginCallId, stream_id: StreamId) {
207 self.plugin_call_input_streams.insert(stream_id, call_id);
208 self.receive_plugin_call_subscriptions();
210 if let Some(state) = self.plugin_call_states.get_mut(&call_id) {
211 state.remaining_streams_to_read += 1;
212 }
213 if let Some(ref gc) = self.gc {
215 gc.increment_locks(1);
216 }
217 }
218
219 fn recv_stream_ended(&mut self, stream_id: StreamId) {
221 if let Some(call_id) = self.plugin_call_input_streams.remove(&stream_id) {
222 if let btree_map::Entry::Occupied(mut e) = self.plugin_call_states.entry(call_id) {
223 e.get_mut().remaining_streams_to_read -= 1;
224 if e.get().remaining_streams_to_read <= 0 {
226 e.remove();
227 }
228 }
229 if let Some(ref gc) = self.gc {
232 gc.decrement_locks(1);
233 }
234 }
235 }
236
237 fn get_signals(&mut self, id: PluginCallId) -> Result<Signals, ShellError> {
239 self.receive_plugin_call_subscriptions();
241 self.plugin_call_states
243 .get(&id)
244 .map(|state| state.signals.clone())
245 .ok_or_else(|| ShellError::PluginFailedToDecode {
246 msg: format!("Unknown plugin call ID: {id}"),
247 })
248 }
249
250 fn send_plugin_call_response(
252 &mut self,
253 id: PluginCallId,
254 response: PluginCallResponse<PipelineData>,
255 ) -> Result<(), ShellError> {
256 self.receive_plugin_call_subscriptions();
258
259 if let btree_map::Entry::Occupied(mut e) = self.plugin_call_states.entry(id) {
260 if !e.get().dont_send_response
264 && e.get_mut()
265 .sender
266 .take()
267 .and_then(|s| s.send(ReceivedPluginCallMessage::Response(response)).ok())
268 .is_none()
269 {
270 log::warn!("Received a plugin call response for id={id}, but the caller hung up");
271 }
272 if e.get().remaining_streams_to_read <= 0 {
274 e.remove();
275 }
276 Ok(())
277 } else {
278 Err(ShellError::PluginFailedToDecode {
279 msg: format!("Unknown plugin call ID: {id}"),
280 })
281 }
282 }
283
    /// Spawn a thread that handles engine calls for plugin call `id` after the call's original
    /// sender is gone, and return the new sender that feeds that thread.
    ///
    /// Errors with `NushellFailed` if the call ID is not tracked, if the call still has a
    /// sender, or if the context receiver has already been taken by a previous invocation.
    ///
    /// # Panics
    ///
    /// Panics if the handler thread cannot be spawned.
    fn spawn_engine_call_handler(
        &mut self,
        id: PluginCallId,
    ) -> Result<&mpsc::Sender<ReceivedPluginCallMessage>, ShellError> {
        let interface = self.get_interface();

        if let Some(state) = self.plugin_call_states.get_mut(&id) {
            if state.sender.is_none() {
                let (tx, rx) = mpsc::channel();
                // The receiver can only be taken once, so a second spawn attempt fails here.
                let context_rx =
                    state
                        .context_rx
                        .take()
                        .ok_or_else(|| ShellError::NushellFailed {
                            msg: "Tried to spawn the fallback engine call handler more than once"
                                .into(),
                        })?;

                // Fresh call state for the handler thread; it shares the keep-alive channel for
                // custom values with the original call.
                let mut current_call_state = CurrentCallState {
                    context_tx: None,
                    keep_plugin_custom_values_tx: Some(state.keep_plugin_custom_values.0.clone()),
                    entered_foreground: false,
                    span: state.span,
                };

                let handler = move || {
                    // Wait for the context on the spawned thread rather than on the caller's.
                    let mut context = context_rx
                        .recv()
                        .ok() // `None` when the sending side was dropped without sending a context
                        .map(|c| c.0);

                    for msg in rx {
                        // Only engine calls are expected here; anything else is logged and skipped.
                        match msg {
                            ReceivedPluginCallMessage::EngineCall(engine_call_id, engine_call) => {
                                if let Err(err) = interface.handle_engine_call(
                                    engine_call_id,
                                    engine_call,
                                    &mut current_call_state,
                                    context.as_deref_mut(),
                                ) {
                                    log::warn!(
                                        "Error in plugin post-response engine call handler: \
                                        {err:?}"
                                    );
                                    return;
                                }
                            }
                            other => log::warn!(
                                "Bad message received in plugin post-response \
                                engine call handler: {other:?}"
                            ),
                        }
                    }
                };
                std::thread::Builder::new()
                    .name("plugin engine call handler".into())
                    .spawn(handler)
                    .expect("failed to spawn thread");
                state.sender = Some(tx);
                // The sender was assigned on the previous line, so it is always present here.
                Ok(state.sender.as_ref().unwrap_or_else(|| unreachable!()))
            } else {
                Err(ShellError::NushellFailed {
                    msg: "Tried to spawn the fallback engine call handler before the plugin call \
                        response had been received"
                        .into(),
                })
            }
        } else {
            Err(ShellError::NushellFailed {
                msg: format!("Couldn't find plugin ID={id} in subscriptions"),
            })
        }
    }
362
    /// Forward an engine call made by the plugin to the handler of the plugin call it was made
    /// under (`plugin_call_id`).
    ///
    /// If the call no longer has a sender, a fallback handler thread is spawned to serve it. If
    /// the handler has hung up, an error response for `engine_call_id` is written back to the
    /// plugin instead. Returns `PluginFailedToDecode` if the plugin call ID is not known.
    fn send_engine_call(
        &mut self,
        plugin_call_id: PluginCallId,
        engine_call_id: EngineCallId,
        call: EngineCall<PipelineData>,
    ) -> Result<(), ShellError> {
        self.receive_plugin_call_subscriptions();

        if let Some(subscription) = self.plugin_call_states.get(&plugin_call_id) {
            let msg = ReceivedPluginCallMessage::EngineCall(engine_call_id, call);
            // Fallback used when the message can't be delivered: tell the plugin that the engine
            // call failed so that it isn't left waiting for a response.
            let send_error = |this: &Self| {
                log::warn!(
                    "Received an engine call for plugin_call_id={plugin_call_id}, \
                    but the caller hung up"
                );
                this.state.writer.write(&PluginInput::EngineCallResponse(
                    engine_call_id,
                    EngineCallResponse::Error(ShellError::Generic(GenericError::new_internal(
                        "Caller hung up",
                        "Can't make engine call because the original caller hung up",
                    ))),
                ))?;
                this.state.writer.flush()
            };
            if let Some(sender) = subscription.sender.as_ref() {
                // A handler is still attached to the call; hand the engine call to it.
                sender.send(msg).or_else(|_| send_error(self))
            } else {
                // No sender is attached any more, so spawn the fallback handler and use that.
                let sender = self.spawn_engine_call_handler(plugin_call_id)?;
                sender.send(msg).or_else(|_| send_error(self))
            }
        } else {
            Err(ShellError::PluginFailedToDecode {
                msg: format!("Unknown plugin call ID: {plugin_call_id}"),
            })
        }
    }
407
408 pub fn is_finished(&self) -> bool {
411 Arc::strong_count(&self.state) < 2
412 }
413
414 pub fn consume_all(
418 &mut self,
419 mut reader: impl PluginRead<PluginOutput>,
420 ) -> Result<(), ShellError> {
421 let mut result = Ok(());
422
423 while let Some(msg) = reader.read().transpose() {
424 if self.is_finished() {
425 break;
426 }
427
428 if let Err(err) = msg.and_then(|msg| self.consume(msg)) {
430 let _ = self.state.error.set(err.clone());
432 let _ = self.stream_manager.broadcast_read_error(err.clone());
434 self.receive_plugin_call_subscriptions();
436 for subscription in std::mem::take(&mut self.plugin_call_states).into_values() {
437 let _ = subscription
438 .sender
439 .as_ref()
440 .map(|s| s.send(ReceivedPluginCallMessage::Error(err.clone())));
441 }
442 result = Err(err);
443 break;
444 }
445 }
446
447 if let Some(ref gc) = self.gc {
449 gc.exited();
450 }
451 result
452 }
453}
454
455impl InterfaceManager for PluginInterfaceManager {
456 type Interface = PluginInterface;
457 type Input = PluginOutput;
458
459 fn get_interface(&self) -> Self::Interface {
460 PluginInterface {
461 state: self.state.clone(),
462 stream_manager_handle: self.stream_manager.get_handle(),
463 gc: self.gc.clone(),
464 }
465 }
466
    /// Handle a single message received from the plugin.
    ///
    /// `Hello` stores the protocol info and checks compatibility; any other message received
    /// before `Hello` is an error. Stream messages go to the stream manager, options are
    /// applied, call responses are routed to the waiting call, and engine calls are forwarded to
    /// the handler of the plugin call they were made under.
    fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> {
        log::trace!("from plugin: {input:?}");

        match input {
            PluginOutput::Hello(info) => {
                let info = Arc::new(info);
                // Publish the info first so that anything waiting on it is released.
                self.protocol_info_mut.set(info.clone())?;

                let local_info = ProtocolInfo::default();
                if local_info.is_compatible_with(&info)? {
                    Ok(())
                } else {
                    Err(ShellError::PluginFailedToLoad {
                        msg: format!(
                            "Plugin `{}` is compiled for nushell version {}, \
                            which is not compatible with version {}",
                            self.state.source.name(),
                            info.version,
                            local_info.version,
                        ),
                    })
                }
            }
            // Every other message requires that `Hello` has already been received.
            _ if !self.state.protocol_info.is_set() => {
                Err(ShellError::PluginFailedToLoad {
                    msg: format!(
                        "Failed to receive initial Hello message from `{}`. \
                        This plugin might be too old",
                        self.state.source.name()
                    ),
                })
            }
            // Stream messages are converted and handed to `consume_stream_message`.
            PluginOutput::Data(..)
            | PluginOutput::End(..)
            | PluginOutput::Drop(..)
            | PluginOutput::Ack(..) => {
                self.consume_stream_message(input.try_into().map_err(|msg| {
                    ShellError::NushellFailed {
                        msg: format!("Failed to convert message {msg:?} to StreamMessage"),
                    }
                })?)
            }
            PluginOutput::Option(option) => match option {
                PluginOption::GcDisabled(disabled) => {
                    // Pass the plugin's request on to the garbage collector, if there is one.
                    if let Some(ref gc) = self.gc {
                        gc.set_disabled(disabled);
                    }
                    Ok(())
                }
            },
            PluginOutput::CallResponse(id, response) => {
                // Turn the pipeline data header (if any) into readable pipeline data.
                let response = response
                    .map_data(|data| {
                        let signals = self.get_signals(id)?;

                        // Track the stream against the call so its state outlives the response.
                        if let Some(stream_id) = data.stream_id() {
                            self.recv_stream_started(id, stream_id);
                        }

                        self.read_pipeline_data(data, &signals)
                    })
                    .unwrap_or_else(|err| {
                        // A failure to set up the data becomes an error response, which is still
                        // delivered to the caller below.
                        PluginCallResponse::Error(err)
                    });
                let result = self.send_plugin_call_response(id, response);
                if result.is_ok() {
                    // Release the garbage collector lock now that the call has been answered.
                    if let Some(ref gc) = self.gc {
                        gc.decrement_locks(1);
                    }
                }
                result
            }
            PluginOutput::EngineCall { context, id, call } => {
                let call = call
                    .map_data(|input| {
                        // `context` is the ID of the plugin call this engine call was made under.
                        let signals = self.get_signals(context)?;
                        self.read_pipeline_data(input, &signals)
                    })
                    .and_then(|mut engine_call| {
                        match engine_call {
                            EngineCall::EvalClosure {
                                ref mut positional, ..
                            } => {
                                // Attach this plugin's source to custom values in the arguments.
                                for arg in positional.iter_mut() {
                                    PluginCustomValueWithSource::add_source_in(
                                        arg,
                                        &self.state.source,
                                    )?;
                                }
                                Ok(engine_call)
                            }
                            _ => Ok(engine_call),
                        }
                    });
                match call {
                    Ok(call) => self.send_engine_call(context, id, call),
                    // Preparing the call failed: answer the plugin with the error directly.
                    Err(err) => self.get_interface().write_engine_call_response(
                        id,
                        EngineCallResponse::Error(err),
                        &CurrentCallState::default(),
                    ),
                }
            }
        }
    }
584
    /// Borrow the stream manager owned by this interface manager.
    fn stream_manager(&self) -> &StreamManager {
        &self.stream_manager
    }
588
589 fn prepare_pipeline_data(&self, mut data: PipelineData) -> Result<PipelineData, ShellError> {
590 match data {
592 PipelineData::Value(ref mut value, _) => {
593 with_custom_values_in(value, |custom_value| {
594 PluginCustomValueWithSource::add_source(custom_value.item, &self.state.source);
595 Ok::<_, ShellError>(())
596 })?;
597 Ok(data)
598 }
599 PipelineData::ListStream(stream, meta) => {
600 let source = self.state.source.clone();
601 Ok(PipelineData::list_stream(
602 stream.map(move |mut value| {
603 let _ = PluginCustomValueWithSource::add_source_in(&mut value, &source);
604 value
605 }),
606 meta,
607 ))
608 }
609 PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
610 }
611 }
612
613 fn consume_stream_message(&mut self, message: StreamMessage) -> Result<(), ShellError> {
614 if let StreamMessage::End(id) = message {
616 self.recv_stream_ended(id);
617 }
618 self.stream_manager.handle_message(message)
619 }
620}
621
/// A cloneable handle used to send messages and make calls to a plugin.
#[derive(Debug, Clone)]
#[doc(hidden)]
pub struct PluginInterface {
    /// State shared with the manager and with other interface handles.
    state: Arc<PluginInterfaceState>,
    /// Handle to the manager's stream manager.
    stream_manager_handle: StreamManagerHandle,
    /// Garbage collector to notify when a plugin call is started, if any.
    gc: Option<PluginGc>,
}
635
636impl PluginInterface {
637 pub fn pid(&self) -> Option<u32> {
639 self.state.process.as_ref().map(|p| p.pid())
640 }
641
642 pub fn protocol_info(&self) -> Result<Arc<ProtocolInfo>, ShellError> {
644 self.state.protocol_info.get().and_then(|info| {
645 info.ok_or_else(|| ShellError::PluginFailedToLoad {
646 msg: format!(
647 "Failed to get protocol info (`Hello` message) from the `{}` plugin",
648 self.state.source.identity.name()
649 ),
650 })
651 })
652 }
653
654 pub fn hello(&self) -> Result<(), ShellError> {
656 self.write(PluginInput::Hello(ProtocolInfo::default()))?;
657 self.flush()
658 }
659
660 pub fn goodbye(&self) -> Result<(), ShellError> {
666 self.write(PluginInput::Goodbye)?;
667 self.flush()
668 }
669
670 pub fn signal(&self, action: SignalAction) -> Result<(), ShellError> {
672 self.write(PluginInput::Signal(action))?;
673 self.flush()
674 }
675
676 pub fn write_engine_call_response(
679 &self,
680 id: EngineCallId,
681 response: EngineCallResponse<PipelineData>,
682 state: &CurrentCallState,
683 ) -> Result<(), ShellError> {
684 let mut writer = None;
686 let response = response.map_data(|data| {
687 let (data_header, data_writer) = self.init_write_pipeline_data(data, state)?;
688 writer = Some(data_writer);
689 Ok(data_header)
690 })?;
691
692 self.write(PluginInput::EngineCallResponse(id, response))?;
694 self.flush()?;
695
696 if let Some(writer) = writer {
698 writer.write_background()?;
699 }
700
701 Ok(())
702 }
703
    /// Write a plugin call to the plugin and register it with the manager.
    ///
    /// Returns the receiver for messages about the call, the writer for the call's input data
    /// (which the caller is responsible for driving), and the call state. Fails with a "closed
    /// unexpectedly" error if the call can't be registered with the manager.
    fn write_plugin_call(
        &self,
        mut call: PluginCall<PipelineData>,
        context: Option<&dyn PluginExecutionContext>,
    ) -> Result<WritePluginCallResult, ShellError> {
        let id = self.state.plugin_call_id_sequence.next()?;
        // Without a context there are no signals to observe.
        let signals = context
            .map(|c| c.signals().clone())
            .unwrap_or_else(Signals::empty);
        let (tx, rx) = mpsc::channel();
        let (context_tx, context_rx) = mpsc::channel();
        let keep_plugin_custom_values = mpsc::channel();

        let state = CurrentCallState {
            context_tx: Some(context_tx),
            keep_plugin_custom_values_tx: Some(keep_plugin_custom_values.0.clone()),
            entered_foreground: false,
            span: call.span(),
        };

        // Prepare the custom values in the call's arguments for sending to the plugin.
        state.prepare_plugin_call(&mut call, &self.state.source)?;

        // Only `Run` carries pipeline data that needs a real writer; every other call is paired
        // with a default writer.
        let (call, writer) = match call {
            PluginCall::Metadata => (PluginCall::Metadata, Default::default()),
            PluginCall::Signature => (PluginCall::Signature, Default::default()),
            PluginCall::CustomValueOp(value, op) => {
                (PluginCall::CustomValueOp(value, op), Default::default())
            }
            PluginCall::GetCompletion(flag_name) => {
                (PluginCall::GetCompletion(flag_name), Default::default())
            }
            PluginCall::Run(CallInfo { name, call, input }) => {
                let (header, writer) = self.init_write_pipeline_data(input, &state)?;
                (
                    PluginCall::Run(CallInfo {
                        name,
                        call,
                        input: header,
                    }),
                    writer,
                )
            }
        };

        // Nobody waits for the response to a `Dropped` notification.
        let dont_send_response =
            matches!(call, PluginCall::CustomValueOp(_, CustomValueOp::Dropped));

        // Register the call with the manager before writing it, so the state is already queued
        // by the time a response can arrive.
        self.state
            .plugin_call_subscription_sender
            .send((
                id,
                PluginCallState {
                    sender: Some(tx).filter(|_| !dont_send_response),
                    dont_send_response,
                    signals,
                    context_rx: Some(context_rx),
                    span: call.span(),
                    keep_plugin_custom_values,
                    remaining_streams_to_read: 0,
                },
            ))
            .map_err(|_| {
                // The manager's receiver is gone. Include the stored error, if any, as the cause.
                let existing_error = self.state.error.get().cloned();
                let help = format!(
                    "the plugin may have experienced an error. Try loading the plugin again \
                    with `{}`",
                    self.state.source.identity.use_command(),
                );
                let error = if let Some(span) = call.span() {
                    GenericError::new(
                        format!("Plugin `{}` closed unexpectedly", self.state.source.name()),
                        "can't complete this operation because the plugin is closed",
                        span,
                    )
                    .with_help(help)
                    .with_inner(existing_error.into_iter())
                } else {
                    GenericError::new_internal(
                        format!("Plugin `{}` closed unexpectedly", self.state.source.name()),
                        "can't complete this operation because the plugin is closed",
                    )
                    .with_help(help)
                    .with_inner(existing_error.into_iter())
                };
                ShellError::Generic(error)
            })?;

        // Every started plugin call holds one lock on the garbage collector; the manager
        // releases it when the call's response has been delivered.
        if let Some(ref gc) = self.gc {
            gc.increment_locks(1);
        }

        self.write(PluginInput::Call(id, call))?;
        self.flush()?;

        Ok(WritePluginCallResult {
            receiver: rx,
            writer,
            state,
        })
    }
814
    /// Wait for the response to a plugin call, serving any engine calls that arrive in the
    /// meantime.
    ///
    /// Returns the response, the error delivered in its place, or a "failed to receive response"
    /// error if the channel closes without either.
    fn receive_plugin_call_response(
        &self,
        rx: mpsc::Receiver<ReceivedPluginCallMessage>,
        mut context: Option<&mut (dyn PluginExecutionContext + '_)>,
        mut state: CurrentCallState,
    ) -> Result<PluginCallResponse<PipelineData>, ShellError> {
        // Ends when a response or error arrives, or when all senders are dropped.
        for msg in rx {
            match msg {
                ReceivedPluginCallMessage::Response(resp) => {
                    if state.entered_foreground {
                        // Leave the foreground again before returning; failure is only logged.
                        if let Some(context) = context.as_deref_mut()
                            && let Err(err) =
                                set_foreground(self.state.process.as_ref(), context, false)
                        {
                            log::warn!("Failed to leave foreground state on exit: {err:?}");
                        }
                    }
                    if resp.has_stream() {
                        // Hand an owned copy of the context over the context channel, so engine
                        // calls made while the stream is being produced can still be served.
                        if let Some(context) = context
                            && let Some(ref context_tx) = state.context_tx
                        {
                            let _ = context_tx.send(Context(context.boxed()));
                        }
                    }
                    return Ok(resp);
                }
                ReceivedPluginCallMessage::Error(err) => {
                    return Err(err);
                }
                ReceivedPluginCallMessage::EngineCall(engine_call_id, engine_call) => {
                    self.handle_engine_call(
                        engine_call_id,
                        engine_call,
                        &mut state,
                        context.as_deref_mut(),
                    )?;
                }
            }
        }
        // No response was received. Attach the error stored in the shared state, if there is
        // one, as the inner cause.
        let existing_error = self.state.error.get().cloned();
        {
            let help = format!(
                "try restarting the plugin with `{}`",
                self.state.source.identity.use_command()
            );
            let error = if let Some(span) = state.span {
                GenericError::new(
                    format!(
                        "Failed to receive response to plugin call from `{}`",
                        self.state.source.identity.name()
                    ),
                    "while waiting for this operation to complete",
                    span,
                )
                .with_help(help)
                .with_inner(existing_error)
            } else {
                GenericError::new_internal(
                    format!(
                        "Failed to receive response to plugin call from `{}`",
                        self.state.source.identity.name()
                    ),
                    "while waiting for this operation to complete",
                )
                .with_help(help)
                .with_inner(existing_error)
            };
            Err(ShellError::Generic(error))
        }
    }
891
    /// Evaluate an engine call and write its response back to the plugin.
    ///
    /// Errors from evaluating the call, or from setting up the response data, are sent to the
    /// plugin as an error response rather than returned. Only failures to write are returned.
    fn handle_engine_call(
        &self,
        engine_call_id: EngineCallId,
        engine_call: EngineCall<PipelineData>,
        state: &mut CurrentCallState,
        context: Option<&mut (dyn PluginExecutionContext + '_)>,
    ) -> Result<(), ShellError> {
        let process = self.state.process.as_ref();
        // An evaluation error becomes the response itself.
        let resp = handle_engine_call(engine_call, state, context, process)
            .unwrap_or_else(EngineCallResponse::Error);
        let mut writer = None;
        // Replace any pipeline data with its header, keeping the writer for afterwards.
        let resp = resp
            .map_data(|data| {
                let (data_header, data_writer) = self.init_write_pipeline_data(data, state)?;
                writer = Some(data_writer);
                Ok(data_header)
            })
            .unwrap_or_else(|err| {
                // Setting up the data failed, so there is nothing to write in the background.
                writer = None;
                EngineCallResponse::Error(err)
            });
        self.write(PluginInput::EngineCallResponse(engine_call_id, resp))?;
        self.flush()?;
        // Start writing the data only after the response message has been flushed.
        if let Some(writer) = writer {
            writer.write_background()?;
        }
        Ok(())
    }
924
925 fn plugin_call(
928 &self,
929 call: PluginCall<PipelineData>,
930 context: Option<&mut dyn PluginExecutionContext>,
931 ) -> Result<PluginCallResponse<PipelineData>, ShellError> {
932 if let Some(error) = self.state.error.get() {
934 let help = format!(
935 "try loading the plugin again with `{}`",
936 self.state.source.identity.use_command(),
937 );
938 let error = if let Some(span) = call.span() {
939 GenericError::new(
940 format!(
941 "Failed to send plugin call to `{}`",
942 self.state.source.identity.name()
943 ),
944 "the plugin encountered an error before this operation could be attempted",
945 span,
946 )
947 .with_help(help)
948 .with_inner([error.clone()])
949 } else {
950 GenericError::new_internal(
951 format!(
952 "Failed to send plugin call to `{}`",
953 self.state.source.identity.name()
954 ),
955 "the plugin encountered an error before this operation could be attempted",
956 )
957 .with_help(help)
958 .with_inner([error.clone()])
959 };
960 return Err(ShellError::Generic(error));
961 }
962
963 let result = self.write_plugin_call(call, context.as_deref())?;
964
965 result.writer.write_background()?;
967
968 self.receive_plugin_call_response(result.receiver, context, result.state)
969 }
970
971 pub fn get_metadata(&self) -> Result<PluginMetadata, ShellError> {
973 match self.plugin_call(PluginCall::Metadata, None)? {
974 PluginCallResponse::Metadata(meta) => Ok(meta),
975 PluginCallResponse::Error(err) => Err(err),
976 _ => Err(ShellError::PluginFailedToDecode {
977 msg: "Received unexpected response to plugin Metadata call".into(),
978 }),
979 }
980 }
981
982 pub fn get_signature(&self) -> Result<Vec<PluginSignature>, ShellError> {
984 match self.plugin_call(PluginCall::Signature, None)? {
985 PluginCallResponse::Signature(sigs) => Ok(sigs),
986 PluginCallResponse::Error(err) => Err(err),
987 _ => Err(ShellError::PluginFailedToDecode {
988 msg: "Received unexpected response to plugin Signature call".into(),
989 }),
990 }
991 }
992
993 pub fn run(
995 &self,
996 call: CallInfo<PipelineData>,
997 context: &mut dyn PluginExecutionContext,
998 ) -> Result<PipelineData, ShellError> {
999 match self.plugin_call(PluginCall::Run(call), Some(context))? {
1000 PluginCallResponse::PipelineData(data) => Ok(data),
1001 PluginCallResponse::Error(err) => Err(err),
1002 _ => Err(ShellError::PluginFailedToDecode {
1003 msg: "Received unexpected response to plugin Run call".into(),
1004 }),
1005 }
1006 }
1007
1008 pub fn get_dynamic_completion(
1010 &self,
1011 info: GetCompletionInfo,
1012 ) -> Result<Option<Vec<DynamicSuggestion>>, ShellError> {
1013 match self.plugin_call(PluginCall::GetCompletion(info), None)? {
1014 PluginCallResponse::CompletionItems(items) => Ok(items),
1015 PluginCallResponse::Error(err) => Err(err),
1016 _ => Err(ShellError::PluginFailedToDecode {
1017 msg: "Received unexpected response to plugin GetCompletion call".into(),
1018 }),
1019 }
1020 }
1021
1022 fn custom_value_op_expecting_value(
1024 &self,
1025 value: Spanned<PluginCustomValueWithSource>,
1026 op: CustomValueOp,
1027 ) -> Result<Value, ShellError> {
1028 let op_name = op.name();
1029 let span = value.span;
1030
1031 value.item.verify_source(span, &self.state.source)?;
1033
1034 let call = PluginCall::CustomValueOp(value.map(|cv| cv.without_source()), op);
1035 match self.plugin_call(call, None)? {
1036 PluginCallResponse::PipelineData(out_data) => out_data.into_value(span),
1037 PluginCallResponse::Error(err) => Err(err),
1038 _ => Err(ShellError::PluginFailedToDecode {
1039 msg: format!("Received unexpected response to custom value {op_name}() call"),
1040 }),
1041 }
1042 }
1043
1044 pub fn custom_value_to_base_value(
1046 &self,
1047 value: Spanned<PluginCustomValueWithSource>,
1048 ) -> Result<Value, ShellError> {
1049 self.custom_value_op_expecting_value(value, CustomValueOp::ToBaseValue)
1050 }
1051
1052 pub fn custom_value_follow_path_int(
1054 &self,
1055 value: Spanned<PluginCustomValueWithSource>,
1056 index: Spanned<usize>,
1057 optional: bool,
1058 ) -> Result<Value, ShellError> {
1059 self.custom_value_op_expecting_value(
1060 value,
1061 CustomValueOp::FollowPathInt { index, optional },
1062 )
1063 }
1064
1065 pub fn custom_value_follow_path_string(
1067 &self,
1068 value: Spanned<PluginCustomValueWithSource>,
1069 column_name: Spanned<String>,
1070 optional: bool,
1071 casing: Casing,
1072 ) -> Result<Value, ShellError> {
1073 self.custom_value_op_expecting_value(
1074 value,
1075 CustomValueOp::FollowPathString {
1076 column_name,
1077 optional,
1078 casing,
1079 },
1080 )
1081 }
1082
1083 pub fn custom_value_partial_cmp(
1085 &self,
1086 value: PluginCustomValueWithSource,
1087 other_value: Value,
1088 span: Span,
1089 ) -> Result<Option<Ordering>, ShellError> {
1090 value.verify_source(span, &self.state.source)?;
1092
1093 let call = PluginCall::CustomValueOp(
1094 value.without_source().into_spanned(span),
1095 CustomValueOp::PartialCmp(other_value),
1096 );
1097 match self.plugin_call(call, None)? {
1098 PluginCallResponse::Ordering(ordering) => Ok(ordering),
1099 PluginCallResponse::Error(err) => Err(err),
1100 _ => Err(ShellError::PluginFailedToDecode {
1101 msg: "Received unexpected response to custom value partial_cmp() call".into(),
1102 }),
1103 }
1104 }
1105
1106 pub fn custom_value_operation(
1108 &self,
1109 left: Spanned<PluginCustomValueWithSource>,
1110 operator: Spanned<Operator>,
1111 right: Value,
1112 ) -> Result<Value, ShellError> {
1113 self.custom_value_op_expecting_value(left, CustomValueOp::Operation(operator, right))
1114 }
1115
1116 pub fn custom_value_save(
1118 &self,
1119 value: Spanned<PluginCustomValueWithSource>,
1120 path: Spanned<&Path>,
1121 save_call_span: Span,
1122 ) -> Result<(), ShellError> {
1123 value.item.verify_source(value.span, &self.state.source)?;
1125
1126 let call = PluginCall::CustomValueOp(
1127 value.map(|cv| cv.without_source()),
1128 CustomValueOp::Save {
1129 path: path.map(ToOwned::to_owned),
1130 save_call_span,
1131 },
1132 );
1133 match self.plugin_call(call, None)? {
1134 PluginCallResponse::Ok => Ok(()),
1135 PluginCallResponse::Error(err) => Err(err),
1136 _ => Err(ShellError::PluginFailedToDecode {
1137 msg: "Received unexpected response to custom value save() call".into(),
1138 }),
1139 }
1140 }
1141
1142 pub fn custom_value_dropped(&self, value: PluginCustomValue) -> Result<(), ShellError> {
1144 drop(self.write_plugin_call(
1150 PluginCall::CustomValueOp(value.into_spanned(Span::unknown()), CustomValueOp::Dropped),
1151 None,
1152 )?);
1153 Ok(())
1154 }
1155}
1156
1157impl Interface for PluginInterface {
1158 type Output = PluginInput;
1159 type DataContext = CurrentCallState;
1160
1161 fn write(&self, input: PluginInput) -> Result<(), ShellError> {
1162 log::trace!("to plugin: {input:?}");
1163 self.state.writer.write(&input).map_err(|err| {
1164 log::warn!("write() error: {err}");
1165 self.state.error.get().cloned().unwrap_or(err)
1168 })
1169 }
1170
1171 fn flush(&self) -> Result<(), ShellError> {
1172 self.state.writer.flush().map_err(|err| {
1173 log::warn!("flush() error: {err}");
1174 self.state.error.get().cloned().unwrap_or(err)
1177 })
1178 }
1179
1180 fn stream_id_sequence(&self) -> &Sequence {
1181 &self.state.stream_id_sequence
1182 }
1183
1184 fn stream_manager_handle(&self) -> &StreamManagerHandle {
1185 &self.stream_manager_handle
1186 }
1187
1188 fn prepare_pipeline_data(
1189 &self,
1190 data: PipelineData,
1191 state: &CurrentCallState,
1192 ) -> Result<PipelineData, ShellError> {
1193 match data {
1195 PipelineData::Value(mut value, meta) => {
1196 state.prepare_value(&mut value, &self.state.source)?;
1197 Ok(PipelineData::value(value, meta))
1198 }
1199 PipelineData::ListStream(stream, meta) => {
1200 let source = self.state.source.clone();
1201 let state = state.clone();
1202 Ok(PipelineData::list_stream(
1203 stream.map(move |mut value| {
1204 match state.prepare_value(&mut value, &source) {
1205 Ok(()) => value,
1206 Err(err) => Value::error(err, value.span()),
1208 }
1209 }),
1210 meta,
1211 ))
1212 }
1213 PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
1214 }
1215 }
1216}
1217
1218impl Drop for PluginInterface {
1219 fn drop(&mut self) {
1220 if Arc::strong_count(&self.state) < 3
1226 && let Err(err) = self.goodbye()
1227 {
1228 log::warn!("Error during plugin Goodbye: {err}");
1229 }
1230 }
1231}
1232
/// Everything produced by writing a plugin call, needed to finish the call.
#[must_use]
struct WritePluginCallResult {
    /// Receives the response, an error, or engine calls for the plugin call.
    receiver: mpsc::Receiver<ReceivedPluginCallMessage>,
    /// Writer for the call's input pipeline data.
    writer: PipelineDataWriter<PluginInterface>,
    /// State of the call to carry through while waiting for the response.
    state: CurrentCallState,
}
1243
/// State carried along with a plugin call while it is being handled.
#[derive(Default, Clone)]
pub struct CurrentCallState {
    /// Channel used to pass the execution context on when the call's response has a stream.
    context_tx: Option<mpsc::Sender<Context>>,
    /// Channel used to keep plugin custom values alive; values that ask to be notified on drop
    /// are cloned into it when they are prepared for sending.
    keep_plugin_custom_values_tx: Option<mpsc::Sender<PluginCustomValueWithSource>>,
    /// Whether the plugin is in the foreground; when set, the foreground is left again once the
    /// response is received. NOTE(review): where this gets set is not visible here.
    entered_foreground: bool,
    /// Span of the plugin call, if any; used to label errors.
    span: Option<Span>,
}
1259
1260impl CurrentCallState {
1261 fn prepare_custom_value(
1264 &self,
1265 custom_value: Spanned<&mut Box<dyn CustomValue>>,
1266 source: &PluginSource,
1267 ) -> Result<(), ShellError> {
1268 PluginCustomValueWithSource::verify_source_of_custom_value(
1270 custom_value.as_deref().map(|cv| &**cv),
1271 source,
1272 )?;
1273
1274 if let Some(keep_tx) = &self.keep_plugin_custom_values_tx
1276 && let Some(custom_value) = custom_value
1277 .item
1278 .as_any()
1279 .downcast_ref::<PluginCustomValueWithSource>()
1280 && custom_value.notify_on_drop()
1281 {
1282 log::trace!("Keeping custom value for drop later: {custom_value:?}");
1283 keep_tx
1284 .send(custom_value.clone())
1285 .map_err(|_| ShellError::NushellFailed {
1286 msg: "Failed to custom value to keep channel".into(),
1287 })?;
1288 }
1289
1290 PluginCustomValueWithSource::remove_source(&mut *custom_value.item);
1292
1293 Ok(())
1294 }
1295
1296 fn prepare_value(&self, value: &mut Value, source: &PluginSource) -> Result<(), ShellError> {
1298 with_custom_values_in(value, |custom_value| {
1299 self.prepare_custom_value(custom_value, source)
1300 })
1301 }
1302
1303 fn prepare_call_args(
1305 &self,
1306 call: &mut EvaluatedCall,
1307 source: &PluginSource,
1308 ) -> Result<(), ShellError> {
1309 for arg in call.positional.iter_mut() {
1310 self.prepare_value(arg, source)?;
1311 }
1312 for arg in call.named.iter_mut().flat_map(|(_, arg)| arg.as_mut()) {
1313 self.prepare_value(arg, source)?;
1314 }
1315 Ok(())
1316 }
1317
1318 fn prepare_plugin_call<D>(
1321 &self,
1322 call: &mut PluginCall<D>,
1323 source: &PluginSource,
1324 ) -> Result<(), ShellError> {
1325 match call {
1326 PluginCall::Metadata => Ok(()),
1327 PluginCall::Signature => Ok(()),
1328 PluginCall::GetCompletion(_) => Ok(()),
1329 PluginCall::Run(CallInfo { call, .. }) => self.prepare_call_args(call, source),
1330 PluginCall::CustomValueOp(_, op) => {
1331 match op {
1333 CustomValueOp::ToBaseValue => Ok(()),
1334 CustomValueOp::FollowPathInt { .. } => Ok(()),
1335 CustomValueOp::FollowPathString { .. } => Ok(()),
1336 CustomValueOp::PartialCmp(value) => self.prepare_value(value, source),
1337 CustomValueOp::Operation(_, value) => self.prepare_value(value, source),
1338 CustomValueOp::Save { .. } => Ok(()),
1339 CustomValueOp::Dropped => Ok(()),
1340 }
1341 }
1342 }
1343 }
1344}
1345
1346pub(crate) fn handle_engine_call(
1348 call: EngineCall<PipelineData>,
1349 state: &mut CurrentCallState,
1350 context: Option<&mut (dyn PluginExecutionContext + '_)>,
1351 process: Option<&PluginProcess>,
1352) -> Result<EngineCallResponse<PipelineData>, ShellError> {
1353 let call_name = call.name();
1354
1355 let context = context.ok_or_else(|| {
1356 ShellError::Generic(
1357 GenericError::new_internal(
1358 "A plugin execution context is required for this engine call",
1359 format!("attempted to call {call_name} outside of a command invocation"),
1360 )
1361 .with_help("this is probably a bug with the plugin"),
1362 )
1363 })?;
1364
1365 match call {
1366 EngineCall::GetConfig => {
1367 let config = SharedCow::from(context.get_config()?);
1368 Ok(EngineCallResponse::Config(config))
1369 }
1370 EngineCall::GetPluginConfig => {
1371 let plugin_config = context.get_plugin_config()?;
1372 Ok(plugin_config.map_or_else(EngineCallResponse::empty, EngineCallResponse::value))
1373 }
1374 EngineCall::GetEnvVar(name) => {
1375 let value = context.get_env_var(&name)?;
1376 Ok(value
1377 .cloned()
1378 .map_or_else(EngineCallResponse::empty, EngineCallResponse::value))
1379 }
1380 EngineCall::GetEnvVars => context.get_env_vars().map(EngineCallResponse::ValueMap),
1381 EngineCall::GetCurrentDir => {
1382 let current_dir = context.get_current_dir()?;
1383 Ok(EngineCallResponse::value(Value::string(
1384 current_dir.item,
1385 current_dir.span,
1386 )))
1387 }
1388 EngineCall::AddEnvVar(name, value) => {
1389 context.add_env_var(name, value)?;
1390 Ok(EngineCallResponse::empty())
1391 }
1392 EngineCall::GetHelp => {
1393 let help = context.get_help()?;
1394 Ok(EngineCallResponse::value(Value::string(
1395 help.item, help.span,
1396 )))
1397 }
1398 EngineCall::EnterForeground => {
1399 let resp = set_foreground(process, context, true)?;
1400 state.entered_foreground = true;
1401 Ok(resp)
1402 }
1403 EngineCall::LeaveForeground => {
1404 let resp = set_foreground(process, context, false)?;
1405 state.entered_foreground = false;
1406 Ok(resp)
1407 }
1408 EngineCall::GetSpanContents(span) => {
1409 let contents = context.get_span_contents(span)?;
1410 Ok(EngineCallResponse::value(Value::binary(
1411 contents.item,
1412 contents.span,
1413 )))
1414 }
1415 EngineCall::EvalClosure {
1416 closure,
1417 positional,
1418 input,
1419 redirect_stdout,
1420 redirect_stderr,
1421 } => context
1422 .eval_closure(closure, positional, input, redirect_stdout, redirect_stderr)
1423 .map(EngineCallResponse::PipelineData),
1424 EngineCall::FindDecl(name) => context.find_decl(&name).map(|decl_id| {
1425 if let Some(decl_id) = decl_id {
1426 EngineCallResponse::Identifier(decl_id)
1427 } else {
1428 EngineCallResponse::empty()
1429 }
1430 }),
1431 EngineCall::GetBlockIR(block_id) => context
1432 .get_block_ir(block_id)
1433 .map(|ir| EngineCallResponse::IrBlock(Box::new(ir))),
1434 EngineCall::CallDecl {
1435 decl_id,
1436 call,
1437 input,
1438 redirect_stdout,
1439 redirect_stderr,
1440 } => context
1441 .call_decl(decl_id, call, input, redirect_stdout, redirect_stderr)
1442 .map(EngineCallResponse::PipelineData),
1443 }
1444}
1445
1446fn set_foreground(
1448 process: Option<&PluginProcess>,
1449 context: &mut dyn PluginExecutionContext,
1450 enter: bool,
1451) -> Result<EngineCallResponse<PipelineData>, ShellError> {
1452 if let Some(process) = process {
1453 if let Some(pipeline_externals_state) = context.pipeline_externals_state() {
1454 if enter {
1455 let pgrp = process.enter_foreground(context.span(), pipeline_externals_state)?;
1456 Ok(pgrp.map_or_else(EngineCallResponse::empty, |id| {
1457 EngineCallResponse::value(Value::int(id as i64, context.span()))
1458 }))
1459 } else {
1460 process.exit_foreground()?;
1461 Ok(EngineCallResponse::empty())
1462 }
1463 } else {
1464 Err(ShellError::NushellFailed {
1466 msg: "missing required pipeline_externals_state from context \
1467 for entering foreground"
1468 .into(),
1469 })
1470 }
1471 } else {
1472 Err(ShellError::Generic(
1473 GenericError::new(
1474 "Can't manage plugin process to enter foreground",
1475 "the process ID for this plugin is unknown",
1476 context.span(),
1477 )
1478 .with_help("the plugin may be running in a test"),
1479 ))
1480 }
1481}