1use nu_plugin_core::{
4 Interface, InterfaceManager, PipelineDataWriter, PluginRead, PluginWrite, StreamManager,
5 StreamManagerHandle,
6 util::{Waitable, WaitableMut, with_custom_values_in},
7};
8use nu_plugin_protocol::{
9 CallInfo, CustomValueOp, EngineCall, EngineCallId, EngineCallResponse, EvaluatedCall,
10 GetCompletionInfo, Ordering, PluginCall, PluginCallId, PluginCallResponse, PluginCustomValue,
11 PluginInput, PluginOption, PluginOutput, ProtocolInfo, StreamId, StreamMessage,
12};
13use nu_protocol::{
14 CustomValue, DynamicSuggestion, IntoSpanned, PipelineData, PluginMetadata, PluginSignature,
15 ShellError, SignalAction, Signals, Span, Spanned, Value, ast::Operator, casing::Casing,
16 engine::Sequence,
17};
18use nu_utils::SharedCow;
19use std::{
20 collections::{BTreeMap, btree_map},
21 path::Path,
22 sync::{Arc, OnceLock, mpsc},
23};
24
25use crate::{
26 PluginCustomValueWithSource, PluginExecutionContext, PluginGc, PluginSource,
27 process::PluginProcess,
28};
29
30#[cfg(test)]
31mod tests;
32
/// A message routed to the thread that is waiting on (or handling engine calls for) a
/// plugin call.
#[derive(Debug)]
enum ReceivedPluginCallMessage {
    /// The plugin's response to the call.
    Response(PluginCallResponse<PipelineData>),

    /// An error hit while reading or consuming plugin output; `consume_all` sends a copy to
    /// every outstanding call before it stops.
    Error(ShellError),

    /// An engine call the plugin made in the context of this plugin call, together with the
    /// ID that the engine call response has to be tagged with.
    EngineCall(EngineCallId, EngineCall<PipelineData>),
}
47
/// An owned, boxed [`PluginExecutionContext`].
///
/// This wrapper is what gets sent over the per-call context channel so that the fallback
/// engine call handler thread can keep serving engine calls after the response was returned.
pub(crate) struct Context(Box<dyn PluginExecutionContext>);
50
51impl std::fmt::Debug for Context {
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 f.write_str("Context")
54 }
55}
56
57impl std::ops::Deref for Context {
58 type Target = dyn PluginExecutionContext;
59
60 fn deref(&self) -> &Self::Target {
61 &*self.0
62 }
63}
64
/// State shared (behind an `Arc`) between the [`PluginInterfaceManager`] and every
/// [`PluginInterface`] created from it.
struct PluginInterfaceState {
    /// The plugin this interface talks to; used for naming the plugin in error messages and
    /// for adding/verifying the source of custom values.
    source: Arc<PluginSource>,
    /// Handle to the plugin process; `None` when the manager was created without a PID.
    process: Option<PluginProcess>,
    /// Read side of the protocol info, which the manager sets when `Hello` is received.
    protocol_info: Waitable<Arc<ProtocolInfo>>,
    /// Generator for plugin call IDs.
    plugin_call_id_sequence: Sequence,
    /// Generator for stream IDs.
    stream_id_sequence: Sequence,
    /// Registers a new plugin call (ID plus its state) with the manager.
    plugin_call_subscription_sender: mpsc::Sender<(PluginCallId, PluginCallState)>,
    /// The error that made `consume_all` stop, if any. Set at most once.
    error: OnceLock<ShellError>,
    /// Where messages to the plugin are written.
    writer: Box<dyn PluginWrite<PluginInput>>,
}
84
85impl std::fmt::Debug for PluginInterfaceState {
86 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87 f.debug_struct("PluginInterfaceState")
88 .field("source", &self.source)
89 .field("protocol_info", &self.protocol_info)
90 .field("plugin_call_id_sequence", &self.plugin_call_id_sequence)
91 .field("stream_id_sequence", &self.stream_id_sequence)
92 .field(
93 "plugin_call_subscription_sender",
94 &self.plugin_call_subscription_sender,
95 )
96 .field("error", &self.error)
97 .finish_non_exhaustive()
98 }
99}
100
/// Per-call bookkeeping kept by the [`PluginInterfaceManager`] for each plugin call that is
/// still in flight.
#[derive(Debug)]
struct PluginCallState {
    /// Channel to the thread handling this call. Taken (left `None`) once the response has
    /// been sent; also `None` from the start when no response is wanted.
    sender: Option<mpsc::Sender<ReceivedPluginCallMessage>>,
    /// When true, the plugin's response is not forwarded to anyone.
    dont_send_response: bool,
    /// Signals handed to pipeline data read on behalf of this call.
    signals: Signals,
    /// Receives the execution context after the response was returned; consumed when the
    /// fallback engine call handler is spawned.
    context_rx: Option<mpsc::Receiver<Context>>,
    /// Span of the call, if it has one.
    span: Option<Span>,
    /// Channel holding clones of custom values sent to the plugin for this call. They stay
    /// queued here until this state is dropped, at which point the queue is drained.
    keep_plugin_custom_values: (
        mpsc::Sender<PluginCustomValueWithSource>,
        mpsc::Receiver<PluginCustomValueWithSource>,
    ),
    /// Number of response streams that have started but not yet ended.
    remaining_streams_to_read: i32,
}
126
127impl Drop for PluginCallState {
128 fn drop(&mut self) {
129 for value in self.keep_plugin_custom_values.1.try_iter() {
131 log::trace!("Dropping custom value that was kept: {value:?}");
132 drop(value);
133 }
134 }
135}
136
/// Reads and dispatches everything the plugin sends to the engine.
#[derive(Debug)]
pub struct PluginInterfaceManager {
    /// State shared with every interface handed out by this manager.
    state: Arc<PluginInterfaceState>,
    /// Write side of the protocol info; set when the plugin's `Hello` arrives.
    protocol_info_mut: WaitableMut<Arc<ProtocolInfo>>,
    /// Routes stream messages to the readers of the corresponding streams.
    stream_manager: StreamManager,
    /// In-flight plugin calls, keyed by call ID.
    plugin_call_states: BTreeMap<PluginCallId, PluginCallState>,
    /// Receives newly registered plugin calls from the interfaces.
    plugin_call_subscription_receiver: mpsc::Receiver<(PluginCallId, PluginCallState)>,
    /// Maps each response stream that has started to the plugin call it belongs to.
    plugin_call_input_streams: BTreeMap<StreamId, PluginCallId>,
    /// Garbage collector to notify about locks, option changes and exit, if one is set.
    gc: Option<PluginGc>,
}
157
158impl PluginInterfaceManager {
159 pub fn new(
160 source: Arc<PluginSource>,
161 pid: Option<u32>,
162 writer: impl PluginWrite<PluginInput> + 'static,
163 ) -> PluginInterfaceManager {
164 let (subscription_tx, subscription_rx) = mpsc::channel();
165 let protocol_info_mut = WaitableMut::new();
166
167 PluginInterfaceManager {
168 state: Arc::new(PluginInterfaceState {
169 source,
170 process: pid.map(PluginProcess::new),
171 protocol_info: protocol_info_mut.reader(),
172 plugin_call_id_sequence: Sequence::default(),
173 stream_id_sequence: Sequence::default(),
174 plugin_call_subscription_sender: subscription_tx,
175 error: OnceLock::new(),
176 writer: Box::new(writer),
177 }),
178 protocol_info_mut,
179 stream_manager: StreamManager::new(),
180 plugin_call_states: BTreeMap::new(),
181 plugin_call_subscription_receiver: subscription_rx,
182 plugin_call_input_streams: BTreeMap::new(),
183 gc: None,
184 }
185 }
186
187 pub fn set_garbage_collector(&mut self, gc: Option<PluginGc>) {
191 self.gc = gc;
192 }
193
194 fn receive_plugin_call_subscriptions(&mut self) {
196 while let Ok((id, state)) = self.plugin_call_subscription_receiver.try_recv() {
197 if let btree_map::Entry::Vacant(e) = self.plugin_call_states.entry(id) {
198 e.insert(state);
199 } else {
200 log::warn!("Duplicate plugin call ID ignored: {id}");
201 }
202 }
203 }
204
205 fn recv_stream_started(&mut self, call_id: PluginCallId, stream_id: StreamId) {
207 self.plugin_call_input_streams.insert(stream_id, call_id);
208 self.receive_plugin_call_subscriptions();
210 if let Some(state) = self.plugin_call_states.get_mut(&call_id) {
211 state.remaining_streams_to_read += 1;
212 }
213 if let Some(ref gc) = self.gc {
215 gc.increment_locks(1);
216 }
217 }
218
219 fn recv_stream_ended(&mut self, stream_id: StreamId) {
221 if let Some(call_id) = self.plugin_call_input_streams.remove(&stream_id) {
222 if let btree_map::Entry::Occupied(mut e) = self.plugin_call_states.entry(call_id) {
223 e.get_mut().remaining_streams_to_read -= 1;
224 if e.get().remaining_streams_to_read <= 0 {
226 e.remove();
227 }
228 }
229 if let Some(ref gc) = self.gc {
232 gc.decrement_locks(1);
233 }
234 }
235 }
236
237 fn get_signals(&mut self, id: PluginCallId) -> Result<Signals, ShellError> {
239 self.receive_plugin_call_subscriptions();
241 self.plugin_call_states
243 .get(&id)
244 .map(|state| state.signals.clone())
245 .ok_or_else(|| ShellError::PluginFailedToDecode {
246 msg: format!("Unknown plugin call ID: {id}"),
247 })
248 }
249
250 fn send_plugin_call_response(
252 &mut self,
253 id: PluginCallId,
254 response: PluginCallResponse<PipelineData>,
255 ) -> Result<(), ShellError> {
256 self.receive_plugin_call_subscriptions();
258
259 if let btree_map::Entry::Occupied(mut e) = self.plugin_call_states.entry(id) {
260 if !e.get().dont_send_response
264 && e.get_mut()
265 .sender
266 .take()
267 .and_then(|s| s.send(ReceivedPluginCallMessage::Response(response)).ok())
268 .is_none()
269 {
270 log::warn!("Received a plugin call response for id={id}, but the caller hung up");
271 }
272 if e.get().remaining_streams_to_read <= 0 {
274 e.remove();
275 }
276 Ok(())
277 } else {
278 Err(ShellError::PluginFailedToDecode {
279 msg: format!("Unknown plugin call ID: {id}"),
280 })
281 }
282 }
283
284 fn spawn_engine_call_handler(
287 &mut self,
288 id: PluginCallId,
289 ) -> Result<&mpsc::Sender<ReceivedPluginCallMessage>, ShellError> {
290 let interface = self.get_interface();
291
292 if let Some(state) = self.plugin_call_states.get_mut(&id) {
293 if state.sender.is_none() {
294 let (tx, rx) = mpsc::channel();
295 let context_rx =
296 state
297 .context_rx
298 .take()
299 .ok_or_else(|| ShellError::NushellFailed {
300 msg: "Tried to spawn the fallback engine call handler more than once"
301 .into(),
302 })?;
303
304 let mut current_call_state = CurrentCallState {
306 context_tx: None,
307 keep_plugin_custom_values_tx: Some(state.keep_plugin_custom_values.0.clone()),
308 entered_foreground: false,
309 span: state.span,
310 };
311
312 let handler = move || {
313 let mut context = context_rx
315 .recv()
316 .ok() .map(|c| c.0);
318
319 for msg in rx {
320 match msg {
322 ReceivedPluginCallMessage::EngineCall(engine_call_id, engine_call) => {
323 if let Err(err) = interface.handle_engine_call(
324 engine_call_id,
325 engine_call,
326 &mut current_call_state,
327 context.as_deref_mut(),
328 ) {
329 log::warn!(
330 "Error in plugin post-response engine call handler: \
331 {err:?}"
332 );
333 return;
334 }
335 }
336 other => log::warn!(
337 "Bad message received in plugin post-response \
338 engine call handler: {other:?}"
339 ),
340 }
341 }
342 };
343 std::thread::Builder::new()
344 .name("plugin engine call handler".into())
345 .spawn(handler)
346 .expect("failed to spawn thread");
347 state.sender = Some(tx);
348 Ok(state.sender.as_ref().unwrap_or_else(|| unreachable!()))
349 } else {
350 Err(ShellError::NushellFailed {
351 msg: "Tried to spawn the fallback engine call handler before the plugin call \
352 response had been received"
353 .into(),
354 })
355 }
356 } else {
357 Err(ShellError::NushellFailed {
358 msg: format!("Couldn't find plugin ID={id} in subscriptions"),
359 })
360 }
361 }
362
/// Forward an engine call made by the plugin to the thread handling `plugin_call_id`.
///
/// If the call's sender is still present the message goes there; otherwise the fallback
/// engine call handler is spawned and used. When the receiving side has hung up, an error
/// response for `engine_call_id` is written back to the plugin instead.
///
/// # Errors
///
/// Returns `ShellError::PluginFailedToDecode` if `plugin_call_id` is unknown, and passes on
/// errors from spawning the fallback handler or from writing the error response.
fn send_engine_call(
    &mut self,
    plugin_call_id: PluginCallId,
    engine_call_id: EngineCallId,
    call: EngineCall<PipelineData>,
) -> Result<(), ShellError> {
    // Pick up calls registered since the last message was processed.
    self.receive_plugin_call_subscriptions();

    if let Some(subscription) = self.plugin_call_states.get(&plugin_call_id) {
        let msg = ReceivedPluginCallMessage::EngineCall(engine_call_id, call);
        // Used when nobody is listening any more: tell the plugin that its engine call
        // can't be served so it doesn't wait forever.
        let send_error = |this: &Self| {
            log::warn!(
                "Received an engine call for plugin_call_id={plugin_call_id}, \
                but the caller hung up"
            );
            this.state.writer.write(&PluginInput::EngineCallResponse(
                engine_call_id,
                EngineCallResponse::Error(ShellError::GenericError {
                    error: "Caller hung up".to_string(),
                    msg: "Can't make engine call because the original caller hung up"
                        .to_string(),
                    span: None,
                    help: None,
                    inner: vec![],
                }),
            ))?;
            this.state.writer.flush()
        };
        if let Some(sender) = subscription.sender.as_ref() {
            // The original caller is still waiting for the response.
            sender.send(msg).or_else(|_| send_error(self))
        } else {
            // The response was already delivered; hand over to the fallback handler.
            let sender = self.spawn_engine_call_handler(plugin_call_id)?;
            sender.send(msg).or_else(|_| send_error(self))
        }
    } else {
        Err(ShellError::PluginFailedToDecode {
            msg: format!("Unknown plugin call ID: {plugin_call_id}"),
        })
    }
}
411
412 pub fn is_finished(&self) -> bool {
415 Arc::strong_count(&self.state) < 2
416 }
417
418 pub fn consume_all(
422 &mut self,
423 mut reader: impl PluginRead<PluginOutput>,
424 ) -> Result<(), ShellError> {
425 let mut result = Ok(());
426
427 while let Some(msg) = reader.read().transpose() {
428 if self.is_finished() {
429 break;
430 }
431
432 if let Err(err) = msg.and_then(|msg| self.consume(msg)) {
434 let _ = self.state.error.set(err.clone());
436 let _ = self.stream_manager.broadcast_read_error(err.clone());
438 self.receive_plugin_call_subscriptions();
440 for subscription in std::mem::take(&mut self.plugin_call_states).into_values() {
441 let _ = subscription
442 .sender
443 .as_ref()
444 .map(|s| s.send(ReceivedPluginCallMessage::Error(err.clone())));
445 }
446 result = Err(err);
447 break;
448 }
449 }
450
451 if let Some(ref gc) = self.gc {
453 gc.exited();
454 }
455 result
456 }
457}
458
459impl InterfaceManager for PluginInterfaceManager {
460 type Interface = PluginInterface;
461 type Input = PluginOutput;
462
463 fn get_interface(&self) -> Self::Interface {
464 PluginInterface {
465 state: self.state.clone(),
466 stream_manager_handle: self.stream_manager.get_handle(),
467 gc: self.gc.clone(),
468 }
469 }
470
/// Handle one message received from the plugin.
///
/// # Errors
///
/// Returns an error if the plugin's protocol version is incompatible, if anything other
/// than `Hello` arrives before `Hello`, or if dispatching the message fails.
fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> {
    log::trace!("from plugin: {input:?}");

    match input {
        PluginOutput::Hello(info) => {
            // Publish the plugin's protocol info first, then check it against ours.
            let info = Arc::new(info);
            self.protocol_info_mut.set(info.clone())?;

            let local_info = ProtocolInfo::default();
            if local_info.is_compatible_with(&info)? {
                Ok(())
            } else {
                Err(ShellError::PluginFailedToLoad {
                    msg: format!(
                        "Plugin `{}` is compiled for nushell version {}, \
                        which is not compatible with version {}",
                        self.state.source.name(),
                        info.version,
                        local_info.version,
                    ),
                })
            }
        }
        // Every other message is only valid once `Hello` has been received.
        _ if !self.state.protocol_info.is_set() => {
            Err(ShellError::PluginFailedToLoad {
                msg: format!(
                    "Failed to receive initial Hello message from `{}`. \
                    This plugin might be too old",
                    self.state.source.name()
                ),
            })
        }
        // Stream messages are converted and handled by `consume_stream_message`.
        PluginOutput::Data(..)
        | PluginOutput::End(..)
        | PluginOutput::Drop(..)
        | PluginOutput::Ack(..) => {
            self.consume_stream_message(input.try_into().map_err(|msg| {
                ShellError::NushellFailed {
                    msg: format!("Failed to convert message {msg:?} to StreamMessage"),
                }
            })?)
        }
        PluginOutput::Option(option) => match option {
            PluginOption::GcDisabled(disabled) => {
                if let Some(ref gc) = self.gc {
                    gc.set_disabled(disabled);
                }
                Ok(())
            }
        },
        PluginOutput::CallResponse(id, response) => {
            // Turn the pipeline data header into readable pipeline data, registering the
            // response stream (if there is one) against the call.
            let response = response
                .map_data(|data| {
                    let signals = self.get_signals(id)?;

                    if let Some(stream_id) = data.stream_id() {
                        self.recv_stream_started(id, stream_id);
                    }

                    self.read_pipeline_data(data, &signals)
                })
                .unwrap_or_else(|err| {
                    // A failure to set up the data is reported to the caller as the
                    // response itself.
                    PluginCallResponse::Error(err.into())
                });
            let result = self.send_plugin_call_response(id, response);
            if result.is_ok() {
                // Release the lock that was taken when the call was written.
                if let Some(ref gc) = self.gc {
                    gc.decrement_locks(1);
                }
            }
            result
        }
        PluginOutput::EngineCall { context, id, call } => {
            let call = call
                .map_data(|input| {
                    // `context` is the ID of the plugin call this engine call belongs to.
                    let signals = self.get_signals(context)?;
                    self.read_pipeline_data(input, &signals)
                })
                .and_then(|mut engine_call| {
                    match engine_call {
                        // Custom values passed as closure arguments need their source
                        // attached before the engine can use them.
                        EngineCall::EvalClosure {
                            ref mut positional, ..
                        } => {
                            for arg in positional.iter_mut() {
                                PluginCustomValueWithSource::add_source_in(
                                    arg,
                                    &self.state.source,
                                )?;
                            }
                            Ok(engine_call)
                        }
                        _ => Ok(engine_call),
                    }
                });
            match call {
                Ok(call) => self.send_engine_call(context, id, call),
                // Preparation failed: answer the engine call with the error directly.
                Err(err) => self.get_interface().write_engine_call_response(
                    id,
                    EngineCallResponse::Error(err),
                    &CurrentCallState::default(),
                ),
            }
        }
    }
}
588
589 fn stream_manager(&self) -> &StreamManager {
590 &self.stream_manager
591 }
592
593 fn prepare_pipeline_data(&self, mut data: PipelineData) -> Result<PipelineData, ShellError> {
594 match data {
596 PipelineData::Value(ref mut value, _) => {
597 with_custom_values_in(value, |custom_value| {
598 PluginCustomValueWithSource::add_source(custom_value.item, &self.state.source);
599 Ok::<_, ShellError>(())
600 })?;
601 Ok(data)
602 }
603 PipelineData::ListStream(stream, meta) => {
604 let source = self.state.source.clone();
605 Ok(PipelineData::list_stream(
606 stream.map(move |mut value| {
607 let _ = PluginCustomValueWithSource::add_source_in(&mut value, &source);
608 value
609 }),
610 meta,
611 ))
612 }
613 PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
614 }
615 }
616
617 fn consume_stream_message(&mut self, message: StreamMessage) -> Result<(), ShellError> {
618 if let StreamMessage::End(id) = message {
620 self.recv_stream_ended(id);
621 }
622 self.stream_manager.handle_message(message)
623 }
624}
625
/// A cloneable handle used by the engine to talk to a plugin: it writes plugin calls and
/// other messages, and waits for the responses routed back by the
/// [`PluginInterfaceManager`].
#[derive(Debug, Clone)]
#[doc(hidden)]
pub struct PluginInterface {
    /// State shared with the manager and all other interfaces.
    state: Arc<PluginInterfaceState>,
    /// Handle to the manager's stream manager.
    stream_manager_handle: StreamManagerHandle,
    /// Garbage collector to lock while a plugin call is outstanding, if one is set.
    gc: Option<PluginGc>,
}
639
640impl PluginInterface {
641 pub fn pid(&self) -> Option<u32> {
643 self.state.process.as_ref().map(|p| p.pid())
644 }
645
646 pub fn protocol_info(&self) -> Result<Arc<ProtocolInfo>, ShellError> {
648 self.state.protocol_info.get().and_then(|info| {
649 info.ok_or_else(|| ShellError::PluginFailedToLoad {
650 msg: format!(
651 "Failed to get protocol info (`Hello` message) from the `{}` plugin",
652 self.state.source.identity.name()
653 ),
654 })
655 })
656 }
657
658 pub fn hello(&self) -> Result<(), ShellError> {
660 self.write(PluginInput::Hello(ProtocolInfo::default()))?;
661 self.flush()
662 }
663
664 pub fn goodbye(&self) -> Result<(), ShellError> {
670 self.write(PluginInput::Goodbye)?;
671 self.flush()
672 }
673
674 pub fn signal(&self, action: SignalAction) -> Result<(), ShellError> {
676 self.write(PluginInput::Signal(action))?;
677 self.flush()
678 }
679
680 pub fn write_engine_call_response(
683 &self,
684 id: EngineCallId,
685 response: EngineCallResponse<PipelineData>,
686 state: &CurrentCallState,
687 ) -> Result<(), ShellError> {
688 let mut writer = None;
690 let response = response.map_data(|data| {
691 let (data_header, data_writer) = self.init_write_pipeline_data(data, state)?;
692 writer = Some(data_writer);
693 Ok(data_header)
694 })?;
695
696 self.write(PluginInput::EngineCallResponse(id, response))?;
698 self.flush()?;
699
700 if let Some(writer) = writer {
702 writer.write_background()?;
703 }
704
705 Ok(())
706 }
707
/// Register a plugin call with the manager and write it to the plugin.
///
/// Returns the receiver on which the response (and engine calls) will arrive, the writer
/// for the call's input data — which the caller still has to run — and the call state.
///
/// # Errors
///
/// Fails if custom values in the call don't belong to this plugin, if the manager is gone
/// (the plugin closed), or if writing to the plugin fails.
fn write_plugin_call(
    &self,
    mut call: PluginCall<PipelineData>,
    context: Option<&dyn PluginExecutionContext>,
) -> Result<WritePluginCallResult, ShellError> {
    let id = self.state.plugin_call_id_sequence.next()?;
    // Without a context there is nothing to signal, so use empty signals.
    let signals = context
        .map(|c| c.signals().clone())
        .unwrap_or_else(Signals::empty);
    let (tx, rx) = mpsc::channel();
    let (context_tx, context_rx) = mpsc::channel();
    let keep_plugin_custom_values = mpsc::channel();

    let state = CurrentCallState {
        context_tx: Some(context_tx),
        keep_plugin_custom_values_tx: Some(keep_plugin_custom_values.0.clone()),
        entered_foreground: false,
        span: call.span(),
    };

    // Verify and strip the source from custom values in the call's arguments.
    state.prepare_plugin_call(&mut call, &self.state.source)?;

    // Only `Run` carries pipeline data; it is replaced by its header here and written
    // separately through `writer`.
    let (call, writer) = match call {
        PluginCall::Metadata => (PluginCall::Metadata, Default::default()),
        PluginCall::Signature => (PluginCall::Signature, Default::default()),
        PluginCall::CustomValueOp(value, op) => {
            (PluginCall::CustomValueOp(value, op), Default::default())
        }
        PluginCall::GetCompletion(flag_name) => {
            (PluginCall::GetCompletion(flag_name), Default::default())
        }
        PluginCall::Run(CallInfo { name, call, input }) => {
            let (header, writer) = self.init_write_pipeline_data(input, &state)?;
            (
                PluginCall::Run(CallInfo {
                    name,
                    call,
                    input: header,
                }),
                writer,
            )
        }
    };

    // Nobody waits for the response to a `Dropped` notification.
    let dont_send_response =
        matches!(call, PluginCall::CustomValueOp(_, CustomValueOp::Dropped));

    // Register the call with the manager before the plugin can possibly respond to it.
    self.state
        .plugin_call_subscription_sender
        .send((
            id,
            PluginCallState {
                sender: Some(tx).filter(|_| !dont_send_response),
                dont_send_response,
                signals,
                context_rx: Some(context_rx),
                span: call.span(),
                keep_plugin_custom_values,
                remaining_streams_to_read: 0,
            },
        ))
        .map_err(|_| {
            // The receiving manager is gone; include the error it stopped on, if any.
            let existing_error = self.state.error.get().cloned();
            ShellError::GenericError {
                error: format!("Plugin `{}` closed unexpectedly", self.state.source.name()),
                msg: "can't complete this operation because the plugin is closed".into(),
                span: call.span(),
                help: Some(format!(
                    "the plugin may have experienced an error. Try loading the plugin again \
                    with `{}`",
                    self.state.source.identity.use_command(),
                )),
                inner: existing_error.into_iter().collect(),
            }
        })?;

    // Released by the manager once the call response has been delivered.
    if let Some(ref gc) = self.gc {
        gc.increment_locks(1);
    }

    self.write(PluginInput::Call(id, call))?;
    self.flush()?;

    Ok(WritePluginCallResult {
        receiver: rx,
        writer,
        state,
    })
}
807
/// Wait for the response to a plugin call, serving any engine calls the plugin makes in
/// the meantime.
///
/// # Errors
///
/// Returns the error sent by the manager, an error from handling an engine call, or a
/// generic error if the channel closes before a response arrives.
fn receive_plugin_call_response(
    &self,
    rx: mpsc::Receiver<ReceivedPluginCallMessage>,
    mut context: Option<&mut (dyn PluginExecutionContext + '_)>,
    mut state: CurrentCallState,
) -> Result<PluginCallResponse<PipelineData>, ShellError> {
    for msg in rx {
        match msg {
            ReceivedPluginCallMessage::Response(resp) => {
                // Undo a foreground switch made during the call; failure is only logged.
                if state.entered_foreground {
                    if let Some(context) = context.as_deref_mut()
                        && let Err(err) =
                            set_foreground(self.state.process.as_ref(), context, false)
                    {
                        log::warn!("Failed to leave foreground state on exit: {err:?}");
                    }
                }
                // A streaming response may still trigger engine calls, so pass an owned
                // copy of the context on for the fallback handler.
                if resp.has_stream() {
                    if let Some(context) = context
                        && let Some(ref context_tx) = state.context_tx
                    {
                        let _ = context_tx.send(Context(context.boxed()));
                    }
                }
                return Ok(resp);
            }
            ReceivedPluginCallMessage::Error(err) => {
                return Err(err);
            }
            ReceivedPluginCallMessage::EngineCall(engine_call_id, engine_call) => {
                self.handle_engine_call(
                    engine_call_id,
                    engine_call,
                    &mut state,
                    context.as_deref_mut(),
                )?;
            }
        }
    }
    // The channel closed without a response; attach the stored interface error, if any.
    let existing_error = self.state.error.get().cloned();
    Err(ShellError::GenericError {
        error: format!(
            "Failed to receive response to plugin call from `{}`",
            self.state.source.identity.name()
        ),
        msg: "while waiting for this operation to complete".into(),
        span: state.span,
        help: Some(format!(
            "try restarting the plugin with `{}`",
            self.state.source.identity.use_command()
        )),
        inner: existing_error.into_iter().collect(),
    })
}
868
869 fn handle_engine_call(
871 &self,
872 engine_call_id: EngineCallId,
873 engine_call: EngineCall<PipelineData>,
874 state: &mut CurrentCallState,
875 context: Option<&mut (dyn PluginExecutionContext + '_)>,
876 ) -> Result<(), ShellError> {
877 let process = self.state.process.as_ref();
878 let resp = handle_engine_call(engine_call, state, context, process)
879 .unwrap_or_else(EngineCallResponse::Error);
880 let mut writer = None;
882 let resp = resp
883 .map_data(|data| {
884 let (data_header, data_writer) = self.init_write_pipeline_data(data, state)?;
885 writer = Some(data_writer);
886 Ok(data_header)
887 })
888 .unwrap_or_else(|err| {
889 writer = None;
891 EngineCallResponse::Error(err)
892 });
893 self.write(PluginInput::EngineCallResponse(engine_call_id, resp))?;
895 self.flush()?;
896 if let Some(writer) = writer {
897 writer.write_background()?;
898 }
899 Ok(())
900 }
901
902 fn plugin_call(
905 &self,
906 call: PluginCall<PipelineData>,
907 context: Option<&mut dyn PluginExecutionContext>,
908 ) -> Result<PluginCallResponse<PipelineData>, ShellError> {
909 if let Some(error) = self.state.error.get() {
911 return Err(ShellError::GenericError {
912 error: format!(
913 "Failed to send plugin call to `{}`",
914 self.state.source.identity.name()
915 ),
916 msg: "the plugin encountered an error before this operation could be attempted"
917 .into(),
918 span: call.span(),
919 help: Some(format!(
920 "try loading the plugin again with `{}`",
921 self.state.source.identity.use_command(),
922 )),
923 inner: vec![error.clone()],
924 });
925 }
926
927 let result = self.write_plugin_call(call, context.as_deref())?;
928
929 result.writer.write_background()?;
931
932 self.receive_plugin_call_response(result.receiver, context, result.state)
933 }
934
935 pub fn get_metadata(&self) -> Result<PluginMetadata, ShellError> {
937 match self.plugin_call(PluginCall::Metadata, None)? {
938 PluginCallResponse::Metadata(meta) => Ok(meta),
939 PluginCallResponse::Error(err) => Err(err.into()),
940 _ => Err(ShellError::PluginFailedToDecode {
941 msg: "Received unexpected response to plugin Metadata call".into(),
942 }),
943 }
944 }
945
946 pub fn get_signature(&self) -> Result<Vec<PluginSignature>, ShellError> {
948 match self.plugin_call(PluginCall::Signature, None)? {
949 PluginCallResponse::Signature(sigs) => Ok(sigs),
950 PluginCallResponse::Error(err) => Err(err.into()),
951 _ => Err(ShellError::PluginFailedToDecode {
952 msg: "Received unexpected response to plugin Signature call".into(),
953 }),
954 }
955 }
956
957 pub fn run(
959 &self,
960 call: CallInfo<PipelineData>,
961 context: &mut dyn PluginExecutionContext,
962 ) -> Result<PipelineData, ShellError> {
963 match self.plugin_call(PluginCall::Run(call), Some(context))? {
964 PluginCallResponse::PipelineData(data) => Ok(data),
965 PluginCallResponse::Error(err) => Err(err.into()),
966 _ => Err(ShellError::PluginFailedToDecode {
967 msg: "Received unexpected response to plugin Run call".into(),
968 }),
969 }
970 }
971
972 pub fn get_dynamic_completion(
974 &self,
975 info: GetCompletionInfo,
976 ) -> Result<Option<Vec<DynamicSuggestion>>, ShellError> {
977 match self.plugin_call(PluginCall::GetCompletion(info), None)? {
978 PluginCallResponse::CompletionItems(items) => Ok(items),
979 PluginCallResponse::Error(err) => Err(err.into()),
980 _ => Err(ShellError::PluginFailedToDecode {
981 msg: "Received unexpected response to plugin GetCompletion call".into(),
982 }),
983 }
984 }
985
986 fn custom_value_op_expecting_value(
988 &self,
989 value: Spanned<PluginCustomValueWithSource>,
990 op: CustomValueOp,
991 ) -> Result<Value, ShellError> {
992 let op_name = op.name();
993 let span = value.span;
994
995 value.item.verify_source(span, &self.state.source)?;
997
998 let call = PluginCall::CustomValueOp(value.map(|cv| cv.without_source()), op);
999 match self.plugin_call(call, None)? {
1000 PluginCallResponse::PipelineData(out_data) => out_data.into_value(span),
1001 PluginCallResponse::Error(err) => Err(err.into()),
1002 _ => Err(ShellError::PluginFailedToDecode {
1003 msg: format!("Received unexpected response to custom value {op_name}() call"),
1004 }),
1005 }
1006 }
1007
1008 pub fn custom_value_to_base_value(
1010 &self,
1011 value: Spanned<PluginCustomValueWithSource>,
1012 ) -> Result<Value, ShellError> {
1013 self.custom_value_op_expecting_value(value, CustomValueOp::ToBaseValue)
1014 }
1015
1016 pub fn custom_value_follow_path_int(
1018 &self,
1019 value: Spanned<PluginCustomValueWithSource>,
1020 index: Spanned<usize>,
1021 optional: bool,
1022 ) -> Result<Value, ShellError> {
1023 self.custom_value_op_expecting_value(
1024 value,
1025 CustomValueOp::FollowPathInt { index, optional },
1026 )
1027 }
1028
1029 pub fn custom_value_follow_path_string(
1031 &self,
1032 value: Spanned<PluginCustomValueWithSource>,
1033 column_name: Spanned<String>,
1034 optional: bool,
1035 casing: Casing,
1036 ) -> Result<Value, ShellError> {
1037 self.custom_value_op_expecting_value(
1038 value,
1039 CustomValueOp::FollowPathString {
1040 column_name,
1041 optional,
1042 casing,
1043 },
1044 )
1045 }
1046
1047 pub fn custom_value_partial_cmp(
1049 &self,
1050 value: PluginCustomValueWithSource,
1051 other_value: Value,
1052 ) -> Result<Option<Ordering>, ShellError> {
1053 value.verify_source(Span::unknown(), &self.state.source)?;
1055
1056 let call = PluginCall::CustomValueOp(
1059 value.without_source().into_spanned(Span::unknown()),
1060 CustomValueOp::PartialCmp(other_value),
1061 );
1062 match self.plugin_call(call, None)? {
1063 PluginCallResponse::Ordering(ordering) => Ok(ordering),
1064 PluginCallResponse::Error(err) => Err(err.into()),
1065 _ => Err(ShellError::PluginFailedToDecode {
1066 msg: "Received unexpected response to custom value partial_cmp() call".into(),
1067 }),
1068 }
1069 }
1070
1071 pub fn custom_value_operation(
1073 &self,
1074 left: Spanned<PluginCustomValueWithSource>,
1075 operator: Spanned<Operator>,
1076 right: Value,
1077 ) -> Result<Value, ShellError> {
1078 self.custom_value_op_expecting_value(left, CustomValueOp::Operation(operator, right))
1079 }
1080
1081 pub fn custom_value_save(
1083 &self,
1084 value: Spanned<PluginCustomValueWithSource>,
1085 path: Spanned<&Path>,
1086 save_call_span: Span,
1087 ) -> Result<(), ShellError> {
1088 value.item.verify_source(value.span, &self.state.source)?;
1090
1091 let call = PluginCall::CustomValueOp(
1092 value.map(|cv| cv.without_source()),
1093 CustomValueOp::Save {
1094 path: path.map(ToOwned::to_owned),
1095 save_call_span,
1096 },
1097 );
1098 match self.plugin_call(call, None)? {
1099 PluginCallResponse::Ok => Ok(()),
1100 PluginCallResponse::Error(err) => Err(err.into()),
1101 _ => Err(ShellError::PluginFailedToDecode {
1102 msg: "Received unexpected response to custom value save() call".into(),
1103 }),
1104 }
1105 }
1106
1107 pub fn custom_value_dropped(&self, value: PluginCustomValue) -> Result<(), ShellError> {
1109 drop(self.write_plugin_call(
1114 PluginCall::CustomValueOp(value.into_spanned(Span::unknown()), CustomValueOp::Dropped),
1115 None,
1116 )?);
1117 Ok(())
1118 }
1119}
1120
1121impl Interface for PluginInterface {
1122 type Output = PluginInput;
1123 type DataContext = CurrentCallState;
1124
1125 fn write(&self, input: PluginInput) -> Result<(), ShellError> {
1126 log::trace!("to plugin: {input:?}");
1127 self.state.writer.write(&input).map_err(|err| {
1128 log::warn!("write() error: {err}");
1129 self.state.error.get().cloned().unwrap_or(err)
1132 })
1133 }
1134
1135 fn flush(&self) -> Result<(), ShellError> {
1136 self.state.writer.flush().map_err(|err| {
1137 log::warn!("flush() error: {err}");
1138 self.state.error.get().cloned().unwrap_or(err)
1141 })
1142 }
1143
1144 fn stream_id_sequence(&self) -> &Sequence {
1145 &self.state.stream_id_sequence
1146 }
1147
1148 fn stream_manager_handle(&self) -> &StreamManagerHandle {
1149 &self.stream_manager_handle
1150 }
1151
1152 fn prepare_pipeline_data(
1153 &self,
1154 data: PipelineData,
1155 state: &CurrentCallState,
1156 ) -> Result<PipelineData, ShellError> {
1157 match data {
1159 PipelineData::Value(mut value, meta) => {
1160 state.prepare_value(&mut value, &self.state.source)?;
1161 Ok(PipelineData::value(value, meta))
1162 }
1163 PipelineData::ListStream(stream, meta) => {
1164 let source = self.state.source.clone();
1165 let state = state.clone();
1166 Ok(PipelineData::list_stream(
1167 stream.map(move |mut value| {
1168 match state.prepare_value(&mut value, &source) {
1169 Ok(()) => value,
1170 Err(err) => Value::error(err, value.span()),
1172 }
1173 }),
1174 meta,
1175 ))
1176 }
1177 PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
1178 }
1179 }
1180}
1181
1182impl Drop for PluginInterface {
1183 fn drop(&mut self) {
1184 if Arc::strong_count(&self.state) < 3
1190 && let Err(err) = self.goodbye()
1191 {
1192 log::warn!("Error during plugin Goodbye: {err}");
1193 }
1194 }
1195}
1196
/// What `write_plugin_call` hands back after a plugin call has been written.
#[must_use]
struct WritePluginCallResult {
    /// Receives the response to the call, as well as errors and engine calls.
    receiver: mpsc::Receiver<ReceivedPluginCallMessage>,
    /// Writer for the call's input pipeline data; it still has to be run.
    writer: PipelineDataWriter<PluginInterface>,
    /// State of the call, needed while waiting for the response.
    state: CurrentCallState,
}
1207
/// State of the plugin call that is currently being made or served, passed along while
/// data is written to the plugin.
#[derive(Default, Clone)]
pub struct CurrentCallState {
    /// Used to hand the execution context over once a streaming response has been
    /// returned to the caller.
    context_tx: Option<mpsc::Sender<Context>>,
    /// Used to keep clones of custom values that were sent to the plugin until the call
    /// state held by the manager is dropped.
    keep_plugin_custom_values_tx: Option<mpsc::Sender<PluginCustomValueWithSource>>,
    /// Whether the foreground state was entered during this call, so that it is left again
    /// when the response arrives.
    entered_foreground: bool,
    /// Span of the call, if it has one.
    span: Option<Span>,
}
1223
1224impl CurrentCallState {
1225 fn prepare_custom_value(
1228 &self,
1229 custom_value: Spanned<&mut Box<dyn CustomValue>>,
1230 source: &PluginSource,
1231 ) -> Result<(), ShellError> {
1232 PluginCustomValueWithSource::verify_source_of_custom_value(
1234 custom_value.as_deref().map(|cv| &**cv),
1235 source,
1236 )?;
1237
1238 if let Some(keep_tx) = &self.keep_plugin_custom_values_tx
1240 && let Some(custom_value) = custom_value
1241 .item
1242 .as_any()
1243 .downcast_ref::<PluginCustomValueWithSource>()
1244 && custom_value.notify_on_drop()
1245 {
1246 log::trace!("Keeping custom value for drop later: {custom_value:?}");
1247 keep_tx
1248 .send(custom_value.clone())
1249 .map_err(|_| ShellError::NushellFailed {
1250 msg: "Failed to custom value to keep channel".into(),
1251 })?;
1252 }
1253
1254 PluginCustomValueWithSource::remove_source(&mut *custom_value.item);
1256
1257 Ok(())
1258 }
1259
1260 fn prepare_value(&self, value: &mut Value, source: &PluginSource) -> Result<(), ShellError> {
1262 with_custom_values_in(value, |custom_value| {
1263 self.prepare_custom_value(custom_value, source)
1264 })
1265 }
1266
1267 fn prepare_call_args(
1269 &self,
1270 call: &mut EvaluatedCall,
1271 source: &PluginSource,
1272 ) -> Result<(), ShellError> {
1273 for arg in call.positional.iter_mut() {
1274 self.prepare_value(arg, source)?;
1275 }
1276 for arg in call.named.iter_mut().flat_map(|(_, arg)| arg.as_mut()) {
1277 self.prepare_value(arg, source)?;
1278 }
1279 Ok(())
1280 }
1281
1282 fn prepare_plugin_call<D>(
1285 &self,
1286 call: &mut PluginCall<D>,
1287 source: &PluginSource,
1288 ) -> Result<(), ShellError> {
1289 match call {
1290 PluginCall::Metadata => Ok(()),
1291 PluginCall::Signature => Ok(()),
1292 PluginCall::GetCompletion(_) => Ok(()),
1293 PluginCall::Run(CallInfo { call, .. }) => self.prepare_call_args(call, source),
1294 PluginCall::CustomValueOp(_, op) => {
1295 match op {
1297 CustomValueOp::ToBaseValue => Ok(()),
1298 CustomValueOp::FollowPathInt { .. } => Ok(()),
1299 CustomValueOp::FollowPathString { .. } => Ok(()),
1300 CustomValueOp::PartialCmp(value) => self.prepare_value(value, source),
1301 CustomValueOp::Operation(_, value) => self.prepare_value(value, source),
1302 CustomValueOp::Save { .. } => Ok(()),
1303 CustomValueOp::Dropped => Ok(()),
1304 }
1305 }
1306 }
1307 }
1308}
1309
1310pub(crate) fn handle_engine_call(
1312 call: EngineCall<PipelineData>,
1313 state: &mut CurrentCallState,
1314 context: Option<&mut (dyn PluginExecutionContext + '_)>,
1315 process: Option<&PluginProcess>,
1316) -> Result<EngineCallResponse<PipelineData>, ShellError> {
1317 let call_name = call.name();
1318
1319 let context = context.ok_or_else(|| ShellError::GenericError {
1320 error: "A plugin execution context is required for this engine call".into(),
1321 msg: format!("attempted to call {call_name} outside of a command invocation"),
1322 span: None,
1323 help: Some("this is probably a bug with the plugin".into()),
1324 inner: vec![],
1325 })?;
1326
1327 match call {
1328 EngineCall::GetConfig => {
1329 let config = SharedCow::from(context.get_config()?);
1330 Ok(EngineCallResponse::Config(config))
1331 }
1332 EngineCall::GetPluginConfig => {
1333 let plugin_config = context.get_plugin_config()?;
1334 Ok(plugin_config.map_or_else(EngineCallResponse::empty, EngineCallResponse::value))
1335 }
1336 EngineCall::GetEnvVar(name) => {
1337 let value = context.get_env_var(&name)?;
1338 Ok(value
1339 .cloned()
1340 .map_or_else(EngineCallResponse::empty, EngineCallResponse::value))
1341 }
1342 EngineCall::GetEnvVars => context.get_env_vars().map(EngineCallResponse::ValueMap),
1343 EngineCall::GetCurrentDir => {
1344 let current_dir = context.get_current_dir()?;
1345 Ok(EngineCallResponse::value(Value::string(
1346 current_dir.item,
1347 current_dir.span,
1348 )))
1349 }
1350 EngineCall::AddEnvVar(name, value) => {
1351 context.add_env_var(name, value)?;
1352 Ok(EngineCallResponse::empty())
1353 }
1354 EngineCall::GetHelp => {
1355 let help = context.get_help()?;
1356 Ok(EngineCallResponse::value(Value::string(
1357 help.item, help.span,
1358 )))
1359 }
1360 EngineCall::EnterForeground => {
1361 let resp = set_foreground(process, context, true)?;
1362 state.entered_foreground = true;
1363 Ok(resp)
1364 }
1365 EngineCall::LeaveForeground => {
1366 let resp = set_foreground(process, context, false)?;
1367 state.entered_foreground = false;
1368 Ok(resp)
1369 }
1370 EngineCall::GetSpanContents(span) => {
1371 let contents = context.get_span_contents(span)?;
1372 Ok(EngineCallResponse::value(Value::binary(
1373 contents.item,
1374 contents.span,
1375 )))
1376 }
1377 EngineCall::EvalClosure {
1378 closure,
1379 positional,
1380 input,
1381 redirect_stdout,
1382 redirect_stderr,
1383 } => context
1384 .eval_closure(closure, positional, input, redirect_stdout, redirect_stderr)
1385 .map(EngineCallResponse::PipelineData),
1386 EngineCall::FindDecl(name) => context.find_decl(&name).map(|decl_id| {
1387 if let Some(decl_id) = decl_id {
1388 EngineCallResponse::Identifier(decl_id)
1389 } else {
1390 EngineCallResponse::empty()
1391 }
1392 }),
1393 EngineCall::CallDecl {
1394 decl_id,
1395 call,
1396 input,
1397 redirect_stdout,
1398 redirect_stderr,
1399 } => context
1400 .call_decl(decl_id, call, input, redirect_stdout, redirect_stderr)
1401 .map(EngineCallResponse::PipelineData),
1402 }
1403}
1404
1405fn set_foreground(
1407 process: Option<&PluginProcess>,
1408 context: &mut dyn PluginExecutionContext,
1409 enter: bool,
1410) -> Result<EngineCallResponse<PipelineData>, ShellError> {
1411 if let Some(process) = process {
1412 if let Some(pipeline_externals_state) = context.pipeline_externals_state() {
1413 if enter {
1414 let pgrp = process.enter_foreground(context.span(), pipeline_externals_state)?;
1415 Ok(pgrp.map_or_else(EngineCallResponse::empty, |id| {
1416 EngineCallResponse::value(Value::int(id as i64, context.span()))
1417 }))
1418 } else {
1419 process.exit_foreground()?;
1420 Ok(EngineCallResponse::empty())
1421 }
1422 } else {
1423 Err(ShellError::NushellFailed {
1425 msg: "missing required pipeline_externals_state from context \
1426 for entering foreground"
1427 .into(),
1428 })
1429 }
1430 } else {
1431 Err(ShellError::GenericError {
1432 error: "Can't manage plugin process to enter foreground".into(),
1433 msg: "the process ID for this plugin is unknown".into(),
1434 span: Some(context.span()),
1435 help: Some("the plugin may be running in a test".into()),
1436 inner: vec![],
1437 })
1438 }
1439}