1use nu_plugin_core::{
4 Interface, InterfaceManager, PipelineDataWriter, PluginRead, PluginWrite, StreamManager,
5 StreamManagerHandle,
6 util::{Waitable, WaitableMut},
7};
8use nu_plugin_protocol::{
9 CallInfo, CustomValueOp, EngineCall, EngineCallId, EngineCallResponse, EvaluatedCall, Ordering,
10 PluginCall, PluginCallId, PluginCallResponse, PluginCustomValue, PluginInput, PluginOption,
11 PluginOutput, ProtocolInfo,
12};
13use nu_protocol::{
14 Config, DeclId, Handler, HandlerGuard, Handlers, LabeledError, PipelineData, PluginMetadata,
15 PluginSignature, ShellError, SignalAction, Signals, Span, Spanned, Value,
16 engine::{Closure, Sequence},
17};
18use nu_utils::SharedCow;
19use std::{
20 collections::{BTreeMap, HashMap, btree_map},
21 sync::{Arc, atomic::AtomicBool, mpsc},
22};
23
24#[derive(Debug)]
32#[doc(hidden)]
33pub enum ReceivedPluginCall {
34 Metadata {
35 engine: EngineInterface,
36 },
37 Signature {
38 engine: EngineInterface,
39 },
40 Run {
41 engine: EngineInterface,
42 call: CallInfo<PipelineData>,
43 },
44 CustomValueOp {
45 engine: EngineInterface,
46 custom_value: Spanned<PluginCustomValue>,
47 op: CustomValueOp,
48 },
49}
50
51#[cfg(test)]
52mod tests;
53
54struct EngineInterfaceState {
56 protocol_info: Waitable<Arc<ProtocolInfo>>,
58 engine_call_id_sequence: Sequence,
60 stream_id_sequence: Sequence,
62 engine_call_subscription_sender:
64 mpsc::Sender<(EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>)>,
65 writer: Box<dyn PluginWrite<PluginOutput>>,
67 signals: Signals,
70 signal_handlers: Handlers,
72}
73
74impl std::fmt::Debug for EngineInterfaceState {
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 f.debug_struct("EngineInterfaceState")
77 .field("protocol_info", &self.protocol_info)
78 .field("engine_call_id_sequence", &self.engine_call_id_sequence)
79 .field("stream_id_sequence", &self.stream_id_sequence)
80 .field(
81 "engine_call_subscription_sender",
82 &self.engine_call_subscription_sender,
83 )
84 .finish_non_exhaustive()
85 }
86}
87
88#[derive(Debug)]
92#[doc(hidden)]
93pub struct EngineInterfaceManager {
94 state: Arc<EngineInterfaceState>,
96 protocol_info_mut: WaitableMut<Arc<ProtocolInfo>>,
98 plugin_call_sender: Option<mpsc::Sender<ReceivedPluginCall>>,
100 plugin_call_receiver: Option<mpsc::Receiver<ReceivedPluginCall>>,
102 engine_call_subscriptions:
104 BTreeMap<EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>>,
105 engine_call_subscription_receiver:
107 mpsc::Receiver<(EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>)>,
108 stream_manager: StreamManager,
110}
111
112impl EngineInterfaceManager {
113 pub(crate) fn new(writer: impl PluginWrite<PluginOutput> + 'static) -> EngineInterfaceManager {
114 let (plug_tx, plug_rx) = mpsc::channel();
115 let (subscription_tx, subscription_rx) = mpsc::channel();
116 let protocol_info_mut = WaitableMut::new();
117
118 EngineInterfaceManager {
119 state: Arc::new(EngineInterfaceState {
120 protocol_info: protocol_info_mut.reader(),
121 engine_call_id_sequence: Sequence::default(),
122 stream_id_sequence: Sequence::default(),
123 engine_call_subscription_sender: subscription_tx,
124 writer: Box::new(writer),
125 signals: Signals::new(Arc::new(AtomicBool::new(false))),
126 signal_handlers: Handlers::new(),
127 }),
128 protocol_info_mut,
129 plugin_call_sender: Some(plug_tx),
130 plugin_call_receiver: Some(plug_rx),
131 engine_call_subscriptions: BTreeMap::new(),
132 engine_call_subscription_receiver: subscription_rx,
133 stream_manager: StreamManager::new(),
134 }
135 }
136
137 pub(crate) fn take_plugin_call_receiver(
140 &mut self,
141 ) -> Option<mpsc::Receiver<ReceivedPluginCall>> {
142 self.plugin_call_receiver.take()
143 }
144
145 fn interface_for_context(&self, context: PluginCallId) -> EngineInterface {
147 EngineInterface {
148 state: self.state.clone(),
149 stream_manager_handle: self.stream_manager.get_handle(),
150 context: Some(context),
151 }
152 }
153
154 fn send_plugin_call(&self, plugin_call: ReceivedPluginCall) -> Result<(), ShellError> {
156 self.plugin_call_sender
157 .as_ref()
158 .ok_or_else(|| ShellError::PluginFailedToDecode {
159 msg: "Received a plugin call after Goodbye".into(),
160 })?
161 .send(plugin_call)
162 .map_err(|_| ShellError::NushellFailed {
163 msg: "Received a plugin call, but there's nowhere to send it".into(),
164 })
165 }
166
167 fn receive_engine_call_subscriptions(&mut self) {
169 for (id, subscription) in self.engine_call_subscription_receiver.try_iter() {
170 if let btree_map::Entry::Vacant(e) = self.engine_call_subscriptions.entry(id) {
171 e.insert(subscription);
172 } else {
173 log::warn!("Duplicate engine call ID ignored: {id}")
174 }
175 }
176 }
177
178 fn send_engine_call_response(
180 &mut self,
181 id: EngineCallId,
182 response: EngineCallResponse<PipelineData>,
183 ) -> Result<(), ShellError> {
184 self.receive_engine_call_subscriptions();
186 if let Some(sender) = self.engine_call_subscriptions.remove(&id) {
188 if sender.send(response).is_err() {
189 log::warn!("Received an engine call response for id={id}, but the caller hung up");
190 }
191 Ok(())
192 } else {
193 Err(ShellError::PluginFailedToDecode {
194 msg: format!("Unknown engine call ID: {id}"),
195 })
196 }
197 }
198
199 pub(crate) fn is_finished(&self) -> bool {
202 Arc::strong_count(&self.state) < 2
203 }
204
205 pub(crate) fn consume_all(
209 &mut self,
210 mut reader: impl PluginRead<PluginInput>,
211 ) -> Result<(), ShellError> {
212 while let Some(msg) = reader.read().transpose() {
213 if self.is_finished() {
214 break;
215 }
216
217 if let Err(err) = msg.and_then(|msg| self.consume(msg)) {
218 let _ = self.stream_manager.broadcast_read_error(err.clone());
220 self.receive_engine_call_subscriptions();
222 for sender in std::mem::take(&mut self.engine_call_subscriptions).into_values() {
223 let _ = sender.send(EngineCallResponse::Error(err.clone()));
224 }
225 return Err(err);
226 }
227 }
228 Ok(())
229 }
230}
231
232impl InterfaceManager for EngineInterfaceManager {
233 type Interface = EngineInterface;
234 type Input = PluginInput;
235
236 fn get_interface(&self) -> Self::Interface {
237 EngineInterface {
238 state: self.state.clone(),
239 stream_manager_handle: self.stream_manager.get_handle(),
240 context: None,
241 }
242 }
243
244 fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> {
245 log::trace!("from engine: {input:?}");
246 match input {
247 PluginInput::Hello(info) => {
248 let info = Arc::new(info);
249 self.protocol_info_mut.set(info.clone())?;
250
251 let local_info = ProtocolInfo::default();
252 if local_info.is_compatible_with(&info)? {
253 Ok(())
254 } else {
255 Err(ShellError::PluginFailedToLoad {
256 msg: format!(
257 "Plugin is compiled for nushell version {}, \
258 which is not compatible with version {}",
259 local_info.version, info.version
260 ),
261 })
262 }
263 }
264 _ if !self.state.protocol_info.is_set() => {
265 Err(ShellError::PluginFailedToLoad {
267 msg: "Failed to receive initial Hello message. This engine might be too old"
268 .into(),
269 })
270 }
271 PluginInput::Data(..)
273 | PluginInput::End(..)
274 | PluginInput::Drop(..)
275 | PluginInput::Ack(..) => {
276 self.consume_stream_message(input.try_into().map_err(|msg| {
277 ShellError::NushellFailed {
278 msg: format!("Failed to convert message {msg:?} to StreamMessage"),
279 }
280 })?)
281 }
282 PluginInput::Call(id, call) => {
283 let interface = self.interface_for_context(id);
284 let call = match call
286 .map_data(|input| self.read_pipeline_data(input, &Signals::empty()))
287 {
288 Ok(call) => call,
289 Err(err) => {
290 return interface.write_response(Err(err))?.write();
293 }
294 };
295 match call {
296 PluginCall::Metadata => {
298 self.send_plugin_call(ReceivedPluginCall::Metadata { engine: interface })
299 }
300 PluginCall::Signature => {
302 self.send_plugin_call(ReceivedPluginCall::Signature { engine: interface })
303 }
304 PluginCall::Run(mut call_info) => {
306 if let Err(err) = deserialize_call_args(&mut call_info.call) {
308 return interface.write_response(Err(err))?.write();
309 }
310 self.send_plugin_call(ReceivedPluginCall::Run {
312 engine: interface,
313 call: call_info,
314 })
315 }
316 PluginCall::CustomValueOp(custom_value, op) => {
318 self.send_plugin_call(ReceivedPluginCall::CustomValueOp {
319 engine: interface,
320 custom_value,
321 op,
322 })
323 }
324 }
325 }
326 PluginInput::Goodbye => {
327 drop(self.plugin_call_sender.take());
329 Ok(())
330 }
331 PluginInput::EngineCallResponse(id, response) => {
332 let response = response
333 .map_data(|header| self.read_pipeline_data(header, &Signals::empty()))
334 .unwrap_or_else(|err| {
335 EngineCallResponse::Error(err)
338 });
339 self.send_engine_call_response(id, response)
340 }
341 PluginInput::Signal(action) => {
342 match action {
343 SignalAction::Interrupt => self.state.signals.trigger(),
344 SignalAction::Reset => self.state.signals.reset(),
345 }
346 self.state.signal_handlers.run(action);
347 Ok(())
348 }
349 }
350 }
351
352 fn stream_manager(&self) -> &StreamManager {
353 &self.stream_manager
354 }
355
356 fn prepare_pipeline_data(&self, mut data: PipelineData) -> Result<PipelineData, ShellError> {
357 match data {
359 PipelineData::Value(ref mut value, _) => {
360 PluginCustomValue::deserialize_custom_values_in(value)?;
361 Ok(data)
362 }
363 PipelineData::ListStream(stream, meta) => {
364 let stream = stream.map(|mut value| {
365 let span = value.span();
366 PluginCustomValue::deserialize_custom_values_in(&mut value)
367 .map(|()| value)
368 .unwrap_or_else(|err| Value::error(err, span))
369 });
370 Ok(PipelineData::list_stream(stream, meta))
371 }
372 PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
373 }
374 }
375}
376
377fn deserialize_call_args(call: &mut crate::EvaluatedCall) -> Result<(), ShellError> {
379 call.positional
380 .iter_mut()
381 .try_for_each(PluginCustomValue::deserialize_custom_values_in)?;
382 call.named
383 .iter_mut()
384 .flat_map(|(_, value)| value.as_mut())
385 .try_for_each(PluginCustomValue::deserialize_custom_values_in)
386}
387
388#[derive(Debug, Clone)]
390pub struct EngineInterface {
391 state: Arc<EngineInterfaceState>,
393 stream_manager_handle: StreamManagerHandle,
395 context: Option<PluginCallId>,
397}
398
399impl EngineInterface {
400 pub(crate) fn hello(&self) -> Result<(), ShellError> {
402 self.write(PluginOutput::Hello(ProtocolInfo::default()))?;
403 self.flush()
404 }
405
406 fn context(&self) -> Result<PluginCallId, ShellError> {
407 self.context.ok_or_else(|| ShellError::NushellFailed {
408 msg: "Tried to call an EngineInterface method that requires a call context \
409 outside of one"
410 .into(),
411 })
412 }
413
414 pub(crate) fn write_response(
417 &self,
418 result: Result<PipelineData, impl Into<LabeledError>>,
419 ) -> Result<PipelineDataWriter<Self>, ShellError> {
420 match result {
421 Ok(data) => {
422 let (header, writer) = match self.init_write_pipeline_data(data, &()) {
423 Ok(tup) => tup,
424 Err(err) => return self.write_response(Err(err)),
427 };
428 let response = PluginCallResponse::PipelineData(header);
430 self.write(PluginOutput::CallResponse(self.context()?, response))?;
431 self.flush()?;
432 Ok(writer)
433 }
434 Err(err) => {
435 let response = PluginCallResponse::Error(err.into());
436 self.write(PluginOutput::CallResponse(self.context()?, response))?;
437 self.flush()?;
438 Ok(Default::default())
439 }
440 }
441 }
442
443 pub(crate) fn write_metadata(&self, metadata: PluginMetadata) -> Result<(), ShellError> {
445 let response = PluginCallResponse::Metadata(metadata);
446 self.write(PluginOutput::CallResponse(self.context()?, response))?;
447 self.flush()
448 }
449
450 pub(crate) fn write_signature(
454 &self,
455 signature: Vec<PluginSignature>,
456 ) -> Result<(), ShellError> {
457 let response = PluginCallResponse::Signature(signature);
458 self.write(PluginOutput::CallResponse(self.context()?, response))?;
459 self.flush()
460 }
461
462 fn write_engine_call(
465 &self,
466 call: EngineCall<PipelineData>,
467 ) -> Result<
468 (
469 PipelineDataWriter<Self>,
470 mpsc::Receiver<EngineCallResponse<PipelineData>>,
471 ),
472 ShellError,
473 > {
474 let context = self.context()?;
475 let id = self.state.engine_call_id_sequence.next()?;
476 let (tx, rx) = mpsc::channel();
477
478 let mut writer = None;
480
481 let call = call.map_data(|input| {
482 let (input_header, input_writer) = self.init_write_pipeline_data(input, &())?;
483 writer = Some(input_writer);
484 Ok(input_header)
485 })?;
486
487 self.state
489 .engine_call_subscription_sender
490 .send((id, tx))
491 .map_err(|_| ShellError::NushellFailed {
492 msg: "EngineInterfaceManager hung up and is no longer accepting engine calls"
493 .into(),
494 })?;
495
496 self.write(PluginOutput::EngineCall { context, id, call })?;
498 self.flush()?;
499
500 Ok((writer.unwrap_or_default(), rx))
501 }
502
503 fn engine_call(
505 &self,
506 call: EngineCall<PipelineData>,
507 ) -> Result<EngineCallResponse<PipelineData>, ShellError> {
508 let (writer, rx) = self.write_engine_call(call)?;
509
510 writer.write_background()?;
512
513 rx.recv().map_err(|_| ShellError::NushellFailed {
515 msg: "Failed to get response to engine call because the channel was closed".into(),
516 })
517 }
518
519 pub fn is_using_stdio(&self) -> bool {
525 self.state.writer.is_stdout()
526 }
527
528 pub fn register_signal_handler(&self, handler: Handler) -> Result<HandlerGuard, ShellError> {
531 self.state.signal_handlers.register(handler)
532 }
533
534 pub fn get_config(&self) -> Result<Arc<Config>, ShellError> {
551 match self.engine_call(EngineCall::GetConfig)? {
552 EngineCallResponse::Config(config) => Ok(SharedCow::into_arc(config)),
553 EngineCallResponse::Error(err) => Err(err),
554 _ => Err(ShellError::PluginFailedToDecode {
555 msg: "Received unexpected response for EngineCall::GetConfig".into(),
556 }),
557 }
558 }
559
560 fn engine_call_option_value(
563 &self,
564 engine_call: EngineCall<PipelineData>,
565 ) -> Result<Option<Value>, ShellError> {
566 let name = engine_call.name();
567 match self.engine_call(engine_call)? {
568 EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(None),
569 EngineCallResponse::PipelineData(PipelineData::Value(value, _)) => Ok(Some(value)),
570 EngineCallResponse::Error(err) => Err(err),
571 _ => Err(ShellError::PluginFailedToDecode {
572 msg: format!("Received unexpected response for EngineCall::{name}"),
573 }),
574 }
575 }
576
577 pub fn get_plugin_config(&self) -> Result<Option<Value>, ShellError> {
595 self.engine_call_option_value(EngineCall::GetPluginConfig)
596 }
597
598 pub fn get_env_var(&self, name: impl Into<String>) -> Result<Option<Value>, ShellError> {
614 self.engine_call_option_value(EngineCall::GetEnvVar(name.into()))
615 }
616
617 pub fn get_current_dir(&self) -> Result<String, ShellError> {
628 match self.engine_call(EngineCall::GetCurrentDir)? {
629 EngineCallResponse::PipelineData(PipelineData::Value(Value::String { val, .. }, _)) => {
631 Ok(val)
632 }
633 EngineCallResponse::Error(err) => Err(err),
634 _ => Err(ShellError::PluginFailedToDecode {
635 msg: "Received unexpected response for EngineCall::GetCurrentDir".into(),
636 }),
637 }
638 }
639
640 pub fn get_env_vars(&self) -> Result<HashMap<String, Value>, ShellError> {
656 match self.engine_call(EngineCall::GetEnvVars)? {
657 EngineCallResponse::ValueMap(map) => Ok(map),
658 EngineCallResponse::Error(err) => Err(err),
659 _ => Err(ShellError::PluginFailedToDecode {
660 msg: "Received unexpected response type for EngineCall::GetEnvVars".into(),
661 }),
662 }
663 }
664
665 pub fn add_env_var(&self, name: impl Into<String>, value: Value) -> Result<(), ShellError> {
680 match self.engine_call(EngineCall::AddEnvVar(name.into(), value))? {
681 EngineCallResponse::PipelineData(_) => Ok(()),
682 EngineCallResponse::Error(err) => Err(err),
683 _ => Err(ShellError::PluginFailedToDecode {
684 msg: "Received unexpected response type for EngineCall::AddEnvVar".into(),
685 }),
686 }
687 }
688
689 pub fn get_help(&self) -> Result<String, ShellError> {
704 match self.engine_call(EngineCall::GetHelp)? {
705 EngineCallResponse::PipelineData(PipelineData::Value(Value::String { val, .. }, _)) => {
706 Ok(val)
707 }
708 _ => Err(ShellError::PluginFailedToDecode {
709 msg: "Received unexpected response type for EngineCall::GetHelp".into(),
710 }),
711 }
712 }
713
714 pub fn enter_foreground(&self) -> Result<ForegroundGuard, ShellError> {
722 match self.engine_call(EngineCall::EnterForeground)? {
723 EngineCallResponse::Error(error) => Err(error),
724 EngineCallResponse::PipelineData(PipelineData::Value(
725 Value::Int { val: pgrp, .. },
726 _,
727 )) => {
728 set_pgrp_from_enter_foreground(pgrp)?;
729 Ok(ForegroundGuard(Some(self.clone())))
730 }
731 EngineCallResponse::PipelineData(PipelineData::Empty) => {
732 Ok(ForegroundGuard(Some(self.clone())))
733 }
734 _ => Err(ShellError::PluginFailedToDecode {
735 msg: "Received unexpected response type for EngineCall::SetForeground".into(),
736 }),
737 }
738 }
739
740 fn leave_foreground(&self) -> Result<(), ShellError> {
742 match self.engine_call(EngineCall::LeaveForeground)? {
743 EngineCallResponse::Error(error) => Err(error),
744 EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(()),
745 _ => Err(ShellError::PluginFailedToDecode {
746 msg: "Received unexpected response type for EngineCall::LeaveForeground".into(),
747 }),
748 }
749 }
750
751 pub fn get_span_contents(&self, span: Span) -> Result<Vec<u8>, ShellError> {
757 match self.engine_call(EngineCall::GetSpanContents(span))? {
758 EngineCallResponse::PipelineData(PipelineData::Value(Value::Binary { val, .. }, _)) => {
759 Ok(val)
760 }
761 _ => Err(ShellError::PluginFailedToDecode {
762 msg: "Received unexpected response type for EngineCall::GetSpanContents".into(),
763 }),
764 }
765 }
766
767 pub fn eval_closure_with_stream(
813 &self,
814 closure: &Spanned<Closure>,
815 mut positional: Vec<Value>,
816 input: PipelineData,
817 redirect_stdout: bool,
818 redirect_stderr: bool,
819 ) -> Result<PipelineData, ShellError> {
820 positional
822 .iter_mut()
823 .try_for_each(PluginCustomValue::serialize_custom_values_in)?;
824
825 let call = EngineCall::EvalClosure {
826 closure: closure.clone(),
827 positional,
828 input,
829 redirect_stdout,
830 redirect_stderr,
831 };
832
833 match self.engine_call(call)? {
834 EngineCallResponse::Error(error) => Err(error),
835 EngineCallResponse::PipelineData(data) => Ok(data),
836 _ => Err(ShellError::PluginFailedToDecode {
837 msg: "Received unexpected response type for EngineCall::EvalClosure".into(),
838 }),
839 }
840 }
841
842 pub fn eval_closure(
883 &self,
884 closure: &Spanned<Closure>,
885 positional: Vec<Value>,
886 input: Option<Value>,
887 ) -> Result<Value, ShellError> {
888 let input = input.map_or_else(PipelineData::empty, |v| PipelineData::value(v, None));
889 let output = self.eval_closure_with_stream(closure, positional, input, true, false)?;
890 match output.into_value(closure.span)? {
892 Value::Error { error, .. } => Err(*error),
893 value => Ok(value),
894 }
895 }
896
897 pub fn find_decl(&self, name: impl Into<String>) -> Result<Option<DeclId>, ShellError> {
902 let call = EngineCall::FindDecl(name.into());
903
904 match self.engine_call(call)? {
905 EngineCallResponse::Error(err) => Err(err),
906 EngineCallResponse::Identifier(id) => Ok(Some(id)),
907 EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(None),
908 _ => Err(ShellError::PluginFailedToDecode {
909 msg: "Received unexpected response type for EngineCall::FindDecl".into(),
910 }),
911 }
912 }
913
914 pub fn call_decl(
938 &self,
939 decl_id: DeclId,
940 call: EvaluatedCall,
941 input: PipelineData,
942 redirect_stdout: bool,
943 redirect_stderr: bool,
944 ) -> Result<PipelineData, ShellError> {
945 let call = EngineCall::CallDecl {
946 decl_id,
947 call,
948 input,
949 redirect_stdout,
950 redirect_stderr,
951 };
952
953 match self.engine_call(call)? {
954 EngineCallResponse::Error(err) => Err(err),
955 EngineCallResponse::PipelineData(data) => Ok(data),
956 _ => Err(ShellError::PluginFailedToDecode {
957 msg: "Received unexpected response type for EngineCall::CallDecl".into(),
958 }),
959 }
960 }
961
962 pub fn set_gc_disabled(&self, disabled: bool) -> Result<(), ShellError> {
970 self.write(PluginOutput::Option(PluginOption::GcDisabled(disabled)))?;
971 self.flush()
972 }
973
974 pub(crate) fn write_ordering(
976 &self,
977 ordering: Option<impl Into<Ordering>>,
978 ) -> Result<(), ShellError> {
979 let response = PluginCallResponse::Ordering(ordering.map(|o| o.into()));
980 self.write(PluginOutput::CallResponse(self.context()?, response))?;
981 self.flush()
982 }
983
984 pub fn signals(&self) -> &Signals {
985 &self.state.signals
986 }
987}
988
989impl Interface for EngineInterface {
990 type Output = PluginOutput;
991 type DataContext = ();
992
993 fn write(&self, output: PluginOutput) -> Result<(), ShellError> {
994 log::trace!("to engine: {output:?}");
995 self.state.writer.write(&output)
996 }
997
998 fn flush(&self) -> Result<(), ShellError> {
999 self.state.writer.flush()
1000 }
1001
1002 fn stream_id_sequence(&self) -> &Sequence {
1003 &self.state.stream_id_sequence
1004 }
1005
1006 fn stream_manager_handle(&self) -> &StreamManagerHandle {
1007 &self.stream_manager_handle
1008 }
1009
1010 fn prepare_pipeline_data(
1011 &self,
1012 mut data: PipelineData,
1013 _context: &(),
1014 ) -> Result<PipelineData, ShellError> {
1015 match data {
1017 PipelineData::Value(ref mut value, _) => {
1018 PluginCustomValue::serialize_custom_values_in(value)?;
1019 Ok(data)
1020 }
1021 PipelineData::ListStream(stream, meta) => {
1022 let stream = stream.map(|mut value| {
1023 let span = value.span();
1024 PluginCustomValue::serialize_custom_values_in(&mut value)
1025 .map(|_| value)
1026 .unwrap_or_else(|err| Value::error(err, span))
1027 });
1028 Ok(PipelineData::list_stream(stream, meta))
1029 }
1030 PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
1031 }
1032 }
1033}
1034
1035pub struct ForegroundGuard(Option<EngineInterface>);
1039
1040impl ForegroundGuard {
1041 fn leave_internal(&mut self) -> Result<(), ShellError> {
1043 if let Some(interface) = self.0.take() {
1044 #[cfg(unix)]
1046 {
1047 use nix::unistd::{Pid, setpgid};
1048 setpgid(Pid::from_raw(0), Pid::from_raw(0)).map_err(|err| {
1050 nu_protocol::shell_error::io::IoError::new_internal(
1051 std::io::Error::from(err),
1052 "Could not set pgid",
1053 nu_protocol::location!(),
1054 )
1055 })?;
1056 }
1057 interface.leave_foreground()?;
1058 }
1059 Ok(())
1060 }
1061
1062 pub fn leave(mut self) -> Result<(), ShellError> {
1064 let result = self.leave_internal();
1065 std::mem::forget(self);
1066 result
1067 }
1068}
1069
1070impl Drop for ForegroundGuard {
1071 fn drop(&mut self) {
1072 let _ = self.leave_internal();
1073 }
1074}
1075
1076#[cfg(unix)]
1077fn set_pgrp_from_enter_foreground(pgrp: i64) -> Result<(), ShellError> {
1078 use nix::unistd::{Pid, setpgid};
1079 if let Ok(pgrp) = pgrp.try_into() {
1080 setpgid(Pid::from_raw(0), Pid::from_raw(pgrp)).map_err(|err| ShellError::GenericError {
1081 error: "Failed to set process group for foreground".into(),
1082 msg: "".into(),
1083 span: None,
1084 help: Some(err.to_string()),
1085 inner: vec![],
1086 })
1087 } else {
1088 Err(ShellError::NushellFailed {
1089 msg: "Engine returned an invalid process group ID".into(),
1090 })
1091 }
1092}
1093
1094#[cfg(not(unix))]
1095fn set_pgrp_from_enter_foreground(_pgrp: i64) -> Result<(), ShellError> {
1096 Err(ShellError::NushellFailed {
1097 msg: concat!(
1098 "EnterForeground asked plugin to join process group, but this is not supported on non UNIX platforms.",
1099 )
1100 .into(),
1101 })
1102}