1use nu_plugin_core::{
4 Interface, InterfaceManager, PipelineDataWriter, PluginRead, PluginWrite, StreamManager,
5 StreamManagerHandle,
6 util::{Waitable, WaitableMut, with_custom_values_in},
7};
8use nu_plugin_protocol::{
9 CallInfo, CustomValueOp, EngineCall, EngineCallId, EngineCallResponse, EvaluatedCall, Ordering,
10 PluginCall, PluginCallId, PluginCallResponse, PluginCustomValue, PluginInput, PluginOption,
11 PluginOutput, ProtocolInfo, StreamId, StreamMessage,
12};
13use nu_protocol::{
14 CustomValue, IntoSpanned, PipelineData, PluginMetadata, PluginSignature, ShellError,
15 SignalAction, Signals, Span, Spanned, Value, ast::Operator, engine::Sequence,
16};
17use nu_utils::SharedCow;
18use std::{
19 collections::{BTreeMap, btree_map},
20 sync::{Arc, OnceLock, mpsc},
21};
22
23use crate::{
24 PluginCustomValueWithSource, PluginExecutionContext, PluginGc, PluginSource,
25 process::PluginProcess,
26};
27
28#[cfg(test)]
29mod tests;
30
31#[derive(Debug)]
32enum ReceivedPluginCallMessage {
33 Response(PluginCallResponse<PipelineData>),
35
36 Error(ShellError),
38
39 EngineCall(EngineCallId, EngineCall<PipelineData>),
44}
45
46pub(crate) struct Context(Box<dyn PluginExecutionContext>);
48
49impl std::fmt::Debug for Context {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 f.write_str("Context")
52 }
53}
54
55impl std::ops::Deref for Context {
56 type Target = dyn PluginExecutionContext;
57
58 fn deref(&self) -> &Self::Target {
59 &*self.0
60 }
61}
62
63struct PluginInterfaceState {
65 source: Arc<PluginSource>,
67 process: Option<PluginProcess>,
69 protocol_info: Waitable<Arc<ProtocolInfo>>,
71 plugin_call_id_sequence: Sequence,
73 stream_id_sequence: Sequence,
75 plugin_call_subscription_sender: mpsc::Sender<(PluginCallId, PluginCallState)>,
77 error: OnceLock<ShellError>,
79 writer: Box<dyn PluginWrite<PluginInput>>,
81}
82
83impl std::fmt::Debug for PluginInterfaceState {
84 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 f.debug_struct("PluginInterfaceState")
86 .field("source", &self.source)
87 .field("protocol_info", &self.protocol_info)
88 .field("plugin_call_id_sequence", &self.plugin_call_id_sequence)
89 .field("stream_id_sequence", &self.stream_id_sequence)
90 .field(
91 "plugin_call_subscription_sender",
92 &self.plugin_call_subscription_sender,
93 )
94 .field("error", &self.error)
95 .finish_non_exhaustive()
96 }
97}
98
99#[derive(Debug)]
101struct PluginCallState {
102 sender: Option<mpsc::Sender<ReceivedPluginCallMessage>>,
104 dont_send_response: bool,
107 signals: Signals,
109 context_rx: Option<mpsc::Receiver<Context>>,
111 span: Option<Span>,
113 keep_plugin_custom_values: (
118 mpsc::Sender<PluginCustomValueWithSource>,
119 mpsc::Receiver<PluginCustomValueWithSource>,
120 ),
121 remaining_streams_to_read: i32,
123}
124
125impl Drop for PluginCallState {
126 fn drop(&mut self) {
127 for value in self.keep_plugin_custom_values.1.try_iter() {
129 log::trace!("Dropping custom value that was kept: {value:?}");
130 drop(value);
131 }
132 }
133}
134
135#[derive(Debug)]
137pub struct PluginInterfaceManager {
138 state: Arc<PluginInterfaceState>,
140 protocol_info_mut: WaitableMut<Arc<ProtocolInfo>>,
142 stream_manager: StreamManager,
144 plugin_call_states: BTreeMap<PluginCallId, PluginCallState>,
146 plugin_call_subscription_receiver: mpsc::Receiver<(PluginCallId, PluginCallState)>,
148 plugin_call_input_streams: BTreeMap<StreamId, PluginCallId>,
152 gc: Option<PluginGc>,
154}
155
156impl PluginInterfaceManager {
157 pub fn new(
158 source: Arc<PluginSource>,
159 pid: Option<u32>,
160 writer: impl PluginWrite<PluginInput> + 'static,
161 ) -> PluginInterfaceManager {
162 let (subscription_tx, subscription_rx) = mpsc::channel();
163 let protocol_info_mut = WaitableMut::new();
164
165 PluginInterfaceManager {
166 state: Arc::new(PluginInterfaceState {
167 source,
168 process: pid.map(PluginProcess::new),
169 protocol_info: protocol_info_mut.reader(),
170 plugin_call_id_sequence: Sequence::default(),
171 stream_id_sequence: Sequence::default(),
172 plugin_call_subscription_sender: subscription_tx,
173 error: OnceLock::new(),
174 writer: Box::new(writer),
175 }),
176 protocol_info_mut,
177 stream_manager: StreamManager::new(),
178 plugin_call_states: BTreeMap::new(),
179 plugin_call_subscription_receiver: subscription_rx,
180 plugin_call_input_streams: BTreeMap::new(),
181 gc: None,
182 }
183 }
184
185 pub fn set_garbage_collector(&mut self, gc: Option<PluginGc>) {
189 self.gc = gc;
190 }
191
192 fn receive_plugin_call_subscriptions(&mut self) {
194 while let Ok((id, state)) = self.plugin_call_subscription_receiver.try_recv() {
195 if let btree_map::Entry::Vacant(e) = self.plugin_call_states.entry(id) {
196 e.insert(state);
197 } else {
198 log::warn!("Duplicate plugin call ID ignored: {id}");
199 }
200 }
201 }
202
203 fn recv_stream_started(&mut self, call_id: PluginCallId, stream_id: StreamId) {
205 self.plugin_call_input_streams.insert(stream_id, call_id);
206 self.receive_plugin_call_subscriptions();
208 if let Some(state) = self.plugin_call_states.get_mut(&call_id) {
209 state.remaining_streams_to_read += 1;
210 }
211 if let Some(ref gc) = self.gc {
213 gc.increment_locks(1);
214 }
215 }
216
217 fn recv_stream_ended(&mut self, stream_id: StreamId) {
219 if let Some(call_id) = self.plugin_call_input_streams.remove(&stream_id) {
220 if let btree_map::Entry::Occupied(mut e) = self.plugin_call_states.entry(call_id) {
221 e.get_mut().remaining_streams_to_read -= 1;
222 if e.get().remaining_streams_to_read <= 0 {
224 e.remove();
225 }
226 }
227 if let Some(ref gc) = self.gc {
230 gc.decrement_locks(1);
231 }
232 }
233 }
234
235 fn get_signals(&mut self, id: PluginCallId) -> Result<Signals, ShellError> {
237 self.receive_plugin_call_subscriptions();
239 self.plugin_call_states
241 .get(&id)
242 .map(|state| state.signals.clone())
243 .ok_or_else(|| ShellError::PluginFailedToDecode {
244 msg: format!("Unknown plugin call ID: {id}"),
245 })
246 }
247
248 fn send_plugin_call_response(
250 &mut self,
251 id: PluginCallId,
252 response: PluginCallResponse<PipelineData>,
253 ) -> Result<(), ShellError> {
254 self.receive_plugin_call_subscriptions();
256
257 if let btree_map::Entry::Occupied(mut e) = self.plugin_call_states.entry(id) {
258 if !e.get().dont_send_response
262 && e.get_mut()
263 .sender
264 .take()
265 .and_then(|s| s.send(ReceivedPluginCallMessage::Response(response)).ok())
266 .is_none()
267 {
268 log::warn!("Received a plugin call response for id={id}, but the caller hung up");
269 }
270 if e.get().remaining_streams_to_read <= 0 {
272 e.remove();
273 }
274 Ok(())
275 } else {
276 Err(ShellError::PluginFailedToDecode {
277 msg: format!("Unknown plugin call ID: {id}"),
278 })
279 }
280 }
281
282 fn spawn_engine_call_handler(
285 &mut self,
286 id: PluginCallId,
287 ) -> Result<&mpsc::Sender<ReceivedPluginCallMessage>, ShellError> {
288 let interface = self.get_interface();
289
290 if let Some(state) = self.plugin_call_states.get_mut(&id) {
291 if state.sender.is_none() {
292 let (tx, rx) = mpsc::channel();
293 let context_rx =
294 state
295 .context_rx
296 .take()
297 .ok_or_else(|| ShellError::NushellFailed {
298 msg: "Tried to spawn the fallback engine call handler more than once"
299 .into(),
300 })?;
301
302 let mut current_call_state = CurrentCallState {
304 context_tx: None,
305 keep_plugin_custom_values_tx: Some(state.keep_plugin_custom_values.0.clone()),
306 entered_foreground: false,
307 span: state.span,
308 };
309
310 let handler = move || {
311 let mut context = context_rx
313 .recv()
314 .ok() .map(|c| c.0);
316
317 for msg in rx {
318 match msg {
320 ReceivedPluginCallMessage::EngineCall(engine_call_id, engine_call) => {
321 if let Err(err) = interface.handle_engine_call(
322 engine_call_id,
323 engine_call,
324 &mut current_call_state,
325 context.as_deref_mut(),
326 ) {
327 log::warn!(
328 "Error in plugin post-response engine call handler: \
329 {err:?}"
330 );
331 return;
332 }
333 }
334 other => log::warn!(
335 "Bad message received in plugin post-response \
336 engine call handler: {other:?}"
337 ),
338 }
339 }
340 };
341 std::thread::Builder::new()
342 .name("plugin engine call handler".into())
343 .spawn(handler)
344 .expect("failed to spawn thread");
345 state.sender = Some(tx);
346 Ok(state.sender.as_ref().unwrap_or_else(|| unreachable!()))
347 } else {
348 Err(ShellError::NushellFailed {
349 msg: "Tried to spawn the fallback engine call handler before the plugin call \
350 response had been received"
351 .into(),
352 })
353 }
354 } else {
355 Err(ShellError::NushellFailed {
356 msg: format!("Couldn't find plugin ID={id} in subscriptions"),
357 })
358 }
359 }
360
361 fn send_engine_call(
363 &mut self,
364 plugin_call_id: PluginCallId,
365 engine_call_id: EngineCallId,
366 call: EngineCall<PipelineData>,
367 ) -> Result<(), ShellError> {
368 self.receive_plugin_call_subscriptions();
370
371 if let Some(subscription) = self.plugin_call_states.get(&plugin_call_id) {
373 let msg = ReceivedPluginCallMessage::EngineCall(engine_call_id, call);
374 let send_error = |this: &Self| {
376 log::warn!(
377 "Received an engine call for plugin_call_id={plugin_call_id}, \
378 but the caller hung up"
379 );
380 this.state.writer.write(&PluginInput::EngineCallResponse(
383 engine_call_id,
384 EngineCallResponse::Error(ShellError::GenericError {
385 error: "Caller hung up".to_string(),
386 msg: "Can't make engine call because the original caller hung up"
387 .to_string(),
388 span: None,
389 help: None,
390 inner: vec![],
391 }),
392 ))?;
393 this.state.writer.flush()
394 };
395 if let Some(sender) = subscription.sender.as_ref() {
397 sender.send(msg).or_else(|_| send_error(self))
398 } else {
399 let sender = self.spawn_engine_call_handler(plugin_call_id)?;
401 sender.send(msg).or_else(|_| send_error(self))
402 }
403 } else {
404 Err(ShellError::PluginFailedToDecode {
405 msg: format!("Unknown plugin call ID: {plugin_call_id}"),
406 })
407 }
408 }
409
410 pub fn is_finished(&self) -> bool {
413 Arc::strong_count(&self.state) < 2
414 }
415
416 pub fn consume_all(
420 &mut self,
421 mut reader: impl PluginRead<PluginOutput>,
422 ) -> Result<(), ShellError> {
423 let mut result = Ok(());
424
425 while let Some(msg) = reader.read().transpose() {
426 if self.is_finished() {
427 break;
428 }
429
430 if let Err(err) = msg.and_then(|msg| self.consume(msg)) {
432 let _ = self.state.error.set(err.clone());
434 let _ = self.stream_manager.broadcast_read_error(err.clone());
436 self.receive_plugin_call_subscriptions();
438 for subscription in std::mem::take(&mut self.plugin_call_states).into_values() {
439 let _ = subscription
440 .sender
441 .as_ref()
442 .map(|s| s.send(ReceivedPluginCallMessage::Error(err.clone())));
443 }
444 result = Err(err);
445 break;
446 }
447 }
448
449 if let Some(ref gc) = self.gc {
451 gc.exited();
452 }
453 result
454 }
455}
456
457impl InterfaceManager for PluginInterfaceManager {
458 type Interface = PluginInterface;
459 type Input = PluginOutput;
460
461 fn get_interface(&self) -> Self::Interface {
462 PluginInterface {
463 state: self.state.clone(),
464 stream_manager_handle: self.stream_manager.get_handle(),
465 gc: self.gc.clone(),
466 }
467 }
468
469 fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> {
470 log::trace!("from plugin: {input:?}");
471
472 match input {
473 PluginOutput::Hello(info) => {
474 let info = Arc::new(info);
475 self.protocol_info_mut.set(info.clone())?;
476
477 let local_info = ProtocolInfo::default();
478 if local_info.is_compatible_with(&info)? {
479 Ok(())
480 } else {
481 Err(ShellError::PluginFailedToLoad {
482 msg: format!(
483 "Plugin `{}` is compiled for nushell version {}, \
484 which is not compatible with version {}",
485 self.state.source.name(),
486 info.version,
487 local_info.version,
488 ),
489 })
490 }
491 }
492 _ if !self.state.protocol_info.is_set() => {
493 Err(ShellError::PluginFailedToLoad {
495 msg: format!(
496 "Failed to receive initial Hello message from `{}`. \
497 This plugin might be too old",
498 self.state.source.name()
499 ),
500 })
501 }
502 PluginOutput::Data(..)
504 | PluginOutput::End(..)
505 | PluginOutput::Drop(..)
506 | PluginOutput::Ack(..) => {
507 self.consume_stream_message(input.try_into().map_err(|msg| {
508 ShellError::NushellFailed {
509 msg: format!("Failed to convert message {msg:?} to StreamMessage"),
510 }
511 })?)
512 }
513 PluginOutput::Option(option) => match option {
514 PluginOption::GcDisabled(disabled) => {
515 if let Some(ref gc) = self.gc {
517 gc.set_disabled(disabled);
518 }
519 Ok(())
520 }
521 },
522 PluginOutput::CallResponse(id, response) => {
523 let response = response
525 .map_data(|data| {
526 let signals = self.get_signals(id)?;
527
528 if let Some(stream_id) = data.stream_id() {
530 self.recv_stream_started(id, stream_id);
531 }
532
533 self.read_pipeline_data(data, &signals)
534 })
535 .unwrap_or_else(|err| {
536 PluginCallResponse::Error(err.into())
539 });
540 let result = self.send_plugin_call_response(id, response);
541 if result.is_ok() {
542 if let Some(ref gc) = self.gc {
544 gc.decrement_locks(1);
545 }
546 }
547 result
548 }
549 PluginOutput::EngineCall { context, id, call } => {
550 let call = call
551 .map_data(|input| {
553 let signals = self.get_signals(context)?;
554 self.read_pipeline_data(input, &signals)
555 })
556 .and_then(|mut engine_call| {
558 match engine_call {
559 EngineCall::EvalClosure {
560 ref mut positional, ..
561 } => {
562 for arg in positional.iter_mut() {
563 PluginCustomValueWithSource::add_source_in(
565 arg,
566 &self.state.source,
567 )?;
568 }
569 Ok(engine_call)
570 }
571 _ => Ok(engine_call),
572 }
573 });
574 match call {
575 Ok(call) => self.send_engine_call(context, id, call),
576 Err(err) => self.get_interface().write_engine_call_response(
578 id,
579 EngineCallResponse::Error(err),
580 &CurrentCallState::default(),
581 ),
582 }
583 }
584 }
585 }
586
587 fn stream_manager(&self) -> &StreamManager {
588 &self.stream_manager
589 }
590
591 fn prepare_pipeline_data(&self, mut data: PipelineData) -> Result<PipelineData, ShellError> {
592 match data {
594 PipelineData::Value(ref mut value, _) => {
595 with_custom_values_in(value, |custom_value| {
596 PluginCustomValueWithSource::add_source(custom_value.item, &self.state.source);
597 Ok::<_, ShellError>(())
598 })?;
599 Ok(data)
600 }
601 PipelineData::ListStream(stream, meta) => {
602 let source = self.state.source.clone();
603 Ok(PipelineData::list_stream(
604 stream.map(move |mut value| {
605 let _ = PluginCustomValueWithSource::add_source_in(&mut value, &source);
606 value
607 }),
608 meta,
609 ))
610 }
611 PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
612 }
613 }
614
615 fn consume_stream_message(&mut self, message: StreamMessage) -> Result<(), ShellError> {
616 if let StreamMessage::End(id) = message {
618 self.recv_stream_ended(id);
619 }
620 self.stream_manager.handle_message(message)
621 }
622}
623
624#[derive(Debug, Clone)]
628#[doc(hidden)]
629pub struct PluginInterface {
630 state: Arc<PluginInterfaceState>,
632 stream_manager_handle: StreamManagerHandle,
634 gc: Option<PluginGc>,
636}
637
638impl PluginInterface {
639 pub fn pid(&self) -> Option<u32> {
641 self.state.process.as_ref().map(|p| p.pid())
642 }
643
644 pub fn protocol_info(&self) -> Result<Arc<ProtocolInfo>, ShellError> {
646 self.state.protocol_info.get().and_then(|info| {
647 info.ok_or_else(|| ShellError::PluginFailedToLoad {
648 msg: format!(
649 "Failed to get protocol info (`Hello` message) from the `{}` plugin",
650 self.state.source.identity.name()
651 ),
652 })
653 })
654 }
655
656 pub fn hello(&self) -> Result<(), ShellError> {
658 self.write(PluginInput::Hello(ProtocolInfo::default()))?;
659 self.flush()
660 }
661
662 pub fn goodbye(&self) -> Result<(), ShellError> {
668 self.write(PluginInput::Goodbye)?;
669 self.flush()
670 }
671
672 pub fn signal(&self, action: SignalAction) -> Result<(), ShellError> {
674 self.write(PluginInput::Signal(action))?;
675 self.flush()
676 }
677
678 pub fn write_engine_call_response(
681 &self,
682 id: EngineCallId,
683 response: EngineCallResponse<PipelineData>,
684 state: &CurrentCallState,
685 ) -> Result<(), ShellError> {
686 let mut writer = None;
688 let response = response.map_data(|data| {
689 let (data_header, data_writer) = self.init_write_pipeline_data(data, state)?;
690 writer = Some(data_writer);
691 Ok(data_header)
692 })?;
693
694 self.write(PluginInput::EngineCallResponse(id, response))?;
696 self.flush()?;
697
698 if let Some(writer) = writer {
700 writer.write_background()?;
701 }
702
703 Ok(())
704 }
705
706 fn write_plugin_call(
708 &self,
709 mut call: PluginCall<PipelineData>,
710 context: Option<&dyn PluginExecutionContext>,
711 ) -> Result<WritePluginCallResult, ShellError> {
712 let id = self.state.plugin_call_id_sequence.next()?;
713 let signals = context
714 .map(|c| c.signals().clone())
715 .unwrap_or_else(Signals::empty);
716 let (tx, rx) = mpsc::channel();
717 let (context_tx, context_rx) = mpsc::channel();
718 let keep_plugin_custom_values = mpsc::channel();
719
720 let state = CurrentCallState {
722 context_tx: Some(context_tx),
723 keep_plugin_custom_values_tx: Some(keep_plugin_custom_values.0.clone()),
724 entered_foreground: false,
725 span: call.span(),
726 };
727
728 state.prepare_plugin_call(&mut call, &self.state.source)?;
730
731 let (call, writer) = match call {
733 PluginCall::Metadata => (PluginCall::Metadata, Default::default()),
734 PluginCall::Signature => (PluginCall::Signature, Default::default()),
735 PluginCall::CustomValueOp(value, op) => {
736 (PluginCall::CustomValueOp(value, op), Default::default())
737 }
738 PluginCall::Run(CallInfo { name, call, input }) => {
739 let (header, writer) = self.init_write_pipeline_data(input, &state)?;
740 (
741 PluginCall::Run(CallInfo {
742 name,
743 call,
744 input: header,
745 }),
746 writer,
747 )
748 }
749 };
750
751 let dont_send_response =
753 matches!(call, PluginCall::CustomValueOp(_, CustomValueOp::Dropped));
754
755 self.state
757 .plugin_call_subscription_sender
758 .send((
759 id,
760 PluginCallState {
761 sender: Some(tx).filter(|_| !dont_send_response),
762 dont_send_response,
763 signals,
764 context_rx: Some(context_rx),
765 span: call.span(),
766 keep_plugin_custom_values,
767 remaining_streams_to_read: 0,
768 },
769 ))
770 .map_err(|_| {
771 let existing_error = self.state.error.get().cloned();
772 ShellError::GenericError {
773 error: format!("Plugin `{}` closed unexpectedly", self.state.source.name()),
774 msg: "can't complete this operation because the plugin is closed".into(),
775 span: call.span(),
776 help: Some(format!(
777 "the plugin may have experienced an error. Try loading the plugin again \
778 with `{}`",
779 self.state.source.identity.use_command(),
780 )),
781 inner: existing_error.into_iter().collect(),
782 }
783 })?;
784
785 if let Some(ref gc) = self.gc {
789 gc.increment_locks(1);
790 }
791
792 self.write(PluginInput::Call(id, call))?;
794 self.flush()?;
795
796 Ok(WritePluginCallResult {
797 receiver: rx,
798 writer,
799 state,
800 })
801 }
802
803 fn receive_plugin_call_response(
805 &self,
806 rx: mpsc::Receiver<ReceivedPluginCallMessage>,
807 mut context: Option<&mut (dyn PluginExecutionContext + '_)>,
808 mut state: CurrentCallState,
809 ) -> Result<PluginCallResponse<PipelineData>, ShellError> {
810 for msg in rx {
812 match msg {
813 ReceivedPluginCallMessage::Response(resp) => {
814 if state.entered_foreground {
815 if let Some(context) = context.as_deref_mut() {
817 if let Err(err) =
818 set_foreground(self.state.process.as_ref(), context, false)
819 {
820 log::warn!("Failed to leave foreground state on exit: {err:?}");
821 }
822 }
823 }
824 if resp.has_stream() {
825 if let Some(context) = context {
827 if let Some(ref context_tx) = state.context_tx {
828 let _ = context_tx.send(Context(context.boxed()));
829 }
830 }
831 }
832 return Ok(resp);
833 }
834 ReceivedPluginCallMessage::Error(err) => {
835 return Err(err);
836 }
837 ReceivedPluginCallMessage::EngineCall(engine_call_id, engine_call) => {
838 self.handle_engine_call(
839 engine_call_id,
840 engine_call,
841 &mut state,
842 context.as_deref_mut(),
843 )?;
844 }
845 }
846 }
847 let existing_error = self.state.error.get().cloned();
850 Err(ShellError::GenericError {
851 error: format!(
852 "Failed to receive response to plugin call from `{}`",
853 self.state.source.identity.name()
854 ),
855 msg: "while waiting for this operation to complete".into(),
856 span: state.span,
857 help: Some(format!(
858 "try restarting the plugin with `{}`",
859 self.state.source.identity.use_command()
860 )),
861 inner: existing_error.into_iter().collect(),
862 })
863 }
864
865 fn handle_engine_call(
867 &self,
868 engine_call_id: EngineCallId,
869 engine_call: EngineCall<PipelineData>,
870 state: &mut CurrentCallState,
871 context: Option<&mut (dyn PluginExecutionContext + '_)>,
872 ) -> Result<(), ShellError> {
873 let process = self.state.process.as_ref();
874 let resp = handle_engine_call(engine_call, state, context, process)
875 .unwrap_or_else(EngineCallResponse::Error);
876 let mut writer = None;
878 let resp = resp
879 .map_data(|data| {
880 let (data_header, data_writer) = self.init_write_pipeline_data(data, state)?;
881 writer = Some(data_writer);
882 Ok(data_header)
883 })
884 .unwrap_or_else(|err| {
885 writer = None;
887 EngineCallResponse::Error(err)
888 });
889 self.write(PluginInput::EngineCallResponse(engine_call_id, resp))?;
891 self.flush()?;
892 if let Some(writer) = writer {
893 writer.write_background()?;
894 }
895 Ok(())
896 }
897
898 fn plugin_call(
901 &self,
902 call: PluginCall<PipelineData>,
903 context: Option<&mut dyn PluginExecutionContext>,
904 ) -> Result<PluginCallResponse<PipelineData>, ShellError> {
905 if let Some(error) = self.state.error.get() {
907 return Err(ShellError::GenericError {
908 error: format!(
909 "Failed to send plugin call to `{}`",
910 self.state.source.identity.name()
911 ),
912 msg: "the plugin encountered an error before this operation could be attempted"
913 .into(),
914 span: call.span(),
915 help: Some(format!(
916 "try loading the plugin again with `{}`",
917 self.state.source.identity.use_command(),
918 )),
919 inner: vec![error.clone()],
920 });
921 }
922
923 let result = self.write_plugin_call(call, context.as_deref())?;
924
925 result.writer.write_background()?;
927
928 self.receive_plugin_call_response(result.receiver, context, result.state)
929 }
930
931 pub fn get_metadata(&self) -> Result<PluginMetadata, ShellError> {
933 match self.plugin_call(PluginCall::Metadata, None)? {
934 PluginCallResponse::Metadata(meta) => Ok(meta),
935 PluginCallResponse::Error(err) => Err(err.into()),
936 _ => Err(ShellError::PluginFailedToDecode {
937 msg: "Received unexpected response to plugin Metadata call".into(),
938 }),
939 }
940 }
941
942 pub fn get_signature(&self) -> Result<Vec<PluginSignature>, ShellError> {
944 match self.plugin_call(PluginCall::Signature, None)? {
945 PluginCallResponse::Signature(sigs) => Ok(sigs),
946 PluginCallResponse::Error(err) => Err(err.into()),
947 _ => Err(ShellError::PluginFailedToDecode {
948 msg: "Received unexpected response to plugin Signature call".into(),
949 }),
950 }
951 }
952
953 pub fn run(
955 &self,
956 call: CallInfo<PipelineData>,
957 context: &mut dyn PluginExecutionContext,
958 ) -> Result<PipelineData, ShellError> {
959 match self.plugin_call(PluginCall::Run(call), Some(context))? {
960 PluginCallResponse::PipelineData(data) => Ok(data),
961 PluginCallResponse::Error(err) => Err(err.into()),
962 _ => Err(ShellError::PluginFailedToDecode {
963 msg: "Received unexpected response to plugin Run call".into(),
964 }),
965 }
966 }
967
968 fn custom_value_op_expecting_value(
970 &self,
971 value: Spanned<PluginCustomValueWithSource>,
972 op: CustomValueOp,
973 ) -> Result<Value, ShellError> {
974 let op_name = op.name();
975 let span = value.span;
976
977 value.item.verify_source(span, &self.state.source)?;
979
980 let call = PluginCall::CustomValueOp(value.map(|cv| cv.without_source()), op);
981 match self.plugin_call(call, None)? {
982 PluginCallResponse::PipelineData(out_data) => out_data.into_value(span),
983 PluginCallResponse::Error(err) => Err(err.into()),
984 _ => Err(ShellError::PluginFailedToDecode {
985 msg: format!("Received unexpected response to custom value {op_name}() call"),
986 }),
987 }
988 }
989
990 pub fn custom_value_to_base_value(
992 &self,
993 value: Spanned<PluginCustomValueWithSource>,
994 ) -> Result<Value, ShellError> {
995 self.custom_value_op_expecting_value(value, CustomValueOp::ToBaseValue)
996 }
997
998 pub fn custom_value_follow_path_int(
1000 &self,
1001 value: Spanned<PluginCustomValueWithSource>,
1002 index: Spanned<usize>,
1003 ) -> Result<Value, ShellError> {
1004 self.custom_value_op_expecting_value(value, CustomValueOp::FollowPathInt(index))
1005 }
1006
1007 pub fn custom_value_follow_path_string(
1009 &self,
1010 value: Spanned<PluginCustomValueWithSource>,
1011 column_name: Spanned<String>,
1012 ) -> Result<Value, ShellError> {
1013 self.custom_value_op_expecting_value(value, CustomValueOp::FollowPathString(column_name))
1014 }
1015
1016 pub fn custom_value_partial_cmp(
1018 &self,
1019 value: PluginCustomValueWithSource,
1020 other_value: Value,
1021 ) -> Result<Option<Ordering>, ShellError> {
1022 value.verify_source(Span::unknown(), &self.state.source)?;
1024
1025 let call = PluginCall::CustomValueOp(
1028 value.without_source().into_spanned(Span::unknown()),
1029 CustomValueOp::PartialCmp(other_value),
1030 );
1031 match self.plugin_call(call, None)? {
1032 PluginCallResponse::Ordering(ordering) => Ok(ordering),
1033 PluginCallResponse::Error(err) => Err(err.into()),
1034 _ => Err(ShellError::PluginFailedToDecode {
1035 msg: "Received unexpected response to custom value partial_cmp() call".into(),
1036 }),
1037 }
1038 }
1039
1040 pub fn custom_value_operation(
1042 &self,
1043 left: Spanned<PluginCustomValueWithSource>,
1044 operator: Spanned<Operator>,
1045 right: Value,
1046 ) -> Result<Value, ShellError> {
1047 self.custom_value_op_expecting_value(left, CustomValueOp::Operation(operator, right))
1048 }
1049
1050 pub fn custom_value_dropped(&self, value: PluginCustomValue) -> Result<(), ShellError> {
1052 drop(self.write_plugin_call(
1057 PluginCall::CustomValueOp(value.into_spanned(Span::unknown()), CustomValueOp::Dropped),
1058 None,
1059 )?);
1060 Ok(())
1061 }
1062}
1063
1064impl Interface for PluginInterface {
1065 type Output = PluginInput;
1066 type DataContext = CurrentCallState;
1067
1068 fn write(&self, input: PluginInput) -> Result<(), ShellError> {
1069 log::trace!("to plugin: {input:?}");
1070 self.state.writer.write(&input).map_err(|err| {
1071 log::warn!("write() error: {err}");
1072 self.state.error.get().cloned().unwrap_or(err)
1075 })
1076 }
1077
1078 fn flush(&self) -> Result<(), ShellError> {
1079 self.state.writer.flush().map_err(|err| {
1080 log::warn!("flush() error: {err}");
1081 self.state.error.get().cloned().unwrap_or(err)
1084 })
1085 }
1086
1087 fn stream_id_sequence(&self) -> &Sequence {
1088 &self.state.stream_id_sequence
1089 }
1090
1091 fn stream_manager_handle(&self) -> &StreamManagerHandle {
1092 &self.stream_manager_handle
1093 }
1094
1095 fn prepare_pipeline_data(
1096 &self,
1097 data: PipelineData,
1098 state: &CurrentCallState,
1099 ) -> Result<PipelineData, ShellError> {
1100 match data {
1102 PipelineData::Value(mut value, meta) => {
1103 state.prepare_value(&mut value, &self.state.source)?;
1104 Ok(PipelineData::value(value, meta))
1105 }
1106 PipelineData::ListStream(stream, meta) => {
1107 let source = self.state.source.clone();
1108 let state = state.clone();
1109 Ok(PipelineData::list_stream(
1110 stream.map(move |mut value| {
1111 match state.prepare_value(&mut value, &source) {
1112 Ok(()) => value,
1113 Err(err) => Value::error(err, value.span()),
1115 }
1116 }),
1117 meta,
1118 ))
1119 }
1120 PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
1121 }
1122 }
1123}
1124
1125impl Drop for PluginInterface {
1126 fn drop(&mut self) {
1127 if Arc::strong_count(&self.state) < 3 {
1133 if let Err(err) = self.goodbye() {
1134 log::warn!("Error during plugin Goodbye: {err}");
1135 }
1136 }
1137 }
1138}
1139
1140#[must_use]
1142struct WritePluginCallResult {
1143 receiver: mpsc::Receiver<ReceivedPluginCallMessage>,
1145 writer: PipelineDataWriter<PluginInterface>,
1147 state: CurrentCallState,
1149}
1150
1151#[derive(Default, Clone)]
1153pub struct CurrentCallState {
1154 context_tx: Option<mpsc::Sender<Context>>,
1157 keep_plugin_custom_values_tx: Option<mpsc::Sender<PluginCustomValueWithSource>>,
1160 entered_foreground: bool,
1163 span: Option<Span>,
1165}
1166
1167impl CurrentCallState {
1168 fn prepare_custom_value(
1171 &self,
1172 custom_value: Spanned<&mut Box<dyn CustomValue>>,
1173 source: &PluginSource,
1174 ) -> Result<(), ShellError> {
1175 PluginCustomValueWithSource::verify_source_of_custom_value(
1177 custom_value.as_deref().map(|cv| &**cv),
1178 source,
1179 )?;
1180
1181 if let Some(keep_tx) = &self.keep_plugin_custom_values_tx {
1183 if let Some(custom_value) = custom_value
1184 .item
1185 .as_any()
1186 .downcast_ref::<PluginCustomValueWithSource>()
1187 {
1188 if custom_value.notify_on_drop() {
1189 log::trace!("Keeping custom value for drop later: {custom_value:?}");
1190 keep_tx
1191 .send(custom_value.clone())
1192 .map_err(|_| ShellError::NushellFailed {
1193 msg: "Failed to custom value to keep channel".into(),
1194 })?;
1195 }
1196 }
1197 }
1198
1199 PluginCustomValueWithSource::remove_source(&mut *custom_value.item);
1201
1202 Ok(())
1203 }
1204
1205 fn prepare_value(&self, value: &mut Value, source: &PluginSource) -> Result<(), ShellError> {
1207 with_custom_values_in(value, |custom_value| {
1208 self.prepare_custom_value(custom_value, source)
1209 })
1210 }
1211
1212 fn prepare_call_args(
1214 &self,
1215 call: &mut EvaluatedCall,
1216 source: &PluginSource,
1217 ) -> Result<(), ShellError> {
1218 for arg in call.positional.iter_mut() {
1219 self.prepare_value(arg, source)?;
1220 }
1221 for arg in call.named.iter_mut().flat_map(|(_, arg)| arg.as_mut()) {
1222 self.prepare_value(arg, source)?;
1223 }
1224 Ok(())
1225 }
1226
1227 fn prepare_plugin_call<D>(
1230 &self,
1231 call: &mut PluginCall<D>,
1232 source: &PluginSource,
1233 ) -> Result<(), ShellError> {
1234 match call {
1235 PluginCall::Metadata => Ok(()),
1236 PluginCall::Signature => Ok(()),
1237 PluginCall::Run(CallInfo { call, .. }) => self.prepare_call_args(call, source),
1238 PluginCall::CustomValueOp(_, op) => {
1239 match op {
1241 CustomValueOp::ToBaseValue => Ok(()),
1242 CustomValueOp::FollowPathInt(_) => Ok(()),
1243 CustomValueOp::FollowPathString(_) => Ok(()),
1244 CustomValueOp::PartialCmp(value) => self.prepare_value(value, source),
1245 CustomValueOp::Operation(_, value) => self.prepare_value(value, source),
1246 CustomValueOp::Dropped => Ok(()),
1247 }
1248 }
1249 }
1250 }
1251}
1252
1253pub(crate) fn handle_engine_call(
1255 call: EngineCall<PipelineData>,
1256 state: &mut CurrentCallState,
1257 context: Option<&mut (dyn PluginExecutionContext + '_)>,
1258 process: Option<&PluginProcess>,
1259) -> Result<EngineCallResponse<PipelineData>, ShellError> {
1260 let call_name = call.name();
1261
1262 let context = context.ok_or_else(|| ShellError::GenericError {
1263 error: "A plugin execution context is required for this engine call".into(),
1264 msg: format!("attempted to call {call_name} outside of a command invocation"),
1265 span: None,
1266 help: Some("this is probably a bug with the plugin".into()),
1267 inner: vec![],
1268 })?;
1269
1270 match call {
1271 EngineCall::GetConfig => {
1272 let config = SharedCow::from(context.get_config()?);
1273 Ok(EngineCallResponse::Config(config))
1274 }
1275 EngineCall::GetPluginConfig => {
1276 let plugin_config = context.get_plugin_config()?;
1277 Ok(plugin_config.map_or_else(EngineCallResponse::empty, EngineCallResponse::value))
1278 }
1279 EngineCall::GetEnvVar(name) => {
1280 let value = context.get_env_var(&name)?;
1281 Ok(value
1282 .cloned()
1283 .map_or_else(EngineCallResponse::empty, EngineCallResponse::value))
1284 }
1285 EngineCall::GetEnvVars => context.get_env_vars().map(EngineCallResponse::ValueMap),
1286 EngineCall::GetCurrentDir => {
1287 let current_dir = context.get_current_dir()?;
1288 Ok(EngineCallResponse::value(Value::string(
1289 current_dir.item,
1290 current_dir.span,
1291 )))
1292 }
1293 EngineCall::AddEnvVar(name, value) => {
1294 context.add_env_var(name, value)?;
1295 Ok(EngineCallResponse::empty())
1296 }
1297 EngineCall::GetHelp => {
1298 let help = context.get_help()?;
1299 Ok(EngineCallResponse::value(Value::string(
1300 help.item, help.span,
1301 )))
1302 }
1303 EngineCall::EnterForeground => {
1304 let resp = set_foreground(process, context, true)?;
1305 state.entered_foreground = true;
1306 Ok(resp)
1307 }
1308 EngineCall::LeaveForeground => {
1309 let resp = set_foreground(process, context, false)?;
1310 state.entered_foreground = false;
1311 Ok(resp)
1312 }
1313 EngineCall::GetSpanContents(span) => {
1314 let contents = context.get_span_contents(span)?;
1315 Ok(EngineCallResponse::value(Value::binary(
1316 contents.item,
1317 contents.span,
1318 )))
1319 }
1320 EngineCall::EvalClosure {
1321 closure,
1322 positional,
1323 input,
1324 redirect_stdout,
1325 redirect_stderr,
1326 } => context
1327 .eval_closure(closure, positional, input, redirect_stdout, redirect_stderr)
1328 .map(EngineCallResponse::PipelineData),
1329 EngineCall::FindDecl(name) => context.find_decl(&name).map(|decl_id| {
1330 if let Some(decl_id) = decl_id {
1331 EngineCallResponse::Identifier(decl_id)
1332 } else {
1333 EngineCallResponse::empty()
1334 }
1335 }),
1336 EngineCall::CallDecl {
1337 decl_id,
1338 call,
1339 input,
1340 redirect_stdout,
1341 redirect_stderr,
1342 } => context
1343 .call_decl(decl_id, call, input, redirect_stdout, redirect_stderr)
1344 .map(EngineCallResponse::PipelineData),
1345 }
1346}
1347
1348fn set_foreground(
1350 process: Option<&PluginProcess>,
1351 context: &mut dyn PluginExecutionContext,
1352 enter: bool,
1353) -> Result<EngineCallResponse<PipelineData>, ShellError> {
1354 if let Some(process) = process {
1355 if let Some(pipeline_externals_state) = context.pipeline_externals_state() {
1356 if enter {
1357 let pgrp = process.enter_foreground(context.span(), pipeline_externals_state)?;
1358 Ok(pgrp.map_or_else(EngineCallResponse::empty, |id| {
1359 EngineCallResponse::value(Value::int(id as i64, context.span()))
1360 }))
1361 } else {
1362 process.exit_foreground()?;
1363 Ok(EngineCallResponse::empty())
1364 }
1365 } else {
1366 Err(ShellError::NushellFailed {
1368 msg: "missing required pipeline_externals_state from context \
1369 for entering foreground"
1370 .into(),
1371 })
1372 }
1373 } else {
1374 Err(ShellError::GenericError {
1375 error: "Can't manage plugin process to enter foreground".into(),
1376 msg: "the process ID for this plugin is unknown".into(),
1377 span: Some(context.span()),
1378 help: Some("the plugin may be running in a test".into()),
1379 inner: vec![],
1380 })
1381 }
1382}