1use nu_plugin_core::{
4 Interface, InterfaceManager, PipelineDataWriter, PluginRead, PluginWrite, StreamManager,
5 StreamManagerHandle,
6 util::{Waitable, WaitableMut},
7};
8use nu_plugin_protocol::{
9 CallInfo, CustomValueOp, EngineCall, EngineCallId, EngineCallResponse, EvaluatedCall,
10 GetCompletionInfo, Ordering, PluginCall, PluginCallId, PluginCallResponse, PluginCustomValue,
11 PluginInput, PluginOption, PluginOutput, ProtocolInfo,
12};
13use nu_protocol::{
14 BlockId, Config, DeclId, DynamicSuggestion, Handler, HandlerGuard, Handlers, PipelineData,
15 PluginMetadata, PluginSignature, ShellError, SignalAction, Signals, Span, Spanned, Value,
16 engine::{Closure, Sequence},
17 ir::IrBlock,
18};
19use nu_utils::SharedCow;
20use std::{
21 collections::{BTreeMap, HashMap, btree_map},
22 sync::{Arc, atomic::AtomicBool, mpsc},
23};
24
/// A plugin call received from the engine, paired with the [`EngineInterface`] that was created
/// for that call's ID.
///
/// Each variant corresponds to one `PluginCall` kind forwarded by
/// `EngineInterfaceManager::consume`.
#[derive(Debug)]
#[doc(hidden)]
pub enum ReceivedPluginCall {
    /// Forwarded for `PluginCall::Metadata`.
    Metadata {
        /// Interface bound to the originating call.
        engine: EngineInterface,
    },
    /// Forwarded for `PluginCall::Signature`.
    Signature {
        /// Interface bound to the originating call.
        engine: EngineInterface,
    },
    /// Forwarded for `PluginCall::Run`, after custom values in the arguments were deserialized.
    Run {
        /// Interface bound to the originating call.
        engine: EngineInterface,
        /// The call information, with its input already converted to [`PipelineData`].
        call: CallInfo<PipelineData>,
    },
    /// Forwarded for `PluginCall::GetCompletion`.
    GetCompletion {
        /// Interface bound to the originating call.
        engine: EngineInterface,
        /// The completion request payload sent by the engine.
        info: GetCompletionInfo,
    },
    /// Forwarded for `PluginCall::CustomValueOp`.
    CustomValueOp {
        /// Interface bound to the originating call.
        engine: EngineInterface,
        /// The custom value the operation applies to.
        custom_value: Spanned<PluginCustomValue>,
        /// The operation requested on the custom value.
        op: CustomValueOp,
    },
}
55
56#[cfg(test)]
57mod tests;
58
/// State shared (behind an `Arc`) between the [`EngineInterfaceManager`] and every
/// [`EngineInterface`] handed out by it.
struct EngineInterfaceState {
    /// Read side of the protocol info; the manager sets it when a `Hello` message is consumed.
    protocol_info: Waitable<Arc<ProtocolInfo>>,
    /// Source of IDs for engine calls issued through an interface.
    engine_call_id_sequence: Sequence,
    /// Source of IDs for streams (exposed through `Interface::stream_id_sequence`).
    stream_id_sequence: Sequence,
    /// Channel used by interfaces to tell the manager which sender should receive the response
    /// for a given engine call ID.
    engine_call_subscription_sender:
        mpsc::Sender<(EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>)>,
    /// The writer that all output to the engine goes through.
    writer: Box<dyn PluginWrite<PluginOutput>>,
    /// Signal state, triggered or reset when a `Signal` message is consumed.
    signals: Signals,
    /// Handlers that are run with the action of each consumed `Signal` message.
    signal_handlers: Handlers,
}
78
79impl std::fmt::Debug for EngineInterfaceState {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 f.debug_struct("EngineInterfaceState")
82 .field("protocol_info", &self.protocol_info)
83 .field("engine_call_id_sequence", &self.engine_call_id_sequence)
84 .field("stream_id_sequence", &self.stream_id_sequence)
85 .field(
86 "engine_call_subscription_sender",
87 &self.engine_call_subscription_sender,
88 )
89 .finish_non_exhaustive()
90 }
91}
92
/// Reads and dispatches messages coming from the engine, and creates the [`EngineInterface`]
/// values used to talk back to it.
#[derive(Debug)]
#[doc(hidden)]
pub struct EngineInterfaceManager {
    /// State shared with every interface created by this manager.
    state: Arc<EngineInterfaceState>,
    /// Write side of the protocol info; set when the engine's `Hello` is consumed.
    protocol_info_mut: WaitableMut<Arc<ProtocolInfo>>,
    /// Sender for received plugin calls. Dropped (set to `None`) once `Goodbye` is consumed.
    plugin_call_sender: Option<mpsc::Sender<ReceivedPluginCall>>,
    /// Receiver for plugin calls, until it is taken with `take_plugin_call_receiver()`.
    plugin_call_receiver: Option<mpsc::Receiver<ReceivedPluginCall>>,
    /// Engine calls that are still waiting for a response, keyed by engine call ID.
    engine_call_subscriptions:
        BTreeMap<EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>>,
    /// Receives new engine call subscriptions registered by interfaces.
    engine_call_subscription_receiver:
        mpsc::Receiver<(EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>)>,
    /// Manages the streams belonging to this connection.
    stream_manager: StreamManager,
}
116
117impl EngineInterfaceManager {
118 pub(crate) fn new(writer: impl PluginWrite<PluginOutput> + 'static) -> EngineInterfaceManager {
119 let (plug_tx, plug_rx) = mpsc::channel();
120 let (subscription_tx, subscription_rx) = mpsc::channel();
121 let protocol_info_mut = WaitableMut::new();
122
123 EngineInterfaceManager {
124 state: Arc::new(EngineInterfaceState {
125 protocol_info: protocol_info_mut.reader(),
126 engine_call_id_sequence: Sequence::default(),
127 stream_id_sequence: Sequence::default(),
128 engine_call_subscription_sender: subscription_tx,
129 writer: Box::new(writer),
130 signals: Signals::new(Arc::new(AtomicBool::new(false))),
131 signal_handlers: Handlers::new(),
132 }),
133 protocol_info_mut,
134 plugin_call_sender: Some(plug_tx),
135 plugin_call_receiver: Some(plug_rx),
136 engine_call_subscriptions: BTreeMap::new(),
137 engine_call_subscription_receiver: subscription_rx,
138 stream_manager: StreamManager::new(),
139 }
140 }
141
/// Takes the receiver for plugin calls out of the manager.
///
/// Returns `None` if the receiver has already been taken.
pub(crate) fn take_plugin_call_receiver(
    &mut self,
) -> Option<mpsc::Receiver<ReceivedPluginCall>> {
    self.plugin_call_receiver.take()
}
149
150 fn interface_for_context(&self, context: PluginCallId) -> EngineInterface {
152 EngineInterface {
153 state: self.state.clone(),
154 stream_manager_handle: self.stream_manager.get_handle(),
155 context: Some(context),
156 }
157 }
158
159 fn send_plugin_call(&self, plugin_call: ReceivedPluginCall) -> Result<(), ShellError> {
161 self.plugin_call_sender
162 .as_ref()
163 .ok_or_else(|| ShellError::PluginFailedToDecode {
164 msg: "Received a plugin call after Goodbye".into(),
165 })?
166 .send(plugin_call)
167 .map_err(|_| ShellError::NushellFailed {
168 msg: "Received a plugin call, but there's nowhere to send it".into(),
169 })
170 }
171
172 fn receive_engine_call_subscriptions(&mut self) {
174 for (id, subscription) in self.engine_call_subscription_receiver.try_iter() {
175 if let btree_map::Entry::Vacant(e) = self.engine_call_subscriptions.entry(id) {
176 e.insert(subscription);
177 } else {
178 log::warn!("Duplicate engine call ID ignored: {id}")
179 }
180 }
181 }
182
183 fn send_engine_call_response(
185 &mut self,
186 id: EngineCallId,
187 response: EngineCallResponse<PipelineData>,
188 ) -> Result<(), ShellError> {
189 self.receive_engine_call_subscriptions();
191 if let Some(sender) = self.engine_call_subscriptions.remove(&id) {
193 if sender.send(response).is_err() {
194 log::warn!("Received an engine call response for id={id}, but the caller hung up");
195 }
196 Ok(())
197 } else {
198 Err(ShellError::PluginFailedToDecode {
199 msg: format!("Unknown engine call ID: {id}"),
200 })
201 }
202 }
203
/// Returns `true` when fewer than two strong references to the shared state exist.
///
/// The manager itself holds one reference and every interface it creates holds a clone, so
/// this means no interface is alive any more.
pub(crate) fn is_finished(&self) -> bool {
    Arc::strong_count(&self.state) < 2
}
209
210 pub(crate) fn consume_all(
214 &mut self,
215 mut reader: impl PluginRead<PluginInput>,
216 ) -> Result<(), ShellError> {
217 while let Some(msg) = reader.read().transpose() {
218 if self.is_finished() {
219 break;
220 }
221
222 if let Err(err) = msg.and_then(|msg| self.consume(msg)) {
223 let _ = self.stream_manager.broadcast_read_error(err.clone());
225 self.receive_engine_call_subscriptions();
227 for sender in std::mem::take(&mut self.engine_call_subscriptions).into_values() {
228 let _ = sender.send(EngineCallResponse::Error(err.clone()));
229 }
230 return Err(err);
231 }
232 }
233 Ok(())
234 }
235}
236
237impl InterfaceManager for EngineInterfaceManager {
238 type Interface = EngineInterface;
239 type Input = PluginInput;
240
241 fn get_interface(&self) -> Self::Interface {
242 EngineInterface {
243 state: self.state.clone(),
244 stream_manager_handle: self.stream_manager.get_handle(),
245 context: None,
246 }
247 }
248
/// Handles a single message received from the engine.
///
/// `Hello` must arrive before anything else: any other message received while the protocol
/// info is still unset is rejected. The arm order below matters, because the guard arm relies
/// on `Hello` having been matched first.
///
/// # Errors
///
/// Returns an error for an incompatible or missing `Hello`, for a stream message that cannot
/// be converted, when a plugin call cannot be forwarded, or when an engine call response has
/// an unknown ID. Errors while preparing a plugin call's input are instead written back to
/// the engine as the response to that call.
fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> {
    log::trace!("from engine: {input:?}");
    match input {
        PluginInput::Hello(info) => {
            // Record the engine's protocol info before checking compatibility.
            let info = Arc::new(info);
            self.protocol_info_mut.set(info.clone())?;

            let local_info = ProtocolInfo::default();
            if local_info.is_compatible_with(&info)? {
                Ok(())
            } else {
                Err(ShellError::PluginFailedToLoad {
                    msg: format!(
                        "Plugin is compiled for nushell version {}, \
                        which is not compatible with version {}",
                        local_info.version, info.version
                    ),
                })
            }
        }
        // Anything other than Hello is refused until the protocol info has been set.
        _ if !self.state.protocol_info.is_set() => {
            Err(ShellError::PluginFailedToLoad {
                msg: "Failed to receive initial Hello message. This engine might be too old"
                    .into(),
            })
        }
        // Stream messages are converted and handed to the stream handling code.
        PluginInput::Data(..)
        | PluginInput::End(..)
        | PluginInput::Drop(..)
        | PluginInput::Ack(..) => {
            self.consume_stream_message(input.try_into().map_err(|msg| {
                ShellError::NushellFailed {
                    msg: format!("Failed to convert message {msg:?} to StreamMessage"),
                }
            })?)
        }
        PluginInput::Call(id, call) => {
            // Create the interface first so that an error can be reported for this call ID.
            let interface = self.interface_for_context(id);
            let call = match call
                .map_data(|input| self.read_pipeline_data(input, &Signals::empty()))
            {
                Ok(call) => call,
                Err(err) => {
                    // Report the failure as the response to this call.
                    return interface.write_response(Err(err))?.write();
                }
            };
            match call {
                PluginCall::Metadata => {
                    self.send_plugin_call(ReceivedPluginCall::Metadata { engine: interface })
                }
                PluginCall::Signature => {
                    self.send_plugin_call(ReceivedPluginCall::Signature { engine: interface })
                }
                PluginCall::Run(mut call_info) => {
                    // Custom values in the arguments are deserialized before forwarding;
                    // a failure is reported as the response to this call.
                    if let Err(err) = deserialize_call_args(&mut call_info.call) {
                        return interface.write_response(Err(err))?.write();
                    }
                    self.send_plugin_call(ReceivedPluginCall::Run {
                        engine: interface,
                        call: call_info,
                    })
                }
                PluginCall::CustomValueOp(custom_value, op) => {
                    self.send_plugin_call(ReceivedPluginCall::CustomValueOp {
                        engine: interface,
                        custom_value,
                        op,
                    })
                }
                PluginCall::GetCompletion(info) => {
                    self.send_plugin_call(ReceivedPluginCall::GetCompletion {
                        engine: interface,
                        info,
                    })
                }
            }
        }
        PluginInput::Goodbye => {
            // Dropping the sender means no further plugin calls can be forwarded.
            drop(self.plugin_call_sender.take());
            Ok(())
        }
        PluginInput::EngineCallResponse(id, response) => {
            // A failure to read the response data is delivered to the subscriber as an
            // error response instead of failing here.
            let response = response
                .map_data(|header| self.read_pipeline_data(header, &Signals::empty()))
                .unwrap_or_else(|err| {
                    EngineCallResponse::Error(err)
                });
            self.send_engine_call_response(id, response)
        }
        PluginInput::Signal(action) => {
            // Update the shared signal state, then run the registered handlers.
            match action {
                SignalAction::Interrupt => self.state.signals.trigger(),
                SignalAction::Reset => self.state.signals.reset(),
            }
            self.state.signal_handlers.run(action);
            Ok(())
        }
    }
}
362
/// Returns a reference to this manager's [`StreamManager`].
fn stream_manager(&self) -> &StreamManager {
    &self.stream_manager
}
366
367 fn prepare_pipeline_data(&self, mut data: PipelineData) -> Result<PipelineData, ShellError> {
368 match data {
370 PipelineData::Value(ref mut value, _) => {
371 PluginCustomValue::deserialize_custom_values_in(value)?;
372 Ok(data)
373 }
374 PipelineData::ListStream(stream, meta) => {
375 let stream = stream.map(|mut value| {
376 let span = value.span();
377 PluginCustomValue::deserialize_custom_values_in(&mut value)
378 .map(|()| value)
379 .unwrap_or_else(|err| Value::error(err, span))
380 });
381 Ok(PipelineData::list_stream(stream, meta))
382 }
383 PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
384 }
385 }
386}
387
388fn deserialize_call_args(call: &mut crate::EvaluatedCall) -> Result<(), ShellError> {
390 call.positional
391 .iter_mut()
392 .try_for_each(PluginCustomValue::deserialize_custom_values_in)?;
393 call.named
394 .iter_mut()
395 .flat_map(|(_, value)| value.as_mut())
396 .try_for_each(PluginCustomValue::deserialize_custom_values_in)
397}
398
/// A handle for communicating with the engine, optionally bound to one plugin call.
#[derive(Debug, Clone)]
pub struct EngineInterface {
    /// State shared with the manager and all other interfaces.
    state: Arc<EngineInterfaceState>,
    /// Handle to the stream manager of the connection.
    stream_manager_handle: StreamManagerHandle,
    /// The plugin call this interface belongs to, or `None` when it was created without one.
    /// Methods that respond to a call or issue an engine call require it to be set.
    context: Option<PluginCallId>,
}
409
410impl EngineInterface {
411 pub(crate) fn hello(&self) -> Result<(), ShellError> {
413 self.write(PluginOutput::Hello(ProtocolInfo::default()))?;
414 self.flush()
415 }
416
417 fn context(&self) -> Result<PluginCallId, ShellError> {
418 self.context.ok_or_else(|| ShellError::NushellFailed {
419 msg: "Tried to call an EngineInterface method that requires a call context \
420 outside of one"
421 .into(),
422 })
423 }
424
425 pub(crate) fn write_ok(
427 &self,
428 result: Result<(), impl Into<ShellError>>,
429 ) -> Result<(), ShellError> {
430 let response = match result {
431 Ok(()) => PluginCallResponse::Ok,
432 Err(err) => PluginCallResponse::Error(err.into()),
433 };
434 self.write(PluginOutput::CallResponse(self.context()?, response))?;
435 self.flush()
436 }
437
438 pub(crate) fn write_response(
441 &self,
442 result: Result<PipelineData, impl Into<ShellError>>,
443 ) -> Result<PipelineDataWriter<Self>, ShellError> {
444 match result {
445 Ok(data) => {
446 let (header, writer) = match self.init_write_pipeline_data(data, &()) {
447 Ok(tup) => tup,
448 Err(err) => return self.write_response(Err(err)),
451 };
452 let response = PluginCallResponse::PipelineData(header);
454 self.write(PluginOutput::CallResponse(self.context()?, response))?;
455 self.flush()?;
456 Ok(writer)
457 }
458 Err(err) => {
459 let response = PluginCallResponse::Error(err.into());
460 self.write(PluginOutput::CallResponse(self.context()?, response))?;
461 self.flush()?;
462 Ok(Default::default())
463 }
464 }
465 }
466
467 pub(crate) fn write_metadata(&self, metadata: PluginMetadata) -> Result<(), ShellError> {
469 let response = PluginCallResponse::Metadata(metadata);
470 self.write(PluginOutput::CallResponse(self.context()?, response))?;
471 self.flush()
472 }
473
474 pub(crate) fn write_signature(
478 &self,
479 signature: Vec<PluginSignature>,
480 ) -> Result<(), ShellError> {
481 let response = PluginCallResponse::Signature(signature);
482 self.write(PluginOutput::CallResponse(self.context()?, response))?;
483 self.flush()
484 }
485
486 pub(crate) fn write_completion_items(
488 &self,
489 items: Option<Vec<DynamicSuggestion>>,
490 ) -> Result<(), ShellError> {
491 let response = PluginCallResponse::CompletionItems(items);
492 self.write(PluginOutput::CallResponse(self.context()?, response))?;
493 self.flush()
494 }
495
/// Writes an engine call to the engine and returns the pieces needed to complete it.
///
/// Returns a writer for any input stream data of the call (a default writer when the call has
/// no input) together with the receiver on which the response will be delivered.
///
/// # Errors
///
/// Fails if the interface has no call context, if no engine call ID can be allocated, if the
/// input cannot be prepared, if the manager is no longer accepting subscriptions, or if
/// writing or flushing fails.
fn write_engine_call(
    &self,
    call: EngineCall<PipelineData>,
) -> Result<
    (
        PipelineDataWriter<Self>,
        mpsc::Receiver<EngineCallResponse<PipelineData>>,
    ),
    ShellError,
> {
    let context = self.context()?;
    let id = self.state.engine_call_id_sequence.next()?;
    let (tx, rx) = mpsc::channel();

    // Filled in by the closure below if the call carries input data.
    let mut writer = None;

    let call = call.map_data(|input| {
        let (input_header, input_writer) = self.init_write_pipeline_data(input, &())?;
        writer = Some(input_writer);
        Ok(input_header)
    })?;

    // Register the subscription before the call is written, so that it is already queued
    // when the manager drains subscriptions while handling the response.
    self.state
        .engine_call_subscription_sender
        .send((id, tx))
        .map_err(|_| ShellError::NushellFailed {
            msg: "EngineInterfaceManager hung up and is no longer accepting engine calls"
                .into(),
        })?;

    self.write(PluginOutput::EngineCall { context, id, call })?;
    self.flush()?;

    Ok((writer.unwrap_or_default(), rx))
}
536
537 fn engine_call(
539 &self,
540 call: EngineCall<PipelineData>,
541 ) -> Result<EngineCallResponse<PipelineData>, ShellError> {
542 let (writer, rx) = self.write_engine_call(call)?;
543
544 writer.write_background()?;
546
547 rx.recv().map_err(|_| ShellError::NushellFailed {
549 msg: "Failed to get response to engine call because the channel was closed".into(),
550 })
551 }
552
/// Returns `true` if the writer used to talk to the engine reports that it is stdout.
pub fn is_using_stdio(&self) -> bool {
    self.state.writer.is_stdout()
}
561
/// Registers `handler` with the shared signal handler set.
///
/// Registered handlers are run with the signal action whenever the manager consumes a
/// `Signal` message from the engine. Returns the guard produced by the registration.
// NOTE(review): presumably the handler stays registered only while the guard is alive —
// confirm against `Handlers::register`.
pub fn register_signal_handler(&self, handler: Handler) -> Result<HandlerGuard, ShellError> {
    self.state.signal_handlers.register(handler)
}
567
568 pub fn get_config(&self) -> Result<Arc<Config>, ShellError> {
585 match self.engine_call(EngineCall::GetConfig)? {
586 EngineCallResponse::Config(config) => Ok(SharedCow::into_arc(config)),
587 EngineCallResponse::Error(err) => Err(err),
588 _ => Err(ShellError::PluginFailedToDecode {
589 msg: "Received unexpected response for EngineCall::GetConfig".into(),
590 }),
591 }
592 }
593
594 fn engine_call_option_value(
597 &self,
598 engine_call: EngineCall<PipelineData>,
599 ) -> Result<Option<Value>, ShellError> {
600 let name = engine_call.name();
601 match self.engine_call(engine_call)? {
602 EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(None),
603 EngineCallResponse::PipelineData(PipelineData::Value(value, _)) => Ok(Some(value)),
604 EngineCallResponse::Error(err) => Err(err),
605 _ => Err(ShellError::PluginFailedToDecode {
606 msg: format!("Received unexpected response for EngineCall::{name}"),
607 }),
608 }
609 }
610
/// Requests this plugin's configuration via `EngineCall::GetPluginConfig`.
///
/// Returns `None` when the engine responds with empty data.
pub fn get_plugin_config(&self) -> Result<Option<Value>, ShellError> {
    self.engine_call_option_value(EngineCall::GetPluginConfig)
}
631
/// Requests the value of the environment variable `name` via `EngineCall::GetEnvVar`.
///
/// Returns `None` when the engine responds with empty data.
pub fn get_env_var(&self, name: impl Into<String>) -> Result<Option<Value>, ShellError> {
    self.engine_call_option_value(EngineCall::GetEnvVar(name.into()))
}
650
651 pub fn get_current_dir(&self) -> Result<String, ShellError> {
662 match self.engine_call(EngineCall::GetCurrentDir)? {
663 EngineCallResponse::PipelineData(PipelineData::Value(Value::String { val, .. }, _)) => {
665 Ok(val)
666 }
667 EngineCallResponse::Error(err) => Err(err),
668 _ => Err(ShellError::PluginFailedToDecode {
669 msg: "Received unexpected response for EngineCall::GetCurrentDir".into(),
670 }),
671 }
672 }
673
674 pub fn get_env_vars(&self) -> Result<HashMap<String, Value>, ShellError> {
690 match self.engine_call(EngineCall::GetEnvVars)? {
691 EngineCallResponse::ValueMap(map) => Ok(map),
692 EngineCallResponse::Error(err) => Err(err),
693 _ => Err(ShellError::PluginFailedToDecode {
694 msg: "Received unexpected response type for EngineCall::GetEnvVars".into(),
695 }),
696 }
697 }
698
699 pub fn add_env_var(&self, name: impl Into<String>, value: Value) -> Result<(), ShellError> {
714 match self.engine_call(EngineCall::AddEnvVar(name.into(), value))? {
715 EngineCallResponse::PipelineData(_) => Ok(()),
716 EngineCallResponse::Error(err) => Err(err),
717 _ => Err(ShellError::PluginFailedToDecode {
718 msg: "Received unexpected response type for EngineCall::AddEnvVar".into(),
719 }),
720 }
721 }
722
723 pub fn get_help(&self) -> Result<String, ShellError> {
738 match self.engine_call(EngineCall::GetHelp)? {
739 EngineCallResponse::PipelineData(PipelineData::Value(Value::String { val, .. }, _)) => {
740 Ok(val)
741 }
742 _ => Err(ShellError::PluginFailedToDecode {
743 msg: "Received unexpected response type for EngineCall::GetHelp".into(),
744 }),
745 }
746 }
747
748 pub fn enter_foreground(&self) -> Result<ForegroundGuard, ShellError> {
756 match self.engine_call(EngineCall::EnterForeground)? {
757 EngineCallResponse::Error(error) => Err(error),
758 EngineCallResponse::PipelineData(PipelineData::Value(
759 Value::Int { val: pgrp, .. },
760 _,
761 )) => {
762 set_pgrp_from_enter_foreground(pgrp)?;
763 Ok(ForegroundGuard(Some(self.clone())))
764 }
765 EngineCallResponse::PipelineData(PipelineData::Empty) => {
766 Ok(ForegroundGuard(Some(self.clone())))
767 }
768 _ => Err(ShellError::PluginFailedToDecode {
769 msg: "Received unexpected response type for EngineCall::SetForeground".into(),
770 }),
771 }
772 }
773
774 fn leave_foreground(&self) -> Result<(), ShellError> {
776 match self.engine_call(EngineCall::LeaveForeground)? {
777 EngineCallResponse::Error(error) => Err(error),
778 EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(()),
779 _ => Err(ShellError::PluginFailedToDecode {
780 msg: "Received unexpected response type for EngineCall::LeaveForeground".into(),
781 }),
782 }
783 }
784
785 pub fn get_span_contents(&self, span: Span) -> Result<Vec<u8>, ShellError> {
791 match self.engine_call(EngineCall::GetSpanContents(span))? {
792 EngineCallResponse::PipelineData(PipelineData::Value(Value::Binary { val, .. }, _)) => {
793 Ok(val)
794 }
795 _ => Err(ShellError::PluginFailedToDecode {
796 msg: "Received unexpected response type for EngineCall::GetSpanContents".into(),
797 }),
798 }
799 }
800
801 pub fn eval_closure_with_stream(
847 &self,
848 closure: &Spanned<Closure>,
849 mut positional: Vec<Value>,
850 input: PipelineData,
851 redirect_stdout: bool,
852 redirect_stderr: bool,
853 ) -> Result<PipelineData, ShellError> {
854 positional
856 .iter_mut()
857 .try_for_each(PluginCustomValue::serialize_custom_values_in)?;
858
859 let call = EngineCall::EvalClosure {
860 closure: closure.clone(),
861 positional,
862 input,
863 redirect_stdout,
864 redirect_stderr,
865 };
866
867 match self.engine_call(call)? {
868 EngineCallResponse::Error(error) => Err(error),
869 EngineCallResponse::PipelineData(data) => Ok(data),
870 _ => Err(ShellError::PluginFailedToDecode {
871 msg: "Received unexpected response type for EngineCall::EvalClosure".into(),
872 }),
873 }
874 }
875
876 pub fn eval_closure(
917 &self,
918 closure: &Spanned<Closure>,
919 positional: Vec<Value>,
920 input: Option<Value>,
921 ) -> Result<Value, ShellError> {
922 let input = input.map_or_else(PipelineData::empty, |v| PipelineData::value(v, None));
923 let output = self.eval_closure_with_stream(closure, positional, input, true, false)?;
924 match output.into_value(closure.span)? {
926 Value::Error { error, .. } => Err(*error),
927 value => Ok(value),
928 }
929 }
930
931 pub fn find_decl(&self, name: impl Into<String>) -> Result<Option<DeclId>, ShellError> {
936 let call = EngineCall::FindDecl(name.into());
937
938 match self.engine_call(call)? {
939 EngineCallResponse::Error(err) => Err(err),
940 EngineCallResponse::Identifier(id) => Ok(Some(id)),
941 EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(None),
942 _ => Err(ShellError::PluginFailedToDecode {
943 msg: "Received unexpected response type for EngineCall::FindDecl".into(),
944 }),
945 }
946 }
947
948 pub fn get_block_ir(&self, block_id: BlockId) -> Result<IrBlock, ShellError> {
965 let call = EngineCall::GetBlockIR(block_id);
966
967 match self.engine_call(call)? {
968 EngineCallResponse::Error(err) => Err(err),
969 EngineCallResponse::IrBlock(ir) => Ok(*ir),
970 _ => Err(ShellError::PluginFailedToDecode {
971 msg: "Received unexpected response type for EngineCall::GetBlockIR".into(),
972 }),
973 }
974 }
975
976 pub fn call_decl(
1000 &self,
1001 decl_id: DeclId,
1002 call: EvaluatedCall,
1003 input: PipelineData,
1004 redirect_stdout: bool,
1005 redirect_stderr: bool,
1006 ) -> Result<PipelineData, ShellError> {
1007 let call = EngineCall::CallDecl {
1008 decl_id,
1009 call,
1010 input,
1011 redirect_stdout,
1012 redirect_stderr,
1013 };
1014
1015 match self.engine_call(call)? {
1016 EngineCallResponse::Error(err) => Err(err),
1017 EngineCallResponse::PipelineData(data) => Ok(data),
1018 _ => Err(ShellError::PluginFailedToDecode {
1019 msg: "Received unexpected response type for EngineCall::CallDecl".into(),
1020 }),
1021 }
1022 }
1023
1024 pub fn set_gc_disabled(&self, disabled: bool) -> Result<(), ShellError> {
1032 self.write(PluginOutput::Option(PluginOption::GcDisabled(disabled)))?;
1033 self.flush()
1034 }
1035
1036 pub(crate) fn write_ordering(
1038 &self,
1039 ordering: Option<impl Into<Ordering>>,
1040 ) -> Result<(), ShellError> {
1041 let response = PluginCallResponse::Ordering(ordering.map(|o| o.into()));
1042 self.write(PluginOutput::CallResponse(self.context()?, response))?;
1043 self.flush()
1044 }
1045
/// Returns the shared [`Signals`] state, which the manager triggers or resets when it
/// consumes a `Signal` message from the engine.
pub fn signals(&self) -> &Signals {
    &self.state.signals
}
1049}
1050
1051impl Interface for EngineInterface {
1052 type Output = PluginOutput;
1053 type DataContext = ();
1054
1055 fn write(&self, output: PluginOutput) -> Result<(), ShellError> {
1056 log::trace!("to engine: {output:?}");
1057 self.state.writer.write(&output)
1058 }
1059
1060 fn flush(&self) -> Result<(), ShellError> {
1061 self.state.writer.flush()
1062 }
1063
1064 fn stream_id_sequence(&self) -> &Sequence {
1065 &self.state.stream_id_sequence
1066 }
1067
1068 fn stream_manager_handle(&self) -> &StreamManagerHandle {
1069 &self.stream_manager_handle
1070 }
1071
1072 fn prepare_pipeline_data(
1073 &self,
1074 mut data: PipelineData,
1075 _context: &(),
1076 ) -> Result<PipelineData, ShellError> {
1077 match data {
1079 PipelineData::Value(ref mut value, _) => {
1080 PluginCustomValue::serialize_custom_values_in(value)?;
1081 Ok(data)
1082 }
1083 PipelineData::ListStream(stream, meta) => {
1084 let stream = stream.map(|mut value| {
1085 let span = value.span();
1086 PluginCustomValue::serialize_custom_values_in(&mut value)
1087 .map(|_| value)
1088 .unwrap_or_else(|err| Value::error(err, span))
1089 });
1090 Ok(PipelineData::list_stream(stream, meta))
1091 }
1092 PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
1093 }
1094 }
1095}
1096
/// Guard returned by `EngineInterface::enter_foreground`.
///
/// Holds the interface used to leave the foreground again; the slot becomes `None` once the
/// guard has left, whether through `leave()` or on drop.
pub struct ForegroundGuard(Option<EngineInterface>);
1101
1102impl ForegroundGuard {
1103 fn leave_internal(&mut self) -> Result<(), ShellError> {
1105 if let Some(interface) = self.0.take() {
1106 #[cfg(unix)]
1108 {
1109 use nix::unistd::{Pid, setpgid};
1110 setpgid(Pid::from_raw(0), Pid::from_raw(0)).map_err(|err| {
1112 nu_protocol::shell_error::io::IoError::new_internal(
1113 std::io::Error::from(err),
1114 "Could not set pgid",
1115 nu_protocol::location!(),
1116 )
1117 })?;
1118 }
1119 interface.leave_foreground()?;
1120 }
1121 Ok(())
1122 }
1123
1124 pub fn leave(mut self) -> Result<(), ShellError> {
1126 let result = self.leave_internal();
1127 std::mem::forget(self);
1128 result
1129 }
1130}
1131
impl Drop for ForegroundGuard {
    /// Leaves the foreground when the guard goes out of scope; any error is ignored.
    fn drop(&mut self) {
        let _ = self.leave_internal();
    }
}
1137
1138#[cfg(unix)]
1139fn set_pgrp_from_enter_foreground(pgrp: i64) -> Result<(), ShellError> {
1140 use nix::unistd::{Pid, setpgid};
1141 if let Ok(pgrp) = pgrp.try_into() {
1142 setpgid(Pid::from_raw(0), Pid::from_raw(pgrp)).map_err(|err| ShellError::GenericError {
1143 error: "Failed to set process group for foreground".into(),
1144 msg: "".into(),
1145 span: None,
1146 help: Some(err.to_string()),
1147 inner: vec![],
1148 })
1149 } else {
1150 Err(ShellError::NushellFailed {
1151 msg: "Engine returned an invalid process group ID".into(),
1152 })
1153 }
1154}
1155
/// Non-Unix counterpart of `set_pgrp_from_enter_foreground`: joining a process group is not
/// supported here, so this always fails with `NushellFailed`.
#[cfg(not(unix))]
fn set_pgrp_from_enter_foreground(_pgrp: i64) -> Result<(), ShellError> {
    Err(ShellError::NushellFailed {
        msg: "EnterForeground asked plugin to join process group, but this is not supported on non UNIX platforms.".to_string(),
    })
}