1use nu_plugin_core::{
4 Interface, InterfaceManager, PipelineDataWriter, PluginRead, PluginWrite, StreamManager,
5 StreamManagerHandle,
6 util::{Waitable, WaitableMut},
7};
8use nu_plugin_protocol::{
9 CallInfo, CustomValueOp, EngineCall, EngineCallId, EngineCallResponse, EvaluatedCall, Ordering,
10 PluginCall, PluginCallId, PluginCallResponse, PluginCustomValue, PluginInput, PluginOption,
11 PluginOutput, ProtocolInfo,
12};
13use nu_protocol::{
14 Config, DeclId, Handler, HandlerGuard, Handlers, LabeledError, PipelineData, PluginMetadata,
15 PluginSignature, ShellError, SignalAction, Signals, Span, Spanned, Value,
16 engine::{Closure, Sequence},
17};
18use nu_utils::SharedCow;
19use std::{
20 collections::{BTreeMap, HashMap, btree_map},
21 sync::{Arc, atomic::AtomicBool, mpsc},
22};
23
24#[derive(Debug)]
32#[doc(hidden)]
33pub enum ReceivedPluginCall {
34 Metadata {
35 engine: EngineInterface,
36 },
37 Signature {
38 engine: EngineInterface,
39 },
40 Run {
41 engine: EngineInterface,
42 call: CallInfo<PipelineData>,
43 },
44 CustomValueOp {
45 engine: EngineInterface,
46 custom_value: Spanned<PluginCustomValue>,
47 op: CustomValueOp,
48 },
49}
50
51#[cfg(test)]
52mod tests;
53
54struct EngineInterfaceState {
56 protocol_info: Waitable<Arc<ProtocolInfo>>,
58 engine_call_id_sequence: Sequence,
60 stream_id_sequence: Sequence,
62 engine_call_subscription_sender:
64 mpsc::Sender<(EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>)>,
65 writer: Box<dyn PluginWrite<PluginOutput>>,
67 signals: Signals,
70 signal_handlers: Handlers,
72}
73
74impl std::fmt::Debug for EngineInterfaceState {
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 f.debug_struct("EngineInterfaceState")
77 .field("protocol_info", &self.protocol_info)
78 .field("engine_call_id_sequence", &self.engine_call_id_sequence)
79 .field("stream_id_sequence", &self.stream_id_sequence)
80 .field(
81 "engine_call_subscription_sender",
82 &self.engine_call_subscription_sender,
83 )
84 .finish_non_exhaustive()
85 }
86}
87
88#[derive(Debug)]
92#[doc(hidden)]
93pub struct EngineInterfaceManager {
94 state: Arc<EngineInterfaceState>,
96 protocol_info_mut: WaitableMut<Arc<ProtocolInfo>>,
98 plugin_call_sender: Option<mpsc::Sender<ReceivedPluginCall>>,
100 plugin_call_receiver: Option<mpsc::Receiver<ReceivedPluginCall>>,
102 engine_call_subscriptions:
104 BTreeMap<EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>>,
105 engine_call_subscription_receiver:
107 mpsc::Receiver<(EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>)>,
108 stream_manager: StreamManager,
110}
111
112impl EngineInterfaceManager {
113 pub(crate) fn new(writer: impl PluginWrite<PluginOutput> + 'static) -> EngineInterfaceManager {
114 let (plug_tx, plug_rx) = mpsc::channel();
115 let (subscription_tx, subscription_rx) = mpsc::channel();
116 let protocol_info_mut = WaitableMut::new();
117
118 EngineInterfaceManager {
119 state: Arc::new(EngineInterfaceState {
120 protocol_info: protocol_info_mut.reader(),
121 engine_call_id_sequence: Sequence::default(),
122 stream_id_sequence: Sequence::default(),
123 engine_call_subscription_sender: subscription_tx,
124 writer: Box::new(writer),
125 signals: Signals::new(Arc::new(AtomicBool::new(false))),
126 signal_handlers: Handlers::new(),
127 }),
128 protocol_info_mut,
129 plugin_call_sender: Some(plug_tx),
130 plugin_call_receiver: Some(plug_rx),
131 engine_call_subscriptions: BTreeMap::new(),
132 engine_call_subscription_receiver: subscription_rx,
133 stream_manager: StreamManager::new(),
134 }
135 }
136
137 pub(crate) fn take_plugin_call_receiver(
140 &mut self,
141 ) -> Option<mpsc::Receiver<ReceivedPluginCall>> {
142 self.plugin_call_receiver.take()
143 }
144
145 fn interface_for_context(&self, context: PluginCallId) -> EngineInterface {
147 EngineInterface {
148 state: self.state.clone(),
149 stream_manager_handle: self.stream_manager.get_handle(),
150 context: Some(context),
151 }
152 }
153
154 fn send_plugin_call(&self, plugin_call: ReceivedPluginCall) -> Result<(), ShellError> {
156 self.plugin_call_sender
157 .as_ref()
158 .ok_or_else(|| ShellError::PluginFailedToDecode {
159 msg: "Received a plugin call after Goodbye".into(),
160 })?
161 .send(plugin_call)
162 .map_err(|_| ShellError::NushellFailed {
163 msg: "Received a plugin call, but there's nowhere to send it".into(),
164 })
165 }
166
167 fn receive_engine_call_subscriptions(&mut self) {
169 for (id, subscription) in self.engine_call_subscription_receiver.try_iter() {
170 if let btree_map::Entry::Vacant(e) = self.engine_call_subscriptions.entry(id) {
171 e.insert(subscription);
172 } else {
173 log::warn!("Duplicate engine call ID ignored: {id}")
174 }
175 }
176 }
177
178 fn send_engine_call_response(
180 &mut self,
181 id: EngineCallId,
182 response: EngineCallResponse<PipelineData>,
183 ) -> Result<(), ShellError> {
184 self.receive_engine_call_subscriptions();
186 if let Some(sender) = self.engine_call_subscriptions.remove(&id) {
188 if sender.send(response).is_err() {
189 log::warn!("Received an engine call response for id={id}, but the caller hung up");
190 }
191 Ok(())
192 } else {
193 Err(ShellError::PluginFailedToDecode {
194 msg: format!("Unknown engine call ID: {id}"),
195 })
196 }
197 }
198
199 pub(crate) fn is_finished(&self) -> bool {
202 Arc::strong_count(&self.state) < 2
203 }
204
205 pub(crate) fn consume_all(
209 &mut self,
210 mut reader: impl PluginRead<PluginInput>,
211 ) -> Result<(), ShellError> {
212 while let Some(msg) = reader.read().transpose() {
213 if self.is_finished() {
214 break;
215 }
216
217 if let Err(err) = msg.and_then(|msg| self.consume(msg)) {
218 let _ = self.stream_manager.broadcast_read_error(err.clone());
220 self.receive_engine_call_subscriptions();
222 for sender in std::mem::take(&mut self.engine_call_subscriptions).into_values() {
223 let _ = sender.send(EngineCallResponse::Error(err.clone()));
224 }
225 return Err(err);
226 }
227 }
228 Ok(())
229 }
230}
231
232impl InterfaceManager for EngineInterfaceManager {
233 type Interface = EngineInterface;
234 type Input = PluginInput;
235
236 fn get_interface(&self) -> Self::Interface {
237 EngineInterface {
238 state: self.state.clone(),
239 stream_manager_handle: self.stream_manager.get_handle(),
240 context: None,
241 }
242 }
243
244 fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> {
245 log::trace!("from engine: {input:?}");
246 match input {
247 PluginInput::Hello(info) => {
248 let info = Arc::new(info);
249 self.protocol_info_mut.set(info.clone())?;
250
251 let local_info = ProtocolInfo::default();
252 if local_info.is_compatible_with(&info)? {
253 Ok(())
254 } else {
255 Err(ShellError::PluginFailedToLoad {
256 msg: format!(
257 "Plugin is compiled for nushell version {}, \
258 which is not compatible with version {}",
259 local_info.version, info.version
260 ),
261 })
262 }
263 }
264 _ if !self.state.protocol_info.is_set() => {
265 Err(ShellError::PluginFailedToLoad {
267 msg: "Failed to receive initial Hello message. This engine might be too old"
268 .into(),
269 })
270 }
271 PluginInput::Data(..)
273 | PluginInput::End(..)
274 | PluginInput::Drop(..)
275 | PluginInput::Ack(..) => {
276 self.consume_stream_message(input.try_into().map_err(|msg| {
277 ShellError::NushellFailed {
278 msg: format!("Failed to convert message {msg:?} to StreamMessage"),
279 }
280 })?)
281 }
282 PluginInput::Call(id, call) => {
283 let interface = self.interface_for_context(id);
284 let call = match call
286 .map_data(|input| self.read_pipeline_data(input, &Signals::empty()))
287 {
288 Ok(call) => call,
289 Err(err) => {
290 return interface.write_response(Err(err))?.write();
293 }
294 };
295 match call {
296 PluginCall::Metadata => {
298 self.send_plugin_call(ReceivedPluginCall::Metadata { engine: interface })
299 }
300 PluginCall::Signature => {
302 self.send_plugin_call(ReceivedPluginCall::Signature { engine: interface })
303 }
304 PluginCall::Run(mut call_info) => {
306 if let Err(err) = deserialize_call_args(&mut call_info.call) {
308 return interface.write_response(Err(err))?.write();
309 }
310 self.send_plugin_call(ReceivedPluginCall::Run {
312 engine: interface,
313 call: call_info,
314 })
315 }
316 PluginCall::CustomValueOp(custom_value, op) => {
318 self.send_plugin_call(ReceivedPluginCall::CustomValueOp {
319 engine: interface,
320 custom_value,
321 op,
322 })
323 }
324 }
325 }
326 PluginInput::Goodbye => {
327 drop(self.plugin_call_sender.take());
329 Ok(())
330 }
331 PluginInput::EngineCallResponse(id, response) => {
332 let response = response
333 .map_data(|header| self.read_pipeline_data(header, &Signals::empty()))
334 .unwrap_or_else(|err| {
335 EngineCallResponse::Error(err)
338 });
339 self.send_engine_call_response(id, response)
340 }
341 PluginInput::Signal(action) => {
342 match action {
343 SignalAction::Interrupt => self.state.signals.trigger(),
344 SignalAction::Reset => self.state.signals.reset(),
345 }
346 self.state.signal_handlers.run(action);
347 Ok(())
348 }
349 }
350 }
351
352 fn stream_manager(&self) -> &StreamManager {
353 &self.stream_manager
354 }
355
356 fn prepare_pipeline_data(&self, mut data: PipelineData) -> Result<PipelineData, ShellError> {
357 match data {
359 PipelineData::Value(ref mut value, _) => {
360 PluginCustomValue::deserialize_custom_values_in(value)?;
361 Ok(data)
362 }
363 PipelineData::ListStream(stream, meta) => {
364 let stream = stream.map(|mut value| {
365 let span = value.span();
366 PluginCustomValue::deserialize_custom_values_in(&mut value)
367 .map(|()| value)
368 .unwrap_or_else(|err| Value::error(err, span))
369 });
370 Ok(PipelineData::list_stream(stream, meta))
371 }
372 PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
373 }
374 }
375}
376
377fn deserialize_call_args(call: &mut crate::EvaluatedCall) -> Result<(), ShellError> {
379 call.positional
380 .iter_mut()
381 .try_for_each(PluginCustomValue::deserialize_custom_values_in)?;
382 call.named
383 .iter_mut()
384 .flat_map(|(_, value)| value.as_mut())
385 .try_for_each(PluginCustomValue::deserialize_custom_values_in)
386}
387
388#[derive(Debug, Clone)]
390pub struct EngineInterface {
391 state: Arc<EngineInterfaceState>,
393 stream_manager_handle: StreamManagerHandle,
395 context: Option<PluginCallId>,
397}
398
399impl EngineInterface {
400 pub(crate) fn hello(&self) -> Result<(), ShellError> {
402 self.write(PluginOutput::Hello(ProtocolInfo::default()))?;
403 self.flush()
404 }
405
406 fn context(&self) -> Result<PluginCallId, ShellError> {
407 self.context.ok_or_else(|| ShellError::NushellFailed {
408 msg: "Tried to call an EngineInterface method that requires a call context \
409 outside of one"
410 .into(),
411 })
412 }
413
414 pub(crate) fn write_ok(
416 &self,
417 result: Result<(), impl Into<LabeledError>>,
418 ) -> Result<(), ShellError> {
419 let response = match result {
420 Ok(()) => PluginCallResponse::Ok,
421 Err(err) => PluginCallResponse::Error(err.into()),
422 };
423 self.write(PluginOutput::CallResponse(self.context()?, response))?;
424 self.flush()
425 }
426
427 pub(crate) fn write_response(
430 &self,
431 result: Result<PipelineData, impl Into<LabeledError>>,
432 ) -> Result<PipelineDataWriter<Self>, ShellError> {
433 match result {
434 Ok(data) => {
435 let (header, writer) = match self.init_write_pipeline_data(data, &()) {
436 Ok(tup) => tup,
437 Err(err) => return self.write_response(Err(err)),
440 };
441 let response = PluginCallResponse::PipelineData(header);
443 self.write(PluginOutput::CallResponse(self.context()?, response))?;
444 self.flush()?;
445 Ok(writer)
446 }
447 Err(err) => {
448 let response = PluginCallResponse::Error(err.into());
449 self.write(PluginOutput::CallResponse(self.context()?, response))?;
450 self.flush()?;
451 Ok(Default::default())
452 }
453 }
454 }
455
456 pub(crate) fn write_metadata(&self, metadata: PluginMetadata) -> Result<(), ShellError> {
458 let response = PluginCallResponse::Metadata(metadata);
459 self.write(PluginOutput::CallResponse(self.context()?, response))?;
460 self.flush()
461 }
462
463 pub(crate) fn write_signature(
467 &self,
468 signature: Vec<PluginSignature>,
469 ) -> Result<(), ShellError> {
470 let response = PluginCallResponse::Signature(signature);
471 self.write(PluginOutput::CallResponse(self.context()?, response))?;
472 self.flush()
473 }
474
475 fn write_engine_call(
478 &self,
479 call: EngineCall<PipelineData>,
480 ) -> Result<
481 (
482 PipelineDataWriter<Self>,
483 mpsc::Receiver<EngineCallResponse<PipelineData>>,
484 ),
485 ShellError,
486 > {
487 let context = self.context()?;
488 let id = self.state.engine_call_id_sequence.next()?;
489 let (tx, rx) = mpsc::channel();
490
491 let mut writer = None;
493
494 let call = call.map_data(|input| {
495 let (input_header, input_writer) = self.init_write_pipeline_data(input, &())?;
496 writer = Some(input_writer);
497 Ok(input_header)
498 })?;
499
500 self.state
502 .engine_call_subscription_sender
503 .send((id, tx))
504 .map_err(|_| ShellError::NushellFailed {
505 msg: "EngineInterfaceManager hung up and is no longer accepting engine calls"
506 .into(),
507 })?;
508
509 self.write(PluginOutput::EngineCall { context, id, call })?;
511 self.flush()?;
512
513 Ok((writer.unwrap_or_default(), rx))
514 }
515
516 fn engine_call(
518 &self,
519 call: EngineCall<PipelineData>,
520 ) -> Result<EngineCallResponse<PipelineData>, ShellError> {
521 let (writer, rx) = self.write_engine_call(call)?;
522
523 writer.write_background()?;
525
526 rx.recv().map_err(|_| ShellError::NushellFailed {
528 msg: "Failed to get response to engine call because the channel was closed".into(),
529 })
530 }
531
532 pub fn is_using_stdio(&self) -> bool {
538 self.state.writer.is_stdout()
539 }
540
541 pub fn register_signal_handler(&self, handler: Handler) -> Result<HandlerGuard, ShellError> {
544 self.state.signal_handlers.register(handler)
545 }
546
547 pub fn get_config(&self) -> Result<Arc<Config>, ShellError> {
564 match self.engine_call(EngineCall::GetConfig)? {
565 EngineCallResponse::Config(config) => Ok(SharedCow::into_arc(config)),
566 EngineCallResponse::Error(err) => Err(err),
567 _ => Err(ShellError::PluginFailedToDecode {
568 msg: "Received unexpected response for EngineCall::GetConfig".into(),
569 }),
570 }
571 }
572
573 fn engine_call_option_value(
576 &self,
577 engine_call: EngineCall<PipelineData>,
578 ) -> Result<Option<Value>, ShellError> {
579 let name = engine_call.name();
580 match self.engine_call(engine_call)? {
581 EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(None),
582 EngineCallResponse::PipelineData(PipelineData::Value(value, _)) => Ok(Some(value)),
583 EngineCallResponse::Error(err) => Err(err),
584 _ => Err(ShellError::PluginFailedToDecode {
585 msg: format!("Received unexpected response for EngineCall::{name}"),
586 }),
587 }
588 }
589
590 pub fn get_plugin_config(&self) -> Result<Option<Value>, ShellError> {
608 self.engine_call_option_value(EngineCall::GetPluginConfig)
609 }
610
611 pub fn get_env_var(&self, name: impl Into<String>) -> Result<Option<Value>, ShellError> {
627 self.engine_call_option_value(EngineCall::GetEnvVar(name.into()))
628 }
629
630 pub fn get_current_dir(&self) -> Result<String, ShellError> {
641 match self.engine_call(EngineCall::GetCurrentDir)? {
642 EngineCallResponse::PipelineData(PipelineData::Value(Value::String { val, .. }, _)) => {
644 Ok(val)
645 }
646 EngineCallResponse::Error(err) => Err(err),
647 _ => Err(ShellError::PluginFailedToDecode {
648 msg: "Received unexpected response for EngineCall::GetCurrentDir".into(),
649 }),
650 }
651 }
652
653 pub fn get_env_vars(&self) -> Result<HashMap<String, Value>, ShellError> {
669 match self.engine_call(EngineCall::GetEnvVars)? {
670 EngineCallResponse::ValueMap(map) => Ok(map),
671 EngineCallResponse::Error(err) => Err(err),
672 _ => Err(ShellError::PluginFailedToDecode {
673 msg: "Received unexpected response type for EngineCall::GetEnvVars".into(),
674 }),
675 }
676 }
677
678 pub fn add_env_var(&self, name: impl Into<String>, value: Value) -> Result<(), ShellError> {
693 match self.engine_call(EngineCall::AddEnvVar(name.into(), value))? {
694 EngineCallResponse::PipelineData(_) => Ok(()),
695 EngineCallResponse::Error(err) => Err(err),
696 _ => Err(ShellError::PluginFailedToDecode {
697 msg: "Received unexpected response type for EngineCall::AddEnvVar".into(),
698 }),
699 }
700 }
701
702 pub fn get_help(&self) -> Result<String, ShellError> {
717 match self.engine_call(EngineCall::GetHelp)? {
718 EngineCallResponse::PipelineData(PipelineData::Value(Value::String { val, .. }, _)) => {
719 Ok(val)
720 }
721 _ => Err(ShellError::PluginFailedToDecode {
722 msg: "Received unexpected response type for EngineCall::GetHelp".into(),
723 }),
724 }
725 }
726
727 pub fn enter_foreground(&self) -> Result<ForegroundGuard, ShellError> {
735 match self.engine_call(EngineCall::EnterForeground)? {
736 EngineCallResponse::Error(error) => Err(error),
737 EngineCallResponse::PipelineData(PipelineData::Value(
738 Value::Int { val: pgrp, .. },
739 _,
740 )) => {
741 set_pgrp_from_enter_foreground(pgrp)?;
742 Ok(ForegroundGuard(Some(self.clone())))
743 }
744 EngineCallResponse::PipelineData(PipelineData::Empty) => {
745 Ok(ForegroundGuard(Some(self.clone())))
746 }
747 _ => Err(ShellError::PluginFailedToDecode {
748 msg: "Received unexpected response type for EngineCall::SetForeground".into(),
749 }),
750 }
751 }
752
753 fn leave_foreground(&self) -> Result<(), ShellError> {
755 match self.engine_call(EngineCall::LeaveForeground)? {
756 EngineCallResponse::Error(error) => Err(error),
757 EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(()),
758 _ => Err(ShellError::PluginFailedToDecode {
759 msg: "Received unexpected response type for EngineCall::LeaveForeground".into(),
760 }),
761 }
762 }
763
764 pub fn get_span_contents(&self, span: Span) -> Result<Vec<u8>, ShellError> {
770 match self.engine_call(EngineCall::GetSpanContents(span))? {
771 EngineCallResponse::PipelineData(PipelineData::Value(Value::Binary { val, .. }, _)) => {
772 Ok(val)
773 }
774 _ => Err(ShellError::PluginFailedToDecode {
775 msg: "Received unexpected response type for EngineCall::GetSpanContents".into(),
776 }),
777 }
778 }
779
780 pub fn eval_closure_with_stream(
826 &self,
827 closure: &Spanned<Closure>,
828 mut positional: Vec<Value>,
829 input: PipelineData,
830 redirect_stdout: bool,
831 redirect_stderr: bool,
832 ) -> Result<PipelineData, ShellError> {
833 positional
835 .iter_mut()
836 .try_for_each(PluginCustomValue::serialize_custom_values_in)?;
837
838 let call = EngineCall::EvalClosure {
839 closure: closure.clone(),
840 positional,
841 input,
842 redirect_stdout,
843 redirect_stderr,
844 };
845
846 match self.engine_call(call)? {
847 EngineCallResponse::Error(error) => Err(error),
848 EngineCallResponse::PipelineData(data) => Ok(data),
849 _ => Err(ShellError::PluginFailedToDecode {
850 msg: "Received unexpected response type for EngineCall::EvalClosure".into(),
851 }),
852 }
853 }
854
855 pub fn eval_closure(
896 &self,
897 closure: &Spanned<Closure>,
898 positional: Vec<Value>,
899 input: Option<Value>,
900 ) -> Result<Value, ShellError> {
901 let input = input.map_or_else(PipelineData::empty, |v| PipelineData::value(v, None));
902 let output = self.eval_closure_with_stream(closure, positional, input, true, false)?;
903 match output.into_value(closure.span)? {
905 Value::Error { error, .. } => Err(*error),
906 value => Ok(value),
907 }
908 }
909
910 pub fn find_decl(&self, name: impl Into<String>) -> Result<Option<DeclId>, ShellError> {
915 let call = EngineCall::FindDecl(name.into());
916
917 match self.engine_call(call)? {
918 EngineCallResponse::Error(err) => Err(err),
919 EngineCallResponse::Identifier(id) => Ok(Some(id)),
920 EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(None),
921 _ => Err(ShellError::PluginFailedToDecode {
922 msg: "Received unexpected response type for EngineCall::FindDecl".into(),
923 }),
924 }
925 }
926
927 pub fn call_decl(
951 &self,
952 decl_id: DeclId,
953 call: EvaluatedCall,
954 input: PipelineData,
955 redirect_stdout: bool,
956 redirect_stderr: bool,
957 ) -> Result<PipelineData, ShellError> {
958 let call = EngineCall::CallDecl {
959 decl_id,
960 call,
961 input,
962 redirect_stdout,
963 redirect_stderr,
964 };
965
966 match self.engine_call(call)? {
967 EngineCallResponse::Error(err) => Err(err),
968 EngineCallResponse::PipelineData(data) => Ok(data),
969 _ => Err(ShellError::PluginFailedToDecode {
970 msg: "Received unexpected response type for EngineCall::CallDecl".into(),
971 }),
972 }
973 }
974
975 pub fn set_gc_disabled(&self, disabled: bool) -> Result<(), ShellError> {
983 self.write(PluginOutput::Option(PluginOption::GcDisabled(disabled)))?;
984 self.flush()
985 }
986
987 pub(crate) fn write_ordering(
989 &self,
990 ordering: Option<impl Into<Ordering>>,
991 ) -> Result<(), ShellError> {
992 let response = PluginCallResponse::Ordering(ordering.map(|o| o.into()));
993 self.write(PluginOutput::CallResponse(self.context()?, response))?;
994 self.flush()
995 }
996
997 pub fn signals(&self) -> &Signals {
998 &self.state.signals
999 }
1000}
1001
1002impl Interface for EngineInterface {
1003 type Output = PluginOutput;
1004 type DataContext = ();
1005
1006 fn write(&self, output: PluginOutput) -> Result<(), ShellError> {
1007 log::trace!("to engine: {output:?}");
1008 self.state.writer.write(&output)
1009 }
1010
1011 fn flush(&self) -> Result<(), ShellError> {
1012 self.state.writer.flush()
1013 }
1014
1015 fn stream_id_sequence(&self) -> &Sequence {
1016 &self.state.stream_id_sequence
1017 }
1018
1019 fn stream_manager_handle(&self) -> &StreamManagerHandle {
1020 &self.stream_manager_handle
1021 }
1022
1023 fn prepare_pipeline_data(
1024 &self,
1025 mut data: PipelineData,
1026 _context: &(),
1027 ) -> Result<PipelineData, ShellError> {
1028 match data {
1030 PipelineData::Value(ref mut value, _) => {
1031 PluginCustomValue::serialize_custom_values_in(value)?;
1032 Ok(data)
1033 }
1034 PipelineData::ListStream(stream, meta) => {
1035 let stream = stream.map(|mut value| {
1036 let span = value.span();
1037 PluginCustomValue::serialize_custom_values_in(&mut value)
1038 .map(|_| value)
1039 .unwrap_or_else(|err| Value::error(err, span))
1040 });
1041 Ok(PipelineData::list_stream(stream, meta))
1042 }
1043 PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
1044 }
1045 }
1046}
1047
1048pub struct ForegroundGuard(Option<EngineInterface>);
1052
1053impl ForegroundGuard {
1054 fn leave_internal(&mut self) -> Result<(), ShellError> {
1056 if let Some(interface) = self.0.take() {
1057 #[cfg(unix)]
1059 {
1060 use nix::unistd::{Pid, setpgid};
1061 setpgid(Pid::from_raw(0), Pid::from_raw(0)).map_err(|err| {
1063 nu_protocol::shell_error::io::IoError::new_internal(
1064 std::io::Error::from(err),
1065 "Could not set pgid",
1066 nu_protocol::location!(),
1067 )
1068 })?;
1069 }
1070 interface.leave_foreground()?;
1071 }
1072 Ok(())
1073 }
1074
1075 pub fn leave(mut self) -> Result<(), ShellError> {
1077 let result = self.leave_internal();
1078 std::mem::forget(self);
1079 result
1080 }
1081}
1082
1083impl Drop for ForegroundGuard {
1084 fn drop(&mut self) {
1085 let _ = self.leave_internal();
1086 }
1087}
1088
1089#[cfg(unix)]
1090fn set_pgrp_from_enter_foreground(pgrp: i64) -> Result<(), ShellError> {
1091 use nix::unistd::{Pid, setpgid};
1092 if let Ok(pgrp) = pgrp.try_into() {
1093 setpgid(Pid::from_raw(0), Pid::from_raw(pgrp)).map_err(|err| ShellError::GenericError {
1094 error: "Failed to set process group for foreground".into(),
1095 msg: "".into(),
1096 span: None,
1097 help: Some(err.to_string()),
1098 inner: vec![],
1099 })
1100 } else {
1101 Err(ShellError::NushellFailed {
1102 msg: "Engine returned an invalid process group ID".into(),
1103 })
1104 }
1105}
1106
1107#[cfg(not(unix))]
1108fn set_pgrp_from_enter_foreground(_pgrp: i64) -> Result<(), ShellError> {
1109 Err(ShellError::NushellFailed {
1110 msg: concat!(
1111 "EnterForeground asked plugin to join process group, but this is not supported on non UNIX platforms.",
1112 )
1113 .into(),
1114 })
1115}