1use nu_plugin_core::{
4 Interface, InterfaceManager, PipelineDataWriter, PluginRead, PluginWrite, StreamManager,
5 StreamManagerHandle,
6 util::{Waitable, WaitableMut},
7};
8use nu_plugin_protocol::{
9 CallInfo, CustomValueOp, EngineCall, EngineCallId, EngineCallResponse, EvaluatedCall,
10 GetCompletionInfo, Ordering, PluginCall, PluginCallId, PluginCallResponse, PluginCustomValue,
11 PluginInput, PluginOption, PluginOutput, ProtocolInfo,
12};
13#[cfg(unix)]
14use nu_protocol::shell_error::generic::GenericError;
15use nu_protocol::{
16 BlockId, Config, DeclId, DynamicSuggestion, Handler, HandlerGuard, Handlers, PipelineData,
17 PluginMetadata, PluginSignature, ShellError, SignalAction, Signals, Span, Spanned, Value,
18 engine::{Closure, Sequence},
19 ir::IrBlock,
20};
21use nu_utils::SharedCow;
22use std::{
23 collections::{BTreeMap, HashMap, btree_map},
24 sync::{Arc, atomic::AtomicBool, mpsc},
25};
26
27#[derive(Debug)]
35#[doc(hidden)]
36pub enum ReceivedPluginCall {
37 Metadata {
38 engine: EngineInterface,
39 },
40 Signature {
41 engine: EngineInterface,
42 },
43 Run {
44 engine: EngineInterface,
45 call: CallInfo<PipelineData>,
46 },
47 GetCompletion {
48 engine: EngineInterface,
49 info: GetCompletionInfo,
50 },
51 CustomValueOp {
52 engine: EngineInterface,
53 custom_value: Spanned<PluginCustomValue>,
54 op: CustomValueOp,
55 },
56}
57
58#[cfg(test)]
59mod tests;
60
61struct EngineInterfaceState {
63 protocol_info: Waitable<Arc<ProtocolInfo>>,
65 engine_call_id_sequence: Sequence,
67 stream_id_sequence: Sequence,
69 engine_call_subscription_sender:
71 mpsc::Sender<(EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>)>,
72 writer: Box<dyn PluginWrite<PluginOutput>>,
74 signals: Signals,
77 signal_handlers: Handlers,
79}
80
81impl std::fmt::Debug for EngineInterfaceState {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 f.debug_struct("EngineInterfaceState")
84 .field("protocol_info", &self.protocol_info)
85 .field("engine_call_id_sequence", &self.engine_call_id_sequence)
86 .field("stream_id_sequence", &self.stream_id_sequence)
87 .field(
88 "engine_call_subscription_sender",
89 &self.engine_call_subscription_sender,
90 )
91 .finish_non_exhaustive()
92 }
93}
94
95#[derive(Debug)]
99#[doc(hidden)]
100pub struct EngineInterfaceManager {
101 state: Arc<EngineInterfaceState>,
103 protocol_info_mut: WaitableMut<Arc<ProtocolInfo>>,
105 plugin_call_sender: Option<mpsc::Sender<ReceivedPluginCall>>,
107 plugin_call_receiver: Option<mpsc::Receiver<ReceivedPluginCall>>,
109 engine_call_subscriptions:
111 BTreeMap<EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>>,
112 engine_call_subscription_receiver:
114 mpsc::Receiver<(EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>)>,
115 stream_manager: StreamManager,
117}
118
119impl EngineInterfaceManager {
120 pub(crate) fn new(writer: impl PluginWrite<PluginOutput> + 'static) -> EngineInterfaceManager {
121 let (plug_tx, plug_rx) = mpsc::channel();
122 let (subscription_tx, subscription_rx) = mpsc::channel();
123 let protocol_info_mut = WaitableMut::new();
124
125 EngineInterfaceManager {
126 state: Arc::new(EngineInterfaceState {
127 protocol_info: protocol_info_mut.reader(),
128 engine_call_id_sequence: Sequence::default(),
129 stream_id_sequence: Sequence::default(),
130 engine_call_subscription_sender: subscription_tx,
131 writer: Box::new(writer),
132 signals: Signals::new(Arc::new(AtomicBool::new(false))),
133 signal_handlers: Handlers::new(),
134 }),
135 protocol_info_mut,
136 plugin_call_sender: Some(plug_tx),
137 plugin_call_receiver: Some(plug_rx),
138 engine_call_subscriptions: BTreeMap::new(),
139 engine_call_subscription_receiver: subscription_rx,
140 stream_manager: StreamManager::new(),
141 }
142 }
143
144 pub(crate) fn take_plugin_call_receiver(
147 &mut self,
148 ) -> Option<mpsc::Receiver<ReceivedPluginCall>> {
149 self.plugin_call_receiver.take()
150 }
151
152 fn interface_for_context(&self, context: PluginCallId) -> EngineInterface {
154 EngineInterface {
155 state: self.state.clone(),
156 stream_manager_handle: self.stream_manager.get_handle(),
157 context: Some(context),
158 }
159 }
160
161 fn send_plugin_call(&self, plugin_call: ReceivedPluginCall) -> Result<(), ShellError> {
163 self.plugin_call_sender
164 .as_ref()
165 .ok_or_else(|| ShellError::PluginFailedToDecode {
166 msg: "Received a plugin call after Goodbye".into(),
167 })?
168 .send(plugin_call)
169 .map_err(|_| ShellError::NushellFailed {
170 msg: "Received a plugin call, but there's nowhere to send it".into(),
171 })
172 }
173
174 fn receive_engine_call_subscriptions(&mut self) {
176 for (id, subscription) in self.engine_call_subscription_receiver.try_iter() {
177 if let btree_map::Entry::Vacant(e) = self.engine_call_subscriptions.entry(id) {
178 e.insert(subscription);
179 } else {
180 log::warn!("Duplicate engine call ID ignored: {id}")
181 }
182 }
183 }
184
185 fn send_engine_call_response(
187 &mut self,
188 id: EngineCallId,
189 response: EngineCallResponse<PipelineData>,
190 ) -> Result<(), ShellError> {
191 self.receive_engine_call_subscriptions();
193 if let Some(sender) = self.engine_call_subscriptions.remove(&id) {
195 if sender.send(response).is_err() {
196 log::warn!("Received an engine call response for id={id}, but the caller hung up");
197 }
198 Ok(())
199 } else {
200 Err(ShellError::PluginFailedToDecode {
201 msg: format!("Unknown engine call ID: {id}"),
202 })
203 }
204 }
205
206 pub(crate) fn is_finished(&self) -> bool {
209 Arc::strong_count(&self.state) < 2
210 }
211
212 pub(crate) fn consume_all(
216 &mut self,
217 mut reader: impl PluginRead<PluginInput>,
218 ) -> Result<(), ShellError> {
219 while let Some(msg) = reader.read().transpose() {
220 if self.is_finished() {
221 break;
222 }
223
224 if let Err(err) = msg.and_then(|msg| self.consume(msg)) {
225 let _ = self.stream_manager.broadcast_read_error(err.clone());
227 self.receive_engine_call_subscriptions();
229 for sender in std::mem::take(&mut self.engine_call_subscriptions).into_values() {
230 let _ = sender.send(EngineCallResponse::Error(err.clone()));
231 }
232 return Err(err);
233 }
234 }
235 Ok(())
236 }
237}
238
239impl InterfaceManager for EngineInterfaceManager {
240 type Interface = EngineInterface;
241 type Input = PluginInput;
242
243 fn get_interface(&self) -> Self::Interface {
244 EngineInterface {
245 state: self.state.clone(),
246 stream_manager_handle: self.stream_manager.get_handle(),
247 context: None,
248 }
249 }
250
251 fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> {
252 log::trace!("from engine: {input:?}");
253 match input {
254 PluginInput::Hello(info) => {
255 let info = Arc::new(info);
256 self.protocol_info_mut.set(info.clone())?;
257
258 let local_info = ProtocolInfo::default();
259 if local_info.is_compatible_with(&info)? {
260 Ok(())
261 } else {
262 Err(ShellError::PluginFailedToLoad {
263 msg: format!(
264 "Plugin is compiled for nushell version {}, \
265 which is not compatible with version {}",
266 local_info.version, info.version
267 ),
268 })
269 }
270 }
271 _ if !self.state.protocol_info.is_set() => {
272 Err(ShellError::PluginFailedToLoad {
274 msg: "Failed to receive initial Hello message. This engine might be too old"
275 .into(),
276 })
277 }
278 PluginInput::Data(..)
280 | PluginInput::End(..)
281 | PluginInput::Drop(..)
282 | PluginInput::Ack(..) => {
283 self.consume_stream_message(input.try_into().map_err(|msg| {
284 ShellError::NushellFailed {
285 msg: format!("Failed to convert message {msg:?} to StreamMessage"),
286 }
287 })?)
288 }
289 PluginInput::Call(id, call) => {
290 let interface = self.interface_for_context(id);
291 let call = match call
293 .map_data(|input| self.read_pipeline_data(input, &Signals::empty()))
294 {
295 Ok(call) => call,
296 Err(err) => {
297 return interface.write_response(Err(err))?.write();
300 }
301 };
302 match call {
303 PluginCall::Metadata => {
305 self.send_plugin_call(ReceivedPluginCall::Metadata { engine: interface })
306 }
307 PluginCall::Signature => {
309 self.send_plugin_call(ReceivedPluginCall::Signature { engine: interface })
310 }
311 PluginCall::Run(mut call_info) => {
313 if let Err(err) = deserialize_call_args(&mut call_info.call) {
315 return interface.write_response(Err(err))?.write();
316 }
317 self.send_plugin_call(ReceivedPluginCall::Run {
319 engine: interface,
320 call: call_info,
321 })
322 }
323 PluginCall::CustomValueOp(custom_value, op) => {
325 self.send_plugin_call(ReceivedPluginCall::CustomValueOp {
326 engine: interface,
327 custom_value,
328 op,
329 })
330 }
331 PluginCall::GetCompletion(info) => {
332 self.send_plugin_call(ReceivedPluginCall::GetCompletion {
333 engine: interface,
334 info,
335 })
336 }
337 }
338 }
339 PluginInput::Goodbye => {
340 drop(self.plugin_call_sender.take());
342 Ok(())
343 }
344 PluginInput::EngineCallResponse(id, response) => {
345 let response = response
346 .map_data(|header| self.read_pipeline_data(header, &Signals::empty()))
347 .unwrap_or_else(|err| {
348 EngineCallResponse::Error(err)
351 });
352 self.send_engine_call_response(id, response)
353 }
354 PluginInput::Signal(action) => {
355 match action {
356 SignalAction::Interrupt => self.state.signals.trigger(),
357 SignalAction::Reset => self.state.signals.reset(),
358 }
359 self.state.signal_handlers.run(action);
360 Ok(())
361 }
362 }
363 }
364
365 fn stream_manager(&self) -> &StreamManager {
366 &self.stream_manager
367 }
368
369 fn prepare_pipeline_data(&self, mut data: PipelineData) -> Result<PipelineData, ShellError> {
370 match data {
372 PipelineData::Value(ref mut value, _) => {
373 PluginCustomValue::deserialize_custom_values_in(value)?;
374 Ok(data)
375 }
376 PipelineData::ListStream(stream, meta) => {
377 let stream = stream.map(|mut value| {
378 let span = value.span();
379 PluginCustomValue::deserialize_custom_values_in(&mut value)
380 .map(|()| value)
381 .unwrap_or_else(|err| Value::error(err, span))
382 });
383 Ok(PipelineData::list_stream(stream, meta))
384 }
385 PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
386 }
387 }
388}
389
390fn deserialize_call_args(call: &mut crate::EvaluatedCall) -> Result<(), ShellError> {
392 call.positional
393 .iter_mut()
394 .try_for_each(PluginCustomValue::deserialize_custom_values_in)?;
395 call.named
396 .iter_mut()
397 .flat_map(|(_, value)| value.as_mut())
398 .try_for_each(PluginCustomValue::deserialize_custom_values_in)
399}
400
401#[derive(Debug, Clone)]
403pub struct EngineInterface {
404 state: Arc<EngineInterfaceState>,
406 stream_manager_handle: StreamManagerHandle,
408 context: Option<PluginCallId>,
410}
411
412impl EngineInterface {
413 pub(crate) fn hello(&self) -> Result<(), ShellError> {
415 self.write(PluginOutput::Hello(ProtocolInfo::default()))?;
416 self.flush()
417 }
418
419 fn context(&self) -> Result<PluginCallId, ShellError> {
420 self.context.ok_or_else(|| ShellError::NushellFailed {
421 msg: "Tried to call an EngineInterface method that requires a call context \
422 outside of one"
423 .into(),
424 })
425 }
426
427 pub(crate) fn write_ok(
429 &self,
430 result: Result<(), impl Into<ShellError>>,
431 ) -> Result<(), ShellError> {
432 let response = match result {
433 Ok(()) => PluginCallResponse::Ok,
434 Err(err) => PluginCallResponse::Error(err.into()),
435 };
436 self.write(PluginOutput::CallResponse(self.context()?, response))?;
437 self.flush()
438 }
439
440 pub(crate) fn write_response(
443 &self,
444 result: Result<PipelineData, impl Into<ShellError>>,
445 ) -> Result<PipelineDataWriter<Self>, ShellError> {
446 match result {
447 Ok(data) => {
448 let (header, writer) = match self.init_write_pipeline_data(data, &()) {
449 Ok(tup) => tup,
450 Err(err) => return self.write_response(Err(err)),
453 };
454 let response = PluginCallResponse::PipelineData(header);
456 self.write(PluginOutput::CallResponse(self.context()?, response))?;
457 self.flush()?;
458 Ok(writer)
459 }
460 Err(err) => {
461 let response = PluginCallResponse::Error(err.into());
462 self.write(PluginOutput::CallResponse(self.context()?, response))?;
463 self.flush()?;
464 Ok(Default::default())
465 }
466 }
467 }
468
469 pub(crate) fn write_metadata(&self, metadata: PluginMetadata) -> Result<(), ShellError> {
471 let response = PluginCallResponse::Metadata(metadata);
472 self.write(PluginOutput::CallResponse(self.context()?, response))?;
473 self.flush()
474 }
475
476 pub(crate) fn write_signature(
480 &self,
481 signature: Vec<PluginSignature>,
482 ) -> Result<(), ShellError> {
483 let response = PluginCallResponse::Signature(signature);
484 self.write(PluginOutput::CallResponse(self.context()?, response))?;
485 self.flush()
486 }
487
488 pub(crate) fn write_completion_items(
490 &self,
491 items: Option<Vec<DynamicSuggestion>>,
492 ) -> Result<(), ShellError> {
493 let response = PluginCallResponse::CompletionItems(items);
494 self.write(PluginOutput::CallResponse(self.context()?, response))?;
495 self.flush()
496 }
497
498 fn write_engine_call(
501 &self,
502 call: EngineCall<PipelineData>,
503 ) -> Result<
504 (
505 PipelineDataWriter<Self>,
506 mpsc::Receiver<EngineCallResponse<PipelineData>>,
507 ),
508 ShellError,
509 > {
510 let context = self.context()?;
511 let id = self.state.engine_call_id_sequence.next()?;
512 let (tx, rx) = mpsc::channel();
513
514 let mut writer = None;
516
517 let call = call.map_data(|input| {
518 let (input_header, input_writer) = self.init_write_pipeline_data(input, &())?;
519 writer = Some(input_writer);
520 Ok(input_header)
521 })?;
522
523 self.state
525 .engine_call_subscription_sender
526 .send((id, tx))
527 .map_err(|_| ShellError::NushellFailed {
528 msg: "EngineInterfaceManager hung up and is no longer accepting engine calls"
529 .into(),
530 })?;
531
532 self.write(PluginOutput::EngineCall { context, id, call })?;
534 self.flush()?;
535
536 Ok((writer.unwrap_or_default(), rx))
537 }
538
539 fn engine_call(
541 &self,
542 call: EngineCall<PipelineData>,
543 ) -> Result<EngineCallResponse<PipelineData>, ShellError> {
544 let (writer, rx) = self.write_engine_call(call)?;
545
546 writer.write_background()?;
548
549 rx.recv().map_err(|_| ShellError::NushellFailed {
551 msg: "Failed to get response to engine call because the channel was closed".into(),
552 })
553 }
554
555 pub fn is_using_stdio(&self) -> bool {
561 self.state.writer.is_stdout()
562 }
563
564 pub fn register_signal_handler(&self, handler: Handler) -> Result<HandlerGuard, ShellError> {
567 self.state.signal_handlers.register(handler)
568 }
569
570 pub fn get_config(&self) -> Result<Arc<Config>, ShellError> {
587 match self.engine_call(EngineCall::GetConfig)? {
588 EngineCallResponse::Config(config) => Ok(SharedCow::into_arc(config)),
589 EngineCallResponse::Error(err) => Err(err),
590 _ => Err(ShellError::PluginFailedToDecode {
591 msg: "Received unexpected response for EngineCall::GetConfig".into(),
592 }),
593 }
594 }
595
596 fn engine_call_option_value(
599 &self,
600 engine_call: EngineCall<PipelineData>,
601 ) -> Result<Option<Value>, ShellError> {
602 let name = engine_call.name();
603 match self.engine_call(engine_call)? {
604 EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(None),
605 EngineCallResponse::PipelineData(PipelineData::Value(value, _)) => Ok(Some(value)),
606 EngineCallResponse::Error(err) => Err(err),
607 _ => Err(ShellError::PluginFailedToDecode {
608 msg: format!("Received unexpected response for EngineCall::{name}"),
609 }),
610 }
611 }
612
613 pub fn get_plugin_config(&self) -> Result<Option<Value>, ShellError> {
631 self.engine_call_option_value(EngineCall::GetPluginConfig)
632 }
633
634 pub fn get_env_var(&self, name: impl Into<String>) -> Result<Option<Value>, ShellError> {
650 self.engine_call_option_value(EngineCall::GetEnvVar(name.into()))
651 }
652
653 pub fn get_current_dir(&self) -> Result<String, ShellError> {
664 match self.engine_call(EngineCall::GetCurrentDir)? {
665 EngineCallResponse::PipelineData(PipelineData::Value(Value::String { val, .. }, _)) => {
667 Ok(val)
668 }
669 EngineCallResponse::Error(err) => Err(err),
670 _ => Err(ShellError::PluginFailedToDecode {
671 msg: "Received unexpected response for EngineCall::GetCurrentDir".into(),
672 }),
673 }
674 }
675
676 pub fn get_env_vars(&self) -> Result<HashMap<String, Value>, ShellError> {
692 match self.engine_call(EngineCall::GetEnvVars)? {
693 EngineCallResponse::ValueMap(map) => Ok(map),
694 EngineCallResponse::Error(err) => Err(err),
695 _ => Err(ShellError::PluginFailedToDecode {
696 msg: "Received unexpected response type for EngineCall::GetEnvVars".into(),
697 }),
698 }
699 }
700
701 pub fn add_env_var(&self, name: impl Into<String>, value: Value) -> Result<(), ShellError> {
716 match self.engine_call(EngineCall::AddEnvVar(name.into(), value))? {
717 EngineCallResponse::PipelineData(_) => Ok(()),
718 EngineCallResponse::Error(err) => Err(err),
719 _ => Err(ShellError::PluginFailedToDecode {
720 msg: "Received unexpected response type for EngineCall::AddEnvVar".into(),
721 }),
722 }
723 }
724
725 pub fn get_help(&self) -> Result<String, ShellError> {
740 match self.engine_call(EngineCall::GetHelp)? {
741 EngineCallResponse::PipelineData(PipelineData::Value(Value::String { val, .. }, _)) => {
742 Ok(val)
743 }
744 _ => Err(ShellError::PluginFailedToDecode {
745 msg: "Received unexpected response type for EngineCall::GetHelp".into(),
746 }),
747 }
748 }
749
750 pub fn enter_foreground(&self) -> Result<ForegroundGuard, ShellError> {
758 match self.engine_call(EngineCall::EnterForeground)? {
759 EngineCallResponse::Error(error) => Err(error),
760 EngineCallResponse::PipelineData(PipelineData::Value(
761 Value::Int { val: pgrp, .. },
762 _,
763 )) => {
764 set_pgrp_from_enter_foreground(pgrp)?;
765 Ok(ForegroundGuard(Some(self.clone())))
766 }
767 EngineCallResponse::PipelineData(PipelineData::Empty) => {
768 Ok(ForegroundGuard(Some(self.clone())))
769 }
770 _ => Err(ShellError::PluginFailedToDecode {
771 msg: "Received unexpected response type for EngineCall::SetForeground".into(),
772 }),
773 }
774 }
775
776 fn leave_foreground(&self) -> Result<(), ShellError> {
778 match self.engine_call(EngineCall::LeaveForeground)? {
779 EngineCallResponse::Error(error) => Err(error),
780 EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(()),
781 _ => Err(ShellError::PluginFailedToDecode {
782 msg: "Received unexpected response type for EngineCall::LeaveForeground".into(),
783 }),
784 }
785 }
786
787 pub fn get_span_contents(&self, span: Span) -> Result<Vec<u8>, ShellError> {
793 match self.engine_call(EngineCall::GetSpanContents(span))? {
794 EngineCallResponse::PipelineData(PipelineData::Value(Value::Binary { val, .. }, _)) => {
795 Ok(val)
796 }
797 _ => Err(ShellError::PluginFailedToDecode {
798 msg: "Received unexpected response type for EngineCall::GetSpanContents".into(),
799 }),
800 }
801 }
802
803 pub fn eval_closure_with_stream(
849 &self,
850 closure: &Spanned<Closure>,
851 mut positional: Vec<Value>,
852 input: PipelineData,
853 redirect_stdout: bool,
854 redirect_stderr: bool,
855 ) -> Result<PipelineData, ShellError> {
856 positional
858 .iter_mut()
859 .try_for_each(PluginCustomValue::serialize_custom_values_in)?;
860
861 let call = EngineCall::EvalClosure {
862 closure: closure.clone(),
863 positional,
864 input,
865 redirect_stdout,
866 redirect_stderr,
867 };
868
869 match self.engine_call(call)? {
870 EngineCallResponse::Error(error) => Err(error),
871 EngineCallResponse::PipelineData(data) => Ok(data),
872 _ => Err(ShellError::PluginFailedToDecode {
873 msg: "Received unexpected response type for EngineCall::EvalClosure".into(),
874 }),
875 }
876 }
877
878 pub fn eval_closure(
919 &self,
920 closure: &Spanned<Closure>,
921 positional: Vec<Value>,
922 input: Option<Value>,
923 ) -> Result<Value, ShellError> {
924 let input = input.map_or_else(PipelineData::empty, |v| PipelineData::value(v, None));
925 let output = self.eval_closure_with_stream(closure, positional, input, true, false)?;
926 match output.into_value(closure.span)? {
928 Value::Error { error, .. } => Err(*error),
929 value => Ok(value),
930 }
931 }
932
933 pub fn find_decl(&self, name: impl Into<String>) -> Result<Option<DeclId>, ShellError> {
938 let call = EngineCall::FindDecl(name.into());
939
940 match self.engine_call(call)? {
941 EngineCallResponse::Error(err) => Err(err),
942 EngineCallResponse::Identifier(id) => Ok(Some(id)),
943 EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(None),
944 _ => Err(ShellError::PluginFailedToDecode {
945 msg: "Received unexpected response type for EngineCall::FindDecl".into(),
946 }),
947 }
948 }
949
950 pub fn get_block_ir(&self, block_id: BlockId) -> Result<IrBlock, ShellError> {
967 let call = EngineCall::GetBlockIR(block_id);
968
969 match self.engine_call(call)? {
970 EngineCallResponse::Error(err) => Err(err),
971 EngineCallResponse::IrBlock(ir) => Ok(*ir),
972 _ => Err(ShellError::PluginFailedToDecode {
973 msg: "Received unexpected response type for EngineCall::GetBlockIR".into(),
974 }),
975 }
976 }
977
978 pub fn call_decl(
1002 &self,
1003 decl_id: DeclId,
1004 call: EvaluatedCall,
1005 input: PipelineData,
1006 redirect_stdout: bool,
1007 redirect_stderr: bool,
1008 ) -> Result<PipelineData, ShellError> {
1009 let call = EngineCall::CallDecl {
1010 decl_id,
1011 call,
1012 input,
1013 redirect_stdout,
1014 redirect_stderr,
1015 };
1016
1017 match self.engine_call(call)? {
1018 EngineCallResponse::Error(err) => Err(err),
1019 EngineCallResponse::PipelineData(data) => Ok(data),
1020 _ => Err(ShellError::PluginFailedToDecode {
1021 msg: "Received unexpected response type for EngineCall::CallDecl".into(),
1022 }),
1023 }
1024 }
1025
1026 pub fn set_gc_disabled(&self, disabled: bool) -> Result<(), ShellError> {
1034 self.write(PluginOutput::Option(PluginOption::GcDisabled(disabled)))?;
1035 self.flush()
1036 }
1037
1038 pub(crate) fn write_ordering(
1040 &self,
1041 ordering: Option<impl Into<Ordering>>,
1042 ) -> Result<(), ShellError> {
1043 let response = PluginCallResponse::Ordering(ordering.map(|o| o.into()));
1044 self.write(PluginOutput::CallResponse(self.context()?, response))?;
1045 self.flush()
1046 }
1047
1048 pub fn signals(&self) -> &Signals {
1049 &self.state.signals
1050 }
1051}
1052
1053impl Interface for EngineInterface {
1054 type Output = PluginOutput;
1055 type DataContext = ();
1056
1057 fn write(&self, output: PluginOutput) -> Result<(), ShellError> {
1058 log::trace!("to engine: {output:?}");
1059 self.state.writer.write(&output)
1060 }
1061
1062 fn flush(&self) -> Result<(), ShellError> {
1063 self.state.writer.flush()
1064 }
1065
1066 fn stream_id_sequence(&self) -> &Sequence {
1067 &self.state.stream_id_sequence
1068 }
1069
1070 fn stream_manager_handle(&self) -> &StreamManagerHandle {
1071 &self.stream_manager_handle
1072 }
1073
1074 fn prepare_pipeline_data(
1075 &self,
1076 mut data: PipelineData,
1077 _context: &(),
1078 ) -> Result<PipelineData, ShellError> {
1079 match data {
1081 PipelineData::Value(ref mut value, _) => {
1082 PluginCustomValue::serialize_custom_values_in(value)?;
1083 Ok(data)
1084 }
1085 PipelineData::ListStream(stream, meta) => {
1086 let stream = stream.map(|mut value| {
1087 let span = value.span();
1088 PluginCustomValue::serialize_custom_values_in(&mut value)
1089 .map(|_| value)
1090 .unwrap_or_else(|err| Value::error(err, span))
1091 });
1092 Ok(PipelineData::list_stream(stream, meta))
1093 }
1094 PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
1095 }
1096 }
1097}
1098
1099pub struct ForegroundGuard(Option<EngineInterface>);
1103
1104impl ForegroundGuard {
1105 fn leave_internal(&mut self) -> Result<(), ShellError> {
1107 if let Some(interface) = self.0.take() {
1108 #[cfg(unix)]
1110 {
1111 use nix::unistd::{Pid, setpgid};
1112 setpgid(Pid::from_raw(0), Pid::from_raw(0)).map_err(|err| {
1114 nu_protocol::shell_error::io::IoError::new_internal(
1115 std::io::Error::from(err),
1116 "Could not set pgid",
1117 )
1118 })?;
1119 }
1120 interface.leave_foreground()?;
1121 }
1122 Ok(())
1123 }
1124
1125 pub fn leave(mut self) -> Result<(), ShellError> {
1127 let result = self.leave_internal();
1128 std::mem::forget(self);
1129 result
1130 }
1131}
1132
1133impl Drop for ForegroundGuard {
1134 fn drop(&mut self) {
1135 let _ = self.leave_internal();
1136 }
1137}
1138
1139#[cfg(unix)]
1140fn set_pgrp_from_enter_foreground(pgrp: i64) -> Result<(), ShellError> {
1141 use nix::unistd::{Pid, setpgid};
1142 if let Ok(pgrp) = pgrp.try_into() {
1143 setpgid(Pid::from_raw(0), Pid::from_raw(pgrp)).map_err(|err| {
1144 ShellError::Generic(
1145 GenericError::new_internal("Failed to set process group for foreground", "")
1146 .with_help(err.to_string()),
1147 )
1148 })
1149 } else {
1150 Err(ShellError::NushellFailed {
1151 msg: "Engine returned an invalid process group ID".into(),
1152 })
1153 }
1154}
1155
/// Non-Unix fallback: joining a process group is not supported, so this always fails with
/// `NushellFailed`.
#[cfg(not(unix))]
fn set_pgrp_from_enter_foreground(_pgrp: i64) -> Result<(), ShellError> {
    let msg = "EnterForeground asked plugin to join process group, but this is not supported on non UNIX platforms.".to_string();
    Err(ShellError::NushellFailed { msg })
}