1use nu_plugin_core::{
4 Interface, InterfaceManager, PipelineDataWriter, PluginRead, PluginWrite, StreamManager,
5 StreamManagerHandle,
6 util::{Waitable, WaitableMut, with_custom_values_in},
7};
8use nu_plugin_protocol::{
9 CallInfo, CustomValueOp, EngineCall, EngineCallId, EngineCallResponse, EvaluatedCall, Ordering,
10 PluginCall, PluginCallId, PluginCallResponse, PluginCustomValue, PluginInput, PluginOption,
11 PluginOutput, ProtocolInfo, StreamId, StreamMessage,
12};
13use nu_protocol::{
14 CustomValue, IntoSpanned, PipelineData, PluginMetadata, PluginSignature, ShellError,
15 SignalAction, Signals, Span, Spanned, Value, ast::Operator, casing::Casing, engine::Sequence,
16};
17use nu_utils::SharedCow;
18use std::{
19 collections::{BTreeMap, btree_map},
20 path::Path,
21 sync::{Arc, OnceLock, mpsc},
22};
23
24use crate::{
25 PluginCustomValueWithSource, PluginExecutionContext, PluginGc, PluginSource,
26 process::PluginProcess,
27};
28
29#[cfg(test)]
30mod tests;
31
/// A message delivered over the per-call channel to whoever is waiting on a plugin call.
#[derive(Debug)]
enum ReceivedPluginCallMessage {
    /// The plugin's response to the call.
    Response(PluginCallResponse<PipelineData>),

    /// An error that ended the call; `consume_all` sends this to every subscriber on failure.
    Error(ShellError),

    /// An engine call the plugin made in the context of this plugin call, with its ID.
    EngineCall(EngineCallId, EngineCall<PipelineData>),
}
46
47pub(crate) struct Context(Box<dyn PluginExecutionContext>);
49
50impl std::fmt::Debug for Context {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 f.write_str("Context")
53 }
54}
55
56impl std::ops::Deref for Context {
57 type Target = dyn PluginExecutionContext;
58
59 fn deref(&self) -> &Self::Target {
60 &*self.0
61 }
62}
63
/// State shared (behind an `Arc`) between the manager and every `PluginInterface`.
struct PluginInterfaceState {
    /// The plugin this interface talks to; used for names in messages and for tagging
    /// custom values with their source.
    source: Arc<PluginSource>,
    /// Handle to the plugin process, present only when a PID was supplied to the manager.
    process: Option<PluginProcess>,
    /// Read side of the protocol info, which is set when the plugin's `Hello` is consumed.
    protocol_info: Waitable<Arc<ProtocolInfo>>,
    /// Source of IDs for new plugin calls.
    plugin_call_id_sequence: Sequence,
    /// Source of IDs for new streams.
    stream_id_sequence: Sequence,
    /// Channel used to register a new plugin call's state with the manager.
    plugin_call_subscription_sender: mpsc::Sender<(PluginCallId, PluginCallState)>,
    /// The error that made `consume_all` stop, if any; set at most once.
    error: OnceLock<ShellError>,
    /// Where outgoing `PluginInput` messages are written.
    writer: Box<dyn PluginWrite<PluginInput>>,
}
83
84impl std::fmt::Debug for PluginInterfaceState {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 f.debug_struct("PluginInterfaceState")
87 .field("source", &self.source)
88 .field("protocol_info", &self.protocol_info)
89 .field("plugin_call_id_sequence", &self.plugin_call_id_sequence)
90 .field("stream_id_sequence", &self.stream_id_sequence)
91 .field(
92 "plugin_call_subscription_sender",
93 &self.plugin_call_subscription_sender,
94 )
95 .field("error", &self.error)
96 .finish_non_exhaustive()
97 }
98}
99
/// Manager-side bookkeeping for one in-flight plugin call.
#[derive(Debug)]
struct PluginCallState {
    /// Channel to the caller. `None` when no response is wanted, or once the response has been
    /// sent (a fallback engine call handler may later install a new sender).
    sender: Option<mpsc::Sender<ReceivedPluginCallMessage>>,
    /// When true, the plugin's response is not forwarded to any caller.
    dont_send_response: bool,
    /// Signals cloned from the caller's context, used when reading pipeline data for this call.
    signals: Signals,
    /// Receives the caller's context for the fallback engine call handler; taken when that
    /// handler is spawned.
    context_rx: Option<mpsc::Receiver<Context>>,
    /// Span of the plugin call, if it has one.
    span: Option<Span>,
    /// Channel holding custom values that are kept until this state is dropped.
    keep_plugin_custom_values: (
        mpsc::Sender<PluginCustomValueWithSource>,
        mpsc::Receiver<PluginCustomValueWithSource>,
    ),
    /// Number of streams belonging to this call that have started but not yet ended.
    remaining_streams_to_read: i32,
}
125
126impl Drop for PluginCallState {
127 fn drop(&mut self) {
128 for value in self.keep_plugin_custom_values.1.try_iter() {
130 log::trace!("Dropping custom value that was kept: {value:?}");
131 drop(value);
132 }
133 }
134}
135
/// Reads and dispatches messages coming from a plugin, on behalf of its `PluginInterface`s.
#[derive(Debug)]
pub struct PluginInterfaceManager {
    /// State shared with every interface handed out by `get_interface`.
    state: Arc<PluginInterfaceState>,
    /// Write side of the protocol info; set when the plugin's `Hello` is consumed.
    protocol_info_mut: WaitableMut<Arc<ProtocolInfo>>,
    /// Handles stream messages received from the plugin.
    stream_manager: StreamManager,
    /// State of each known plugin call, keyed by call ID.
    plugin_call_states: BTreeMap<PluginCallId, PluginCallState>,
    /// Receives registrations for new plugin calls from interfaces.
    plugin_call_subscription_receiver: mpsc::Receiver<(PluginCallId, PluginCallState)>,
    /// Maps each started response stream to the plugin call it belongs to.
    plugin_call_input_streams: BTreeMap<StreamId, PluginCallId>,
    /// Optional garbage collector that is told about lock changes and plugin exit.
    gc: Option<PluginGc>,
}
156
157impl PluginInterfaceManager {
158 pub fn new(
159 source: Arc<PluginSource>,
160 pid: Option<u32>,
161 writer: impl PluginWrite<PluginInput> + 'static,
162 ) -> PluginInterfaceManager {
163 let (subscription_tx, subscription_rx) = mpsc::channel();
164 let protocol_info_mut = WaitableMut::new();
165
166 PluginInterfaceManager {
167 state: Arc::new(PluginInterfaceState {
168 source,
169 process: pid.map(PluginProcess::new),
170 protocol_info: protocol_info_mut.reader(),
171 plugin_call_id_sequence: Sequence::default(),
172 stream_id_sequence: Sequence::default(),
173 plugin_call_subscription_sender: subscription_tx,
174 error: OnceLock::new(),
175 writer: Box::new(writer),
176 }),
177 protocol_info_mut,
178 stream_manager: StreamManager::new(),
179 plugin_call_states: BTreeMap::new(),
180 plugin_call_subscription_receiver: subscription_rx,
181 plugin_call_input_streams: BTreeMap::new(),
182 gc: None,
183 }
184 }
185
186 pub fn set_garbage_collector(&mut self, gc: Option<PluginGc>) {
190 self.gc = gc;
191 }
192
193 fn receive_plugin_call_subscriptions(&mut self) {
195 while let Ok((id, state)) = self.plugin_call_subscription_receiver.try_recv() {
196 if let btree_map::Entry::Vacant(e) = self.plugin_call_states.entry(id) {
197 e.insert(state);
198 } else {
199 log::warn!("Duplicate plugin call ID ignored: {id}");
200 }
201 }
202 }
203
204 fn recv_stream_started(&mut self, call_id: PluginCallId, stream_id: StreamId) {
206 self.plugin_call_input_streams.insert(stream_id, call_id);
207 self.receive_plugin_call_subscriptions();
209 if let Some(state) = self.plugin_call_states.get_mut(&call_id) {
210 state.remaining_streams_to_read += 1;
211 }
212 if let Some(ref gc) = self.gc {
214 gc.increment_locks(1);
215 }
216 }
217
218 fn recv_stream_ended(&mut self, stream_id: StreamId) {
220 if let Some(call_id) = self.plugin_call_input_streams.remove(&stream_id) {
221 if let btree_map::Entry::Occupied(mut e) = self.plugin_call_states.entry(call_id) {
222 e.get_mut().remaining_streams_to_read -= 1;
223 if e.get().remaining_streams_to_read <= 0 {
225 e.remove();
226 }
227 }
228 if let Some(ref gc) = self.gc {
231 gc.decrement_locks(1);
232 }
233 }
234 }
235
236 fn get_signals(&mut self, id: PluginCallId) -> Result<Signals, ShellError> {
238 self.receive_plugin_call_subscriptions();
240 self.plugin_call_states
242 .get(&id)
243 .map(|state| state.signals.clone())
244 .ok_or_else(|| ShellError::PluginFailedToDecode {
245 msg: format!("Unknown plugin call ID: {id}"),
246 })
247 }
248
249 fn send_plugin_call_response(
251 &mut self,
252 id: PluginCallId,
253 response: PluginCallResponse<PipelineData>,
254 ) -> Result<(), ShellError> {
255 self.receive_plugin_call_subscriptions();
257
258 if let btree_map::Entry::Occupied(mut e) = self.plugin_call_states.entry(id) {
259 if !e.get().dont_send_response
263 && e.get_mut()
264 .sender
265 .take()
266 .and_then(|s| s.send(ReceivedPluginCallMessage::Response(response)).ok())
267 .is_none()
268 {
269 log::warn!("Received a plugin call response for id={id}, but the caller hung up");
270 }
271 if e.get().remaining_streams_to_read <= 0 {
273 e.remove();
274 }
275 Ok(())
276 } else {
277 Err(ShellError::PluginFailedToDecode {
278 msg: format!("Unknown plugin call ID: {id}"),
279 })
280 }
281 }
282
283 fn spawn_engine_call_handler(
286 &mut self,
287 id: PluginCallId,
288 ) -> Result<&mpsc::Sender<ReceivedPluginCallMessage>, ShellError> {
289 let interface = self.get_interface();
290
291 if let Some(state) = self.plugin_call_states.get_mut(&id) {
292 if state.sender.is_none() {
293 let (tx, rx) = mpsc::channel();
294 let context_rx =
295 state
296 .context_rx
297 .take()
298 .ok_or_else(|| ShellError::NushellFailed {
299 msg: "Tried to spawn the fallback engine call handler more than once"
300 .into(),
301 })?;
302
303 let mut current_call_state = CurrentCallState {
305 context_tx: None,
306 keep_plugin_custom_values_tx: Some(state.keep_plugin_custom_values.0.clone()),
307 entered_foreground: false,
308 span: state.span,
309 };
310
311 let handler = move || {
312 let mut context = context_rx
314 .recv()
315 .ok() .map(|c| c.0);
317
318 for msg in rx {
319 match msg {
321 ReceivedPluginCallMessage::EngineCall(engine_call_id, engine_call) => {
322 if let Err(err) = interface.handle_engine_call(
323 engine_call_id,
324 engine_call,
325 &mut current_call_state,
326 context.as_deref_mut(),
327 ) {
328 log::warn!(
329 "Error in plugin post-response engine call handler: \
330 {err:?}"
331 );
332 return;
333 }
334 }
335 other => log::warn!(
336 "Bad message received in plugin post-response \
337 engine call handler: {other:?}"
338 ),
339 }
340 }
341 };
342 std::thread::Builder::new()
343 .name("plugin engine call handler".into())
344 .spawn(handler)
345 .expect("failed to spawn thread");
346 state.sender = Some(tx);
347 Ok(state.sender.as_ref().unwrap_or_else(|| unreachable!()))
348 } else {
349 Err(ShellError::NushellFailed {
350 msg: "Tried to spawn the fallback engine call handler before the plugin call \
351 response had been received"
352 .into(),
353 })
354 }
355 } else {
356 Err(ShellError::NushellFailed {
357 msg: format!("Couldn't find plugin ID={id} in subscriptions"),
358 })
359 }
360 }
361
/// Forwards an engine call made by the plugin to whoever is handling `plugin_call_id`.
///
/// If the call still has a sender, the message goes there. Otherwise the fallback handler
/// thread is spawned and the message is sent to it. If the receiving side has hung up, an
/// error response for `engine_call_id` is written straight back to the plugin instead.
///
/// # Errors
///
/// `ShellError::PluginFailedToDecode` if the plugin call ID is unknown; otherwise any error
/// from spawning the fallback handler or from writing the error response to the plugin.
fn send_engine_call(
    &mut self,
    plugin_call_id: PluginCallId,
    engine_call_id: EngineCallId,
    call: EngineCall<PipelineData>,
) -> Result<(), ShellError> {
    // Make sure recently registered calls are visible before the lookup.
    self.receive_plugin_call_subscriptions();

    if let Some(subscription) = self.plugin_call_states.get(&plugin_call_id) {
        let msg = ReceivedPluginCallMessage::EngineCall(engine_call_id, call);
        // Takes `this` as a parameter instead of capturing `self`, so that `self` can still be
        // borrowed mutably below to spawn the fallback handler.
        let send_error = |this: &Self| {
            log::warn!(
                "Received an engine call for plugin_call_id={plugin_call_id}, \
                but the caller hung up"
            );
            // Bypasses `PluginInterface::write` and uses the shared writer directly.
            this.state.writer.write(&PluginInput::EngineCallResponse(
                engine_call_id,
                EngineCallResponse::Error(ShellError::GenericError {
                    error: "Caller hung up".to_string(),
                    msg: "Can't make engine call because the original caller hung up"
                        .to_string(),
                    span: None,
                    help: None,
                    inner: vec![],
                }),
            ))?;
            this.state.writer.flush()
        };
        if let Some(sender) = subscription.sender.as_ref() {
            // A caller (or an already-spawned fallback handler) is listening.
            sender.send(msg).or_else(|_| send_error(self))
        } else {
            // Nobody is listening any more: hand the call to a fallback handler thread.
            let sender = self.spawn_engine_call_handler(plugin_call_id)?;
            sender.send(msg).or_else(|_| send_error(self))
        }
    } else {
        Err(ShellError::PluginFailedToDecode {
            msg: format!("Unknown plugin call ID: {plugin_call_id}"),
        })
    }
}
410
411 pub fn is_finished(&self) -> bool {
414 Arc::strong_count(&self.state) < 2
415 }
416
417 pub fn consume_all(
421 &mut self,
422 mut reader: impl PluginRead<PluginOutput>,
423 ) -> Result<(), ShellError> {
424 let mut result = Ok(());
425
426 while let Some(msg) = reader.read().transpose() {
427 if self.is_finished() {
428 break;
429 }
430
431 if let Err(err) = msg.and_then(|msg| self.consume(msg)) {
433 let _ = self.state.error.set(err.clone());
435 let _ = self.stream_manager.broadcast_read_error(err.clone());
437 self.receive_plugin_call_subscriptions();
439 for subscription in std::mem::take(&mut self.plugin_call_states).into_values() {
440 let _ = subscription
441 .sender
442 .as_ref()
443 .map(|s| s.send(ReceivedPluginCallMessage::Error(err.clone())));
444 }
445 result = Err(err);
446 break;
447 }
448 }
449
450 if let Some(ref gc) = self.gc {
452 gc.exited();
453 }
454 result
455 }
456}
457
458impl InterfaceManager for PluginInterfaceManager {
459 type Interface = PluginInterface;
460 type Input = PluginOutput;
461
462 fn get_interface(&self) -> Self::Interface {
463 PluginInterface {
464 state: self.state.clone(),
465 stream_manager_handle: self.stream_manager.get_handle(),
466 gc: self.gc.clone(),
467 }
468 }
469
/// Handles a single message received from the plugin.
///
/// # Errors
///
/// `ShellError::PluginFailedToLoad` if the plugin's protocol version is incompatible or if
/// any other message arrives before `Hello`; otherwise whatever error the handler for the
/// specific message produces.
fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> {
    log::trace!("from plugin: {input:?}");

    match input {
        PluginOutput::Hello(info) => {
            let info = Arc::new(info);
            // Publish the info first, so waiters see it even if it turns out incompatible.
            self.protocol_info_mut.set(info.clone())?;

            let local_info = ProtocolInfo::default();
            if local_info.is_compatible_with(&info)? {
                Ok(())
            } else {
                Err(ShellError::PluginFailedToLoad {
                    msg: format!(
                        "Plugin `{}` is compiled for nushell version {}, \
                        which is not compatible with version {}",
                        self.state.source.name(),
                        info.version,
                        local_info.version,
                    ),
                })
            }
        }
        // Every other message is only valid once `Hello` has been received.
        _ if !self.state.protocol_info.is_set() => {
            Err(ShellError::PluginFailedToLoad {
                msg: format!(
                    "Failed to receive initial Hello message from `{}`. \
                    This plugin might be too old",
                    self.state.source.name()
                ),
            })
        }
        // Stream messages are converted and handed to `consume_stream_message`.
        PluginOutput::Data(..)
        | PluginOutput::End(..)
        | PluginOutput::Drop(..)
        | PluginOutput::Ack(..) => {
            self.consume_stream_message(input.try_into().map_err(|msg| {
                ShellError::NushellFailed {
                    msg: format!("Failed to convert message {msg:?} to StreamMessage"),
                }
            })?)
        }
        PluginOutput::Option(option) => match option {
            PluginOption::GcDisabled(disabled) => {
                // Without a garbage collector the option is silently accepted.
                if let Some(ref gc) = self.gc {
                    gc.set_disabled(disabled);
                }
                Ok(())
            }
        },
        PluginOutput::CallResponse(id, response) => {
            // Turn any pipeline data header in the response into readable pipeline data.
            let response = response
                .map_data(|data| {
                    let signals = self.get_signals(id)?;

                    // Register the stream before reading, so its end can be tracked.
                    if let Some(stream_id) = data.stream_id() {
                        self.recv_stream_started(id, stream_id);
                    }

                    self.read_pipeline_data(data, &signals)
                })
                .unwrap_or_else(|err| {
                    // A failure to read the data is reported to the caller as the response.
                    PluginCallResponse::Error(err.into())
                });
            let result = self.send_plugin_call_response(id, response);
            if result.is_ok() {
                // Releases the lock taken when the call was written.
                if let Some(ref gc) = self.gc {
                    gc.decrement_locks(1);
                }
            }
            result
        }
        PluginOutput::EngineCall { context, id, call } => {
            let call = call
                .map_data(|input| {
                    // `context` is the ID of the plugin call this engine call belongs to.
                    let signals = self.get_signals(context)?;
                    self.read_pipeline_data(input, &signals)
                })
                .and_then(|mut engine_call| {
                    // Closure arguments may carry custom values, which need their source set.
                    match engine_call {
                        EngineCall::EvalClosure {
                            ref mut positional, ..
                        } => {
                            for arg in positional.iter_mut() {
                                PluginCustomValueWithSource::add_source_in(
                                    arg,
                                    &self.state.source,
                                )?;
                            }
                            Ok(engine_call)
                        }
                        _ => Ok(engine_call),
                    }
                });
            match call {
                Ok(call) => self.send_engine_call(context, id, call),
                // The engine call could not be prepared: answer the plugin with the error.
                Err(err) => self.get_interface().write_engine_call_response(
                    id,
                    EngineCallResponse::Error(err),
                    &CurrentCallState::default(),
                ),
            }
        }
    }
}
587
588 fn stream_manager(&self) -> &StreamManager {
589 &self.stream_manager
590 }
591
592 fn prepare_pipeline_data(&self, mut data: PipelineData) -> Result<PipelineData, ShellError> {
593 match data {
595 PipelineData::Value(ref mut value, _) => {
596 with_custom_values_in(value, |custom_value| {
597 PluginCustomValueWithSource::add_source(custom_value.item, &self.state.source);
598 Ok::<_, ShellError>(())
599 })?;
600 Ok(data)
601 }
602 PipelineData::ListStream(stream, meta) => {
603 let source = self.state.source.clone();
604 Ok(PipelineData::list_stream(
605 stream.map(move |mut value| {
606 let _ = PluginCustomValueWithSource::add_source_in(&mut value, &source);
607 value
608 }),
609 meta,
610 ))
611 }
612 PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
613 }
614 }
615
616 fn consume_stream_message(&mut self, message: StreamMessage) -> Result<(), ShellError> {
617 if let StreamMessage::End(id) = message {
619 self.recv_stream_ended(id);
620 }
621 self.stream_manager.handle_message(message)
622 }
623}
624
/// A cloneable handle for sending calls and other messages to a plugin.
#[derive(Debug, Clone)]
#[doc(hidden)]
pub struct PluginInterface {
    /// State shared with the manager and all other interfaces.
    state: Arc<PluginInterfaceState>,
    /// Handle to the manager's stream manager.
    stream_manager_handle: StreamManagerHandle,
    /// Optional garbage collector; gains a lock for every plugin call written.
    gc: Option<PluginGc>,
}
638
639impl PluginInterface {
640 pub fn pid(&self) -> Option<u32> {
642 self.state.process.as_ref().map(|p| p.pid())
643 }
644
645 pub fn protocol_info(&self) -> Result<Arc<ProtocolInfo>, ShellError> {
647 self.state.protocol_info.get().and_then(|info| {
648 info.ok_or_else(|| ShellError::PluginFailedToLoad {
649 msg: format!(
650 "Failed to get protocol info (`Hello` message) from the `{}` plugin",
651 self.state.source.identity.name()
652 ),
653 })
654 })
655 }
656
657 pub fn hello(&self) -> Result<(), ShellError> {
659 self.write(PluginInput::Hello(ProtocolInfo::default()))?;
660 self.flush()
661 }
662
663 pub fn goodbye(&self) -> Result<(), ShellError> {
669 self.write(PluginInput::Goodbye)?;
670 self.flush()
671 }
672
673 pub fn signal(&self, action: SignalAction) -> Result<(), ShellError> {
675 self.write(PluginInput::Signal(action))?;
676 self.flush()
677 }
678
679 pub fn write_engine_call_response(
682 &self,
683 id: EngineCallId,
684 response: EngineCallResponse<PipelineData>,
685 state: &CurrentCallState,
686 ) -> Result<(), ShellError> {
687 let mut writer = None;
689 let response = response.map_data(|data| {
690 let (data_header, data_writer) = self.init_write_pipeline_data(data, state)?;
691 writer = Some(data_writer);
692 Ok(data_header)
693 })?;
694
695 self.write(PluginInput::EngineCallResponse(id, response))?;
697 self.flush()?;
698
699 if let Some(writer) = writer {
701 writer.write_background()?;
702 }
703
704 Ok(())
705 }
706
/// Registers a new plugin call with the manager and writes it to the plugin.
///
/// Returns the receiver for messages about the call, the writer for any input pipeline
/// data (not yet started), and the call state. `context`, when given, supplies the signals
/// stored with the call.
///
/// # Errors
///
/// Any error from allocating the call ID, preparing the call or its input, or writing to
/// the plugin; a `ShellError::GenericError` if the manager is no longer receiving
/// registrations.
fn write_plugin_call(
    &self,
    mut call: PluginCall<PipelineData>,
    context: Option<&dyn PluginExecutionContext>,
) -> Result<WritePluginCallResult, ShellError> {
    let id = self.state.plugin_call_id_sequence.next()?;
    let signals = context
        .map(|c| c.signals().clone())
        .unwrap_or_else(Signals::empty);
    // `tx`/`rx` carry messages about this call back to the caller.
    let (tx, rx) = mpsc::channel();
    // Lets the caller hand its context to a fallback engine call handler later on.
    let (context_tx, context_rx) = mpsc::channel();
    let keep_plugin_custom_values = mpsc::channel();

    let state = CurrentCallState {
        context_tx: Some(context_tx),
        keep_plugin_custom_values_tx: Some(keep_plugin_custom_values.0.clone()),
        entered_foreground: false,
        span: call.span(),
    };

    // Verifies and strips the source of custom values in the call's arguments.
    state.prepare_plugin_call(&mut call, &self.state.source)?;

    // Only `Run` carries input data; every other call gets a default (empty) writer.
    let (call, writer) = match call {
        PluginCall::Metadata => (PluginCall::Metadata, Default::default()),
        PluginCall::Signature => (PluginCall::Signature, Default::default()),
        PluginCall::CustomValueOp(value, op) => {
            (PluginCall::CustomValueOp(value, op), Default::default())
        }
        PluginCall::Run(CallInfo { name, call, input }) => {
            let (header, writer) = self.init_write_pipeline_data(input, &state)?;
            (
                PluginCall::Run(CallInfo {
                    name,
                    call,
                    input: header,
                }),
                writer,
            )
        }
    };

    // Nobody waits for the response to a `Dropped` notification.
    let dont_send_response =
        matches!(call, PluginCall::CustomValueOp(_, CustomValueOp::Dropped));

    // Register the call before writing it, so the manager already knows the ID when the
    // plugin answers.
    self.state
        .plugin_call_subscription_sender
        .send((
            id,
            PluginCallState {
                sender: Some(tx).filter(|_| !dont_send_response),
                dont_send_response,
                signals,
                context_rx: Some(context_rx),
                span: call.span(),
                keep_plugin_custom_values,
                remaining_streams_to_read: 0,
            },
        ))
        .map_err(|_| {
            // The manager has gone away; attach whatever error it recorded, if any.
            let existing_error = self.state.error.get().cloned();
            ShellError::GenericError {
                error: format!("Plugin `{}` closed unexpectedly", self.state.source.name()),
                msg: "can't complete this operation because the plugin is closed".into(),
                span: call.span(),
                help: Some(format!(
                    "the plugin may have experienced an error. Try loading the plugin again \
                    with `{}`",
                    self.state.source.identity.use_command(),
                )),
                inner: existing_error.into_iter().collect(),
            }
        })?;

    // Released again by the manager once the call's response has been delivered.
    if let Some(ref gc) = self.gc {
        gc.increment_locks(1);
    }

    self.write(PluginInput::Call(id, call))?;
    self.flush()?;

    Ok(WritePluginCallResult {
        receiver: rx,
        writer,
        state,
    })
}
803
/// Waits for the response to a plugin call, servicing engine calls that arrive meanwhile.
///
/// # Errors
///
/// The error carried by an `Error` message, any error from handling an engine call, or a
/// `ShellError::GenericError` if the channel closes before a response arrives.
fn receive_plugin_call_response(
    &self,
    rx: mpsc::Receiver<ReceivedPluginCallMessage>,
    mut context: Option<&mut (dyn PluginExecutionContext + '_)>,
    mut state: CurrentCallState,
) -> Result<PluginCallResponse<PipelineData>, ShellError> {
    // Blocks on each message until the sending side is dropped.
    for msg in rx {
        match msg {
            ReceivedPluginCallMessage::Response(resp) => {
                // Undo a foreground state entered during this call; failure is only logged.
                if state.entered_foreground {
                    if let Some(context) = context.as_deref_mut()
                        && let Err(err) =
                            set_foreground(self.state.process.as_ref(), context, false)
                    {
                        log::warn!("Failed to leave foreground state on exit: {err:?}");
                    }
                }
                // A streaming response can outlive this function, so pass a boxed copy of
                // the context on for the fallback engine call handler.
                if resp.has_stream() {
                    if let Some(context) = context
                        && let Some(ref context_tx) = state.context_tx
                    {
                        let _ = context_tx.send(Context(context.boxed()));
                    }
                }
                return Ok(resp);
            }
            ReceivedPluginCallMessage::Error(err) => {
                return Err(err);
            }
            ReceivedPluginCallMessage::EngineCall(engine_call_id, engine_call) => {
                self.handle_engine_call(
                    engine_call_id,
                    engine_call,
                    &mut state,
                    context.as_deref_mut(),
                )?;
            }
        }
    }
    // The channel closed without a response; include the recorded plugin error, if any.
    let existing_error = self.state.error.get().cloned();
    Err(ShellError::GenericError {
        error: format!(
            "Failed to receive response to plugin call from `{}`",
            self.state.source.identity.name()
        ),
        msg: "while waiting for this operation to complete".into(),
        span: state.span,
        help: Some(format!(
            "try restarting the plugin with `{}`",
            self.state.source.identity.use_command()
        )),
        inner: existing_error.into_iter().collect(),
    })
}
864
865 fn handle_engine_call(
867 &self,
868 engine_call_id: EngineCallId,
869 engine_call: EngineCall<PipelineData>,
870 state: &mut CurrentCallState,
871 context: Option<&mut (dyn PluginExecutionContext + '_)>,
872 ) -> Result<(), ShellError> {
873 let process = self.state.process.as_ref();
874 let resp = handle_engine_call(engine_call, state, context, process)
875 .unwrap_or_else(EngineCallResponse::Error);
876 let mut writer = None;
878 let resp = resp
879 .map_data(|data| {
880 let (data_header, data_writer) = self.init_write_pipeline_data(data, state)?;
881 writer = Some(data_writer);
882 Ok(data_header)
883 })
884 .unwrap_or_else(|err| {
885 writer = None;
887 EngineCallResponse::Error(err)
888 });
889 self.write(PluginInput::EngineCallResponse(engine_call_id, resp))?;
891 self.flush()?;
892 if let Some(writer) = writer {
893 writer.write_background()?;
894 }
895 Ok(())
896 }
897
898 fn plugin_call(
901 &self,
902 call: PluginCall<PipelineData>,
903 context: Option<&mut dyn PluginExecutionContext>,
904 ) -> Result<PluginCallResponse<PipelineData>, ShellError> {
905 if let Some(error) = self.state.error.get() {
907 return Err(ShellError::GenericError {
908 error: format!(
909 "Failed to send plugin call to `{}`",
910 self.state.source.identity.name()
911 ),
912 msg: "the plugin encountered an error before this operation could be attempted"
913 .into(),
914 span: call.span(),
915 help: Some(format!(
916 "try loading the plugin again with `{}`",
917 self.state.source.identity.use_command(),
918 )),
919 inner: vec![error.clone()],
920 });
921 }
922
923 let result = self.write_plugin_call(call, context.as_deref())?;
924
925 result.writer.write_background()?;
927
928 self.receive_plugin_call_response(result.receiver, context, result.state)
929 }
930
931 pub fn get_metadata(&self) -> Result<PluginMetadata, ShellError> {
933 match self.plugin_call(PluginCall::Metadata, None)? {
934 PluginCallResponse::Metadata(meta) => Ok(meta),
935 PluginCallResponse::Error(err) => Err(err.into()),
936 _ => Err(ShellError::PluginFailedToDecode {
937 msg: "Received unexpected response to plugin Metadata call".into(),
938 }),
939 }
940 }
941
942 pub fn get_signature(&self) -> Result<Vec<PluginSignature>, ShellError> {
944 match self.plugin_call(PluginCall::Signature, None)? {
945 PluginCallResponse::Signature(sigs) => Ok(sigs),
946 PluginCallResponse::Error(err) => Err(err.into()),
947 _ => Err(ShellError::PluginFailedToDecode {
948 msg: "Received unexpected response to plugin Signature call".into(),
949 }),
950 }
951 }
952
953 pub fn run(
955 &self,
956 call: CallInfo<PipelineData>,
957 context: &mut dyn PluginExecutionContext,
958 ) -> Result<PipelineData, ShellError> {
959 match self.plugin_call(PluginCall::Run(call), Some(context))? {
960 PluginCallResponse::PipelineData(data) => Ok(data),
961 PluginCallResponse::Error(err) => Err(err.into()),
962 _ => Err(ShellError::PluginFailedToDecode {
963 msg: "Received unexpected response to plugin Run call".into(),
964 }),
965 }
966 }
967
968 fn custom_value_op_expecting_value(
970 &self,
971 value: Spanned<PluginCustomValueWithSource>,
972 op: CustomValueOp,
973 ) -> Result<Value, ShellError> {
974 let op_name = op.name();
975 let span = value.span;
976
977 value.item.verify_source(span, &self.state.source)?;
979
980 let call = PluginCall::CustomValueOp(value.map(|cv| cv.without_source()), op);
981 match self.plugin_call(call, None)? {
982 PluginCallResponse::PipelineData(out_data) => out_data.into_value(span),
983 PluginCallResponse::Error(err) => Err(err.into()),
984 _ => Err(ShellError::PluginFailedToDecode {
985 msg: format!("Received unexpected response to custom value {op_name}() call"),
986 }),
987 }
988 }
989
990 pub fn custom_value_to_base_value(
992 &self,
993 value: Spanned<PluginCustomValueWithSource>,
994 ) -> Result<Value, ShellError> {
995 self.custom_value_op_expecting_value(value, CustomValueOp::ToBaseValue)
996 }
997
998 pub fn custom_value_follow_path_int(
1000 &self,
1001 value: Spanned<PluginCustomValueWithSource>,
1002 index: Spanned<usize>,
1003 optional: bool,
1004 ) -> Result<Value, ShellError> {
1005 self.custom_value_op_expecting_value(
1006 value,
1007 CustomValueOp::FollowPathInt { index, optional },
1008 )
1009 }
1010
1011 pub fn custom_value_follow_path_string(
1013 &self,
1014 value: Spanned<PluginCustomValueWithSource>,
1015 column_name: Spanned<String>,
1016 optional: bool,
1017 casing: Casing,
1018 ) -> Result<Value, ShellError> {
1019 self.custom_value_op_expecting_value(
1020 value,
1021 CustomValueOp::FollowPathString {
1022 column_name,
1023 optional,
1024 casing,
1025 },
1026 )
1027 }
1028
1029 pub fn custom_value_partial_cmp(
1031 &self,
1032 value: PluginCustomValueWithSource,
1033 other_value: Value,
1034 ) -> Result<Option<Ordering>, ShellError> {
1035 value.verify_source(Span::unknown(), &self.state.source)?;
1037
1038 let call = PluginCall::CustomValueOp(
1041 value.without_source().into_spanned(Span::unknown()),
1042 CustomValueOp::PartialCmp(other_value),
1043 );
1044 match self.plugin_call(call, None)? {
1045 PluginCallResponse::Ordering(ordering) => Ok(ordering),
1046 PluginCallResponse::Error(err) => Err(err.into()),
1047 _ => Err(ShellError::PluginFailedToDecode {
1048 msg: "Received unexpected response to custom value partial_cmp() call".into(),
1049 }),
1050 }
1051 }
1052
1053 pub fn custom_value_operation(
1055 &self,
1056 left: Spanned<PluginCustomValueWithSource>,
1057 operator: Spanned<Operator>,
1058 right: Value,
1059 ) -> Result<Value, ShellError> {
1060 self.custom_value_op_expecting_value(left, CustomValueOp::Operation(operator, right))
1061 }
1062
1063 pub fn custom_value_save(
1065 &self,
1066 value: Spanned<PluginCustomValueWithSource>,
1067 path: Spanned<&Path>,
1068 save_call_span: Span,
1069 ) -> Result<(), ShellError> {
1070 value.item.verify_source(value.span, &self.state.source)?;
1072
1073 let call = PluginCall::CustomValueOp(
1074 value.map(|cv| cv.without_source()),
1075 CustomValueOp::Save {
1076 path: path.map(ToOwned::to_owned),
1077 save_call_span,
1078 },
1079 );
1080 match self.plugin_call(call, None)? {
1081 PluginCallResponse::Ok => Ok(()),
1082 PluginCallResponse::Error(err) => Err(err.into()),
1083 _ => Err(ShellError::PluginFailedToDecode {
1084 msg: "Received unexpected response to custom value save() call".into(),
1085 }),
1086 }
1087 }
1088
1089 pub fn custom_value_dropped(&self, value: PluginCustomValue) -> Result<(), ShellError> {
1091 drop(self.write_plugin_call(
1096 PluginCall::CustomValueOp(value.into_spanned(Span::unknown()), CustomValueOp::Dropped),
1097 None,
1098 )?);
1099 Ok(())
1100 }
1101}
1102
1103impl Interface for PluginInterface {
1104 type Output = PluginInput;
1105 type DataContext = CurrentCallState;
1106
1107 fn write(&self, input: PluginInput) -> Result<(), ShellError> {
1108 log::trace!("to plugin: {input:?}");
1109 self.state.writer.write(&input).map_err(|err| {
1110 log::warn!("write() error: {err}");
1111 self.state.error.get().cloned().unwrap_or(err)
1114 })
1115 }
1116
1117 fn flush(&self) -> Result<(), ShellError> {
1118 self.state.writer.flush().map_err(|err| {
1119 log::warn!("flush() error: {err}");
1120 self.state.error.get().cloned().unwrap_or(err)
1123 })
1124 }
1125
1126 fn stream_id_sequence(&self) -> &Sequence {
1127 &self.state.stream_id_sequence
1128 }
1129
1130 fn stream_manager_handle(&self) -> &StreamManagerHandle {
1131 &self.stream_manager_handle
1132 }
1133
1134 fn prepare_pipeline_data(
1135 &self,
1136 data: PipelineData,
1137 state: &CurrentCallState,
1138 ) -> Result<PipelineData, ShellError> {
1139 match data {
1141 PipelineData::Value(mut value, meta) => {
1142 state.prepare_value(&mut value, &self.state.source)?;
1143 Ok(PipelineData::value(value, meta))
1144 }
1145 PipelineData::ListStream(stream, meta) => {
1146 let source = self.state.source.clone();
1147 let state = state.clone();
1148 Ok(PipelineData::list_stream(
1149 stream.map(move |mut value| {
1150 match state.prepare_value(&mut value, &source) {
1151 Ok(()) => value,
1152 Err(err) => Value::error(err, value.span()),
1154 }
1155 }),
1156 meta,
1157 ))
1158 }
1159 PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
1160 }
1161 }
1162}
1163
1164impl Drop for PluginInterface {
1165 fn drop(&mut self) {
1166 if Arc::strong_count(&self.state) < 3
1172 && let Err(err) = self.goodbye()
1173 {
1174 log::warn!("Error during plugin Goodbye: {err}");
1175 }
1176 }
1177}
1178
/// Everything `write_plugin_call` hands back for a call that has been written to the plugin.
#[must_use]
struct WritePluginCallResult {
    /// Receives the response and other messages about the call.
    receiver: mpsc::Receiver<ReceivedPluginCallMessage>,
    /// Writer for the call's input pipeline data; not yet started.
    writer: PipelineDataWriter<PluginInterface>,
    /// Caller-side state of the call.
    state: CurrentCallState,
}
1189
/// Caller-side state of a plugin call, used while preparing values and handling engine calls.
#[derive(Default, Clone)]
pub struct CurrentCallState {
    /// Used to pass the caller's context on when the call's response contains a stream.
    context_tx: Option<mpsc::Sender<Context>>,
    /// Used to keep custom values that ask to be notified on drop.
    keep_plugin_custom_values_tx: Option<mpsc::Sender<PluginCustomValueWithSource>>,
    /// Set once the plugin has entered the foreground during this call.
    entered_foreground: bool,
    /// Span of the plugin call, if it has one.
    span: Option<Span>,
}
1205
1206impl CurrentCallState {
1207 fn prepare_custom_value(
1210 &self,
1211 custom_value: Spanned<&mut Box<dyn CustomValue>>,
1212 source: &PluginSource,
1213 ) -> Result<(), ShellError> {
1214 PluginCustomValueWithSource::verify_source_of_custom_value(
1216 custom_value.as_deref().map(|cv| &**cv),
1217 source,
1218 )?;
1219
1220 if let Some(keep_tx) = &self.keep_plugin_custom_values_tx
1222 && let Some(custom_value) = custom_value
1223 .item
1224 .as_any()
1225 .downcast_ref::<PluginCustomValueWithSource>()
1226 && custom_value.notify_on_drop()
1227 {
1228 log::trace!("Keeping custom value for drop later: {custom_value:?}");
1229 keep_tx
1230 .send(custom_value.clone())
1231 .map_err(|_| ShellError::NushellFailed {
1232 msg: "Failed to custom value to keep channel".into(),
1233 })?;
1234 }
1235
1236 PluginCustomValueWithSource::remove_source(&mut *custom_value.item);
1238
1239 Ok(())
1240 }
1241
1242 fn prepare_value(&self, value: &mut Value, source: &PluginSource) -> Result<(), ShellError> {
1244 with_custom_values_in(value, |custom_value| {
1245 self.prepare_custom_value(custom_value, source)
1246 })
1247 }
1248
1249 fn prepare_call_args(
1251 &self,
1252 call: &mut EvaluatedCall,
1253 source: &PluginSource,
1254 ) -> Result<(), ShellError> {
1255 for arg in call.positional.iter_mut() {
1256 self.prepare_value(arg, source)?;
1257 }
1258 for arg in call.named.iter_mut().flat_map(|(_, arg)| arg.as_mut()) {
1259 self.prepare_value(arg, source)?;
1260 }
1261 Ok(())
1262 }
1263
1264 fn prepare_plugin_call<D>(
1267 &self,
1268 call: &mut PluginCall<D>,
1269 source: &PluginSource,
1270 ) -> Result<(), ShellError> {
1271 match call {
1272 PluginCall::Metadata => Ok(()),
1273 PluginCall::Signature => Ok(()),
1274 PluginCall::Run(CallInfo { call, .. }) => self.prepare_call_args(call, source),
1275 PluginCall::CustomValueOp(_, op) => {
1276 match op {
1278 CustomValueOp::ToBaseValue => Ok(()),
1279 CustomValueOp::FollowPathInt { .. } => Ok(()),
1280 CustomValueOp::FollowPathString { .. } => Ok(()),
1281 CustomValueOp::PartialCmp(value) => self.prepare_value(value, source),
1282 CustomValueOp::Operation(_, value) => self.prepare_value(value, source),
1283 CustomValueOp::Save { .. } => Ok(()),
1284 CustomValueOp::Dropped => Ok(()),
1285 }
1286 }
1287 }
1288 }
1289}
1290
1291pub(crate) fn handle_engine_call(
1293 call: EngineCall<PipelineData>,
1294 state: &mut CurrentCallState,
1295 context: Option<&mut (dyn PluginExecutionContext + '_)>,
1296 process: Option<&PluginProcess>,
1297) -> Result<EngineCallResponse<PipelineData>, ShellError> {
1298 let call_name = call.name();
1299
1300 let context = context.ok_or_else(|| ShellError::GenericError {
1301 error: "A plugin execution context is required for this engine call".into(),
1302 msg: format!("attempted to call {call_name} outside of a command invocation"),
1303 span: None,
1304 help: Some("this is probably a bug with the plugin".into()),
1305 inner: vec![],
1306 })?;
1307
1308 match call {
1309 EngineCall::GetConfig => {
1310 let config = SharedCow::from(context.get_config()?);
1311 Ok(EngineCallResponse::Config(config))
1312 }
1313 EngineCall::GetPluginConfig => {
1314 let plugin_config = context.get_plugin_config()?;
1315 Ok(plugin_config.map_or_else(EngineCallResponse::empty, EngineCallResponse::value))
1316 }
1317 EngineCall::GetEnvVar(name) => {
1318 let value = context.get_env_var(&name)?;
1319 Ok(value
1320 .cloned()
1321 .map_or_else(EngineCallResponse::empty, EngineCallResponse::value))
1322 }
1323 EngineCall::GetEnvVars => context.get_env_vars().map(EngineCallResponse::ValueMap),
1324 EngineCall::GetCurrentDir => {
1325 let current_dir = context.get_current_dir()?;
1326 Ok(EngineCallResponse::value(Value::string(
1327 current_dir.item,
1328 current_dir.span,
1329 )))
1330 }
1331 EngineCall::AddEnvVar(name, value) => {
1332 context.add_env_var(name, value)?;
1333 Ok(EngineCallResponse::empty())
1334 }
1335 EngineCall::GetHelp => {
1336 let help = context.get_help()?;
1337 Ok(EngineCallResponse::value(Value::string(
1338 help.item, help.span,
1339 )))
1340 }
1341 EngineCall::EnterForeground => {
1342 let resp = set_foreground(process, context, true)?;
1343 state.entered_foreground = true;
1344 Ok(resp)
1345 }
1346 EngineCall::LeaveForeground => {
1347 let resp = set_foreground(process, context, false)?;
1348 state.entered_foreground = false;
1349 Ok(resp)
1350 }
1351 EngineCall::GetSpanContents(span) => {
1352 let contents = context.get_span_contents(span)?;
1353 Ok(EngineCallResponse::value(Value::binary(
1354 contents.item,
1355 contents.span,
1356 )))
1357 }
1358 EngineCall::EvalClosure {
1359 closure,
1360 positional,
1361 input,
1362 redirect_stdout,
1363 redirect_stderr,
1364 } => context
1365 .eval_closure(closure, positional, input, redirect_stdout, redirect_stderr)
1366 .map(EngineCallResponse::PipelineData),
1367 EngineCall::FindDecl(name) => context.find_decl(&name).map(|decl_id| {
1368 if let Some(decl_id) = decl_id {
1369 EngineCallResponse::Identifier(decl_id)
1370 } else {
1371 EngineCallResponse::empty()
1372 }
1373 }),
1374 EngineCall::CallDecl {
1375 decl_id,
1376 call,
1377 input,
1378 redirect_stdout,
1379 redirect_stderr,
1380 } => context
1381 .call_decl(decl_id, call, input, redirect_stdout, redirect_stderr)
1382 .map(EngineCallResponse::PipelineData),
1383 }
1384}
1385
1386fn set_foreground(
1388 process: Option<&PluginProcess>,
1389 context: &mut dyn PluginExecutionContext,
1390 enter: bool,
1391) -> Result<EngineCallResponse<PipelineData>, ShellError> {
1392 if let Some(process) = process {
1393 if let Some(pipeline_externals_state) = context.pipeline_externals_state() {
1394 if enter {
1395 let pgrp = process.enter_foreground(context.span(), pipeline_externals_state)?;
1396 Ok(pgrp.map_or_else(EngineCallResponse::empty, |id| {
1397 EngineCallResponse::value(Value::int(id as i64, context.span()))
1398 }))
1399 } else {
1400 process.exit_foreground()?;
1401 Ok(EngineCallResponse::empty())
1402 }
1403 } else {
1404 Err(ShellError::NushellFailed {
1406 msg: "missing required pipeline_externals_state from context \
1407 for entering foreground"
1408 .into(),
1409 })
1410 }
1411 } else {
1412 Err(ShellError::GenericError {
1413 error: "Can't manage plugin process to enter foreground".into(),
1414 msg: "the process ID for this plugin is unknown".into(),
1415 span: Some(context.span()),
1416 help: Some("the plugin may be running in a test".into()),
1417 inner: vec![],
1418 })
1419 }
1420}