1use nu_plugin_core::{
4 Interface, InterfaceManager, PipelineDataWriter, PluginRead, PluginWrite, StreamManager,
5 StreamManagerHandle,
6 util::{Waitable, WaitableMut},
7};
8use nu_plugin_protocol::{
9 CallInfo, CustomValueOp, EngineCall, EngineCallId, EngineCallResponse, EvaluatedCall,
10 GetCompletionInfo, Ordering, PluginCall, PluginCallId, PluginCallResponse, PluginCustomValue,
11 PluginInput, PluginOption, PluginOutput, ProtocolInfo,
12};
13use nu_protocol::{
14 Config, DeclId, DynamicSuggestion, Handler, HandlerGuard, Handlers, LabeledError, PipelineData,
15 PluginMetadata, PluginSignature, ShellError, SignalAction, Signals, Span, Spanned, Value,
16 engine::{Closure, Sequence},
17};
18use nu_utils::SharedCow;
19use std::{
20 collections::{BTreeMap, HashMap, btree_map},
21 sync::{Arc, atomic::AtomicBool, mpsc},
22};
23
/// A plugin call after it has been decoded by [`EngineInterfaceManager`] and is ready to be
/// handled.
///
/// Values of this type are pushed onto the manager's plugin call channel by
/// `send_plugin_call`. Every variant carries an [`EngineInterface`] that was created with the
/// originating call's id as its context, so responses written through it are tagged with that
/// call.
#[derive(Debug)]
#[doc(hidden)]
pub enum ReceivedPluginCall {
    /// Produced for `PluginCall::Metadata`.
    Metadata {
        engine: EngineInterface,
    },
    /// Produced for `PluginCall::Signature`.
    Signature {
        engine: EngineInterface,
    },
    /// Produced for `PluginCall::Run`; `call` has had its pipeline input read and its
    /// argument custom values deserialized.
    Run {
        engine: EngineInterface,
        call: CallInfo<PipelineData>,
    },
    /// Produced for `PluginCall::GetCompletion`.
    GetCompletion {
        engine: EngineInterface,
        info: GetCompletionInfo,
    },
    /// Produced for `PluginCall::CustomValueOp`: an operation `op` to apply to `custom_value`.
    CustomValueOp {
        engine: EngineInterface,
        custom_value: Spanned<PluginCustomValue>,
        op: CustomValueOp,
    },
}
54
55#[cfg(test)]
56mod tests;
57
/// State shared (behind an `Arc`) between the [`EngineInterfaceManager`] and every
/// [`EngineInterface`] handed out by it.
struct EngineInterfaceState {
    /// Read side of the protocol info; the manager sets it when `Hello` is received.
    protocol_info: Waitable<Arc<ProtocolInfo>>,
    /// Source of ids for outgoing engine calls.
    engine_call_id_sequence: Sequence,
    /// Source of ids for streams (exposed through `Interface::stream_id_sequence`).
    stream_id_sequence: Sequence,
    /// Used by interfaces to register a response channel for an engine call id with the
    /// manager.
    engine_call_subscription_sender:
        mpsc::Sender<(EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>)>,
    /// Where all `PluginOutput` messages are written and flushed.
    writer: Box<dyn PluginWrite<PluginOutput>>,
    /// Triggered / reset by the manager when a `Signal` message arrives.
    signals: Signals,
    /// Handlers registered via `register_signal_handler`; run by the manager for every
    /// `Signal` message.
    signal_handlers: Handlers,
}
77
78impl std::fmt::Debug for EngineInterfaceState {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 f.debug_struct("EngineInterfaceState")
81 .field("protocol_info", &self.protocol_info)
82 .field("engine_call_id_sequence", &self.engine_call_id_sequence)
83 .field("stream_id_sequence", &self.stream_id_sequence)
84 .field(
85 "engine_call_subscription_sender",
86 &self.engine_call_subscription_sender,
87 )
88 .finish_non_exhaustive()
89 }
90}
91
/// Plugin-side manager of the connection to the engine: consumes `PluginInput` messages,
/// dispatches plugin calls and engine call responses, and hands out [`EngineInterface`]s.
#[derive(Debug)]
#[doc(hidden)]
pub struct EngineInterfaceManager {
    /// State shared with every interface created from this manager.
    state: Arc<EngineInterfaceState>,
    /// Write side of the protocol info; set once `Hello` is received.
    protocol_info_mut: WaitableMut<Arc<ProtocolInfo>>,
    /// Channel on which decoded plugin calls are sent. Becomes `None` after `Goodbye`.
    plugin_call_sender: Option<mpsc::Sender<ReceivedPluginCall>>,
    /// Receiving end for plugin calls, until taken by `take_plugin_call_receiver`.
    plugin_call_receiver: Option<mpsc::Receiver<ReceivedPluginCall>>,
    /// Response channels of engine calls that are still waiting for their response.
    engine_call_subscriptions:
        BTreeMap<EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>>,
    /// Incoming registrations of engine call response channels from interfaces.
    engine_call_subscription_receiver:
        mpsc::Receiver<(EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>)>,
    /// Manages the streams that are being read.
    stream_manager: StreamManager,
}
115
116impl EngineInterfaceManager {
117 pub(crate) fn new(writer: impl PluginWrite<PluginOutput> + 'static) -> EngineInterfaceManager {
118 let (plug_tx, plug_rx) = mpsc::channel();
119 let (subscription_tx, subscription_rx) = mpsc::channel();
120 let protocol_info_mut = WaitableMut::new();
121
122 EngineInterfaceManager {
123 state: Arc::new(EngineInterfaceState {
124 protocol_info: protocol_info_mut.reader(),
125 engine_call_id_sequence: Sequence::default(),
126 stream_id_sequence: Sequence::default(),
127 engine_call_subscription_sender: subscription_tx,
128 writer: Box::new(writer),
129 signals: Signals::new(Arc::new(AtomicBool::new(false))),
130 signal_handlers: Handlers::new(),
131 }),
132 protocol_info_mut,
133 plugin_call_sender: Some(plug_tx),
134 plugin_call_receiver: Some(plug_rx),
135 engine_call_subscriptions: BTreeMap::new(),
136 engine_call_subscription_receiver: subscription_rx,
137 stream_manager: StreamManager::new(),
138 }
139 }
140
141 pub(crate) fn take_plugin_call_receiver(
144 &mut self,
145 ) -> Option<mpsc::Receiver<ReceivedPluginCall>> {
146 self.plugin_call_receiver.take()
147 }
148
149 fn interface_for_context(&self, context: PluginCallId) -> EngineInterface {
151 EngineInterface {
152 state: self.state.clone(),
153 stream_manager_handle: self.stream_manager.get_handle(),
154 context: Some(context),
155 }
156 }
157
158 fn send_plugin_call(&self, plugin_call: ReceivedPluginCall) -> Result<(), ShellError> {
160 self.plugin_call_sender
161 .as_ref()
162 .ok_or_else(|| ShellError::PluginFailedToDecode {
163 msg: "Received a plugin call after Goodbye".into(),
164 })?
165 .send(plugin_call)
166 .map_err(|_| ShellError::NushellFailed {
167 msg: "Received a plugin call, but there's nowhere to send it".into(),
168 })
169 }
170
171 fn receive_engine_call_subscriptions(&mut self) {
173 for (id, subscription) in self.engine_call_subscription_receiver.try_iter() {
174 if let btree_map::Entry::Vacant(e) = self.engine_call_subscriptions.entry(id) {
175 e.insert(subscription);
176 } else {
177 log::warn!("Duplicate engine call ID ignored: {id}")
178 }
179 }
180 }
181
182 fn send_engine_call_response(
184 &mut self,
185 id: EngineCallId,
186 response: EngineCallResponse<PipelineData>,
187 ) -> Result<(), ShellError> {
188 self.receive_engine_call_subscriptions();
190 if let Some(sender) = self.engine_call_subscriptions.remove(&id) {
192 if sender.send(response).is_err() {
193 log::warn!("Received an engine call response for id={id}, but the caller hung up");
194 }
195 Ok(())
196 } else {
197 Err(ShellError::PluginFailedToDecode {
198 msg: format!("Unknown engine call ID: {id}"),
199 })
200 }
201 }
202
203 pub(crate) fn is_finished(&self) -> bool {
206 Arc::strong_count(&self.state) < 2
207 }
208
209 pub(crate) fn consume_all(
213 &mut self,
214 mut reader: impl PluginRead<PluginInput>,
215 ) -> Result<(), ShellError> {
216 while let Some(msg) = reader.read().transpose() {
217 if self.is_finished() {
218 break;
219 }
220
221 if let Err(err) = msg.and_then(|msg| self.consume(msg)) {
222 let _ = self.stream_manager.broadcast_read_error(err.clone());
224 self.receive_engine_call_subscriptions();
226 for sender in std::mem::take(&mut self.engine_call_subscriptions).into_values() {
227 let _ = sender.send(EngineCallResponse::Error(err.clone()));
228 }
229 return Err(err);
230 }
231 }
232 Ok(())
233 }
234}
235
236impl InterfaceManager for EngineInterfaceManager {
237 type Interface = EngineInterface;
238 type Input = PluginInput;
239
240 fn get_interface(&self) -> Self::Interface {
241 EngineInterface {
242 state: self.state.clone(),
243 stream_manager_handle: self.stream_manager.get_handle(),
244 context: None,
245 }
246 }
247
248 fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> {
249 log::trace!("from engine: {input:?}");
250 match input {
251 PluginInput::Hello(info) => {
252 let info = Arc::new(info);
253 self.protocol_info_mut.set(info.clone())?;
254
255 let local_info = ProtocolInfo::default();
256 if local_info.is_compatible_with(&info)? {
257 Ok(())
258 } else {
259 Err(ShellError::PluginFailedToLoad {
260 msg: format!(
261 "Plugin is compiled for nushell version {}, \
262 which is not compatible with version {}",
263 local_info.version, info.version
264 ),
265 })
266 }
267 }
268 _ if !self.state.protocol_info.is_set() => {
269 Err(ShellError::PluginFailedToLoad {
271 msg: "Failed to receive initial Hello message. This engine might be too old"
272 .into(),
273 })
274 }
275 PluginInput::Data(..)
277 | PluginInput::End(..)
278 | PluginInput::Drop(..)
279 | PluginInput::Ack(..) => {
280 self.consume_stream_message(input.try_into().map_err(|msg| {
281 ShellError::NushellFailed {
282 msg: format!("Failed to convert message {msg:?} to StreamMessage"),
283 }
284 })?)
285 }
286 PluginInput::Call(id, call) => {
287 let interface = self.interface_for_context(id);
288 let call = match call
290 .map_data(|input| self.read_pipeline_data(input, &Signals::empty()))
291 {
292 Ok(call) => call,
293 Err(err) => {
294 return interface.write_response(Err(err))?.write();
297 }
298 };
299 match call {
300 PluginCall::Metadata => {
302 self.send_plugin_call(ReceivedPluginCall::Metadata { engine: interface })
303 }
304 PluginCall::Signature => {
306 self.send_plugin_call(ReceivedPluginCall::Signature { engine: interface })
307 }
308 PluginCall::Run(mut call_info) => {
310 if let Err(err) = deserialize_call_args(&mut call_info.call) {
312 return interface.write_response(Err(err))?.write();
313 }
314 self.send_plugin_call(ReceivedPluginCall::Run {
316 engine: interface,
317 call: call_info,
318 })
319 }
320 PluginCall::CustomValueOp(custom_value, op) => {
322 self.send_plugin_call(ReceivedPluginCall::CustomValueOp {
323 engine: interface,
324 custom_value,
325 op,
326 })
327 }
328 PluginCall::GetCompletion(info) => {
329 self.send_plugin_call(ReceivedPluginCall::GetCompletion {
330 engine: interface,
331 info,
332 })
333 }
334 }
335 }
336 PluginInput::Goodbye => {
337 drop(self.plugin_call_sender.take());
339 Ok(())
340 }
341 PluginInput::EngineCallResponse(id, response) => {
342 let response = response
343 .map_data(|header| self.read_pipeline_data(header, &Signals::empty()))
344 .unwrap_or_else(|err| {
345 EngineCallResponse::Error(err)
348 });
349 self.send_engine_call_response(id, response)
350 }
351 PluginInput::Signal(action) => {
352 match action {
353 SignalAction::Interrupt => self.state.signals.trigger(),
354 SignalAction::Reset => self.state.signals.reset(),
355 }
356 self.state.signal_handlers.run(action);
357 Ok(())
358 }
359 }
360 }
361
362 fn stream_manager(&self) -> &StreamManager {
363 &self.stream_manager
364 }
365
366 fn prepare_pipeline_data(&self, mut data: PipelineData) -> Result<PipelineData, ShellError> {
367 match data {
369 PipelineData::Value(ref mut value, _) => {
370 PluginCustomValue::deserialize_custom_values_in(value)?;
371 Ok(data)
372 }
373 PipelineData::ListStream(stream, meta) => {
374 let stream = stream.map(|mut value| {
375 let span = value.span();
376 PluginCustomValue::deserialize_custom_values_in(&mut value)
377 .map(|()| value)
378 .unwrap_or_else(|err| Value::error(err, span))
379 });
380 Ok(PipelineData::list_stream(stream, meta))
381 }
382 PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
383 }
384 }
385}
386
387fn deserialize_call_args(call: &mut crate::EvaluatedCall) -> Result<(), ShellError> {
389 call.positional
390 .iter_mut()
391 .try_for_each(PluginCustomValue::deserialize_custom_values_in)?;
392 call.named
393 .iter_mut()
394 .flat_map(|(_, value)| value.as_mut())
395 .try_for_each(PluginCustomValue::deserialize_custom_values_in)
396}
397
/// Handle used by plugin code to talk to the engine: write call responses, make engine
/// calls and observe signals. Cloning is cheap in the sense that the state is shared through
/// an `Arc`.
#[derive(Debug, Clone)]
pub struct EngineInterface {
    /// State shared with the manager and all other interfaces.
    state: Arc<EngineInterfaceState>,
    /// Handle to the manager's stream manager.
    stream_manager_handle: StreamManagerHandle,
    /// Id of the plugin call this interface belongs to, or `None` for an interface obtained
    /// outside of a call; methods that need it fail when it is `None`.
    context: Option<PluginCallId>,
}
408
409impl EngineInterface {
410 pub(crate) fn hello(&self) -> Result<(), ShellError> {
412 self.write(PluginOutput::Hello(ProtocolInfo::default()))?;
413 self.flush()
414 }
415
416 fn context(&self) -> Result<PluginCallId, ShellError> {
417 self.context.ok_or_else(|| ShellError::NushellFailed {
418 msg: "Tried to call an EngineInterface method that requires a call context \
419 outside of one"
420 .into(),
421 })
422 }
423
424 pub(crate) fn write_ok(
426 &self,
427 result: Result<(), impl Into<LabeledError>>,
428 ) -> Result<(), ShellError> {
429 let response = match result {
430 Ok(()) => PluginCallResponse::Ok,
431 Err(err) => PluginCallResponse::Error(err.into()),
432 };
433 self.write(PluginOutput::CallResponse(self.context()?, response))?;
434 self.flush()
435 }
436
437 pub(crate) fn write_response(
440 &self,
441 result: Result<PipelineData, impl Into<LabeledError>>,
442 ) -> Result<PipelineDataWriter<Self>, ShellError> {
443 match result {
444 Ok(data) => {
445 let (header, writer) = match self.init_write_pipeline_data(data, &()) {
446 Ok(tup) => tup,
447 Err(err) => return self.write_response(Err(err)),
450 };
451 let response = PluginCallResponse::PipelineData(header);
453 self.write(PluginOutput::CallResponse(self.context()?, response))?;
454 self.flush()?;
455 Ok(writer)
456 }
457 Err(err) => {
458 let response = PluginCallResponse::Error(err.into());
459 self.write(PluginOutput::CallResponse(self.context()?, response))?;
460 self.flush()?;
461 Ok(Default::default())
462 }
463 }
464 }
465
466 pub(crate) fn write_metadata(&self, metadata: PluginMetadata) -> Result<(), ShellError> {
468 let response = PluginCallResponse::Metadata(metadata);
469 self.write(PluginOutput::CallResponse(self.context()?, response))?;
470 self.flush()
471 }
472
473 pub(crate) fn write_signature(
477 &self,
478 signature: Vec<PluginSignature>,
479 ) -> Result<(), ShellError> {
480 let response = PluginCallResponse::Signature(signature);
481 self.write(PluginOutput::CallResponse(self.context()?, response))?;
482 self.flush()
483 }
484
485 pub(crate) fn write_completion_items(
487 &self,
488 items: Option<Vec<DynamicSuggestion>>,
489 ) -> Result<(), ShellError> {
490 let response = PluginCallResponse::CompletionItems(items);
491 self.write(PluginOutput::CallResponse(self.context()?, response))?;
492 self.flush()
493 }
494
495 fn write_engine_call(
498 &self,
499 call: EngineCall<PipelineData>,
500 ) -> Result<
501 (
502 PipelineDataWriter<Self>,
503 mpsc::Receiver<EngineCallResponse<PipelineData>>,
504 ),
505 ShellError,
506 > {
507 let context = self.context()?;
508 let id = self.state.engine_call_id_sequence.next()?;
509 let (tx, rx) = mpsc::channel();
510
511 let mut writer = None;
513
514 let call = call.map_data(|input| {
515 let (input_header, input_writer) = self.init_write_pipeline_data(input, &())?;
516 writer = Some(input_writer);
517 Ok(input_header)
518 })?;
519
520 self.state
522 .engine_call_subscription_sender
523 .send((id, tx))
524 .map_err(|_| ShellError::NushellFailed {
525 msg: "EngineInterfaceManager hung up and is no longer accepting engine calls"
526 .into(),
527 })?;
528
529 self.write(PluginOutput::EngineCall { context, id, call })?;
531 self.flush()?;
532
533 Ok((writer.unwrap_or_default(), rx))
534 }
535
536 fn engine_call(
538 &self,
539 call: EngineCall<PipelineData>,
540 ) -> Result<EngineCallResponse<PipelineData>, ShellError> {
541 let (writer, rx) = self.write_engine_call(call)?;
542
543 writer.write_background()?;
545
546 rx.recv().map_err(|_| ShellError::NushellFailed {
548 msg: "Failed to get response to engine call because the channel was closed".into(),
549 })
550 }
551
552 pub fn is_using_stdio(&self) -> bool {
558 self.state.writer.is_stdout()
559 }
560
561 pub fn register_signal_handler(&self, handler: Handler) -> Result<HandlerGuard, ShellError> {
564 self.state.signal_handlers.register(handler)
565 }
566
567 pub fn get_config(&self) -> Result<Arc<Config>, ShellError> {
584 match self.engine_call(EngineCall::GetConfig)? {
585 EngineCallResponse::Config(config) => Ok(SharedCow::into_arc(config)),
586 EngineCallResponse::Error(err) => Err(err),
587 _ => Err(ShellError::PluginFailedToDecode {
588 msg: "Received unexpected response for EngineCall::GetConfig".into(),
589 }),
590 }
591 }
592
593 fn engine_call_option_value(
596 &self,
597 engine_call: EngineCall<PipelineData>,
598 ) -> Result<Option<Value>, ShellError> {
599 let name = engine_call.name();
600 match self.engine_call(engine_call)? {
601 EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(None),
602 EngineCallResponse::PipelineData(PipelineData::Value(value, _)) => Ok(Some(value)),
603 EngineCallResponse::Error(err) => Err(err),
604 _ => Err(ShellError::PluginFailedToDecode {
605 msg: format!("Received unexpected response for EngineCall::{name}"),
606 }),
607 }
608 }
609
610 pub fn get_plugin_config(&self) -> Result<Option<Value>, ShellError> {
628 self.engine_call_option_value(EngineCall::GetPluginConfig)
629 }
630
631 pub fn get_env_var(&self, name: impl Into<String>) -> Result<Option<Value>, ShellError> {
647 self.engine_call_option_value(EngineCall::GetEnvVar(name.into()))
648 }
649
650 pub fn get_current_dir(&self) -> Result<String, ShellError> {
661 match self.engine_call(EngineCall::GetCurrentDir)? {
662 EngineCallResponse::PipelineData(PipelineData::Value(Value::String { val, .. }, _)) => {
664 Ok(val)
665 }
666 EngineCallResponse::Error(err) => Err(err),
667 _ => Err(ShellError::PluginFailedToDecode {
668 msg: "Received unexpected response for EngineCall::GetCurrentDir".into(),
669 }),
670 }
671 }
672
673 pub fn get_env_vars(&self) -> Result<HashMap<String, Value>, ShellError> {
689 match self.engine_call(EngineCall::GetEnvVars)? {
690 EngineCallResponse::ValueMap(map) => Ok(map),
691 EngineCallResponse::Error(err) => Err(err),
692 _ => Err(ShellError::PluginFailedToDecode {
693 msg: "Received unexpected response type for EngineCall::GetEnvVars".into(),
694 }),
695 }
696 }
697
698 pub fn add_env_var(&self, name: impl Into<String>, value: Value) -> Result<(), ShellError> {
713 match self.engine_call(EngineCall::AddEnvVar(name.into(), value))? {
714 EngineCallResponse::PipelineData(_) => Ok(()),
715 EngineCallResponse::Error(err) => Err(err),
716 _ => Err(ShellError::PluginFailedToDecode {
717 msg: "Received unexpected response type for EngineCall::AddEnvVar".into(),
718 }),
719 }
720 }
721
722 pub fn get_help(&self) -> Result<String, ShellError> {
737 match self.engine_call(EngineCall::GetHelp)? {
738 EngineCallResponse::PipelineData(PipelineData::Value(Value::String { val, .. }, _)) => {
739 Ok(val)
740 }
741 _ => Err(ShellError::PluginFailedToDecode {
742 msg: "Received unexpected response type for EngineCall::GetHelp".into(),
743 }),
744 }
745 }
746
747 pub fn enter_foreground(&self) -> Result<ForegroundGuard, ShellError> {
755 match self.engine_call(EngineCall::EnterForeground)? {
756 EngineCallResponse::Error(error) => Err(error),
757 EngineCallResponse::PipelineData(PipelineData::Value(
758 Value::Int { val: pgrp, .. },
759 _,
760 )) => {
761 set_pgrp_from_enter_foreground(pgrp)?;
762 Ok(ForegroundGuard(Some(self.clone())))
763 }
764 EngineCallResponse::PipelineData(PipelineData::Empty) => {
765 Ok(ForegroundGuard(Some(self.clone())))
766 }
767 _ => Err(ShellError::PluginFailedToDecode {
768 msg: "Received unexpected response type for EngineCall::SetForeground".into(),
769 }),
770 }
771 }
772
773 fn leave_foreground(&self) -> Result<(), ShellError> {
775 match self.engine_call(EngineCall::LeaveForeground)? {
776 EngineCallResponse::Error(error) => Err(error),
777 EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(()),
778 _ => Err(ShellError::PluginFailedToDecode {
779 msg: "Received unexpected response type for EngineCall::LeaveForeground".into(),
780 }),
781 }
782 }
783
784 pub fn get_span_contents(&self, span: Span) -> Result<Vec<u8>, ShellError> {
790 match self.engine_call(EngineCall::GetSpanContents(span))? {
791 EngineCallResponse::PipelineData(PipelineData::Value(Value::Binary { val, .. }, _)) => {
792 Ok(val)
793 }
794 _ => Err(ShellError::PluginFailedToDecode {
795 msg: "Received unexpected response type for EngineCall::GetSpanContents".into(),
796 }),
797 }
798 }
799
800 pub fn eval_closure_with_stream(
846 &self,
847 closure: &Spanned<Closure>,
848 mut positional: Vec<Value>,
849 input: PipelineData,
850 redirect_stdout: bool,
851 redirect_stderr: bool,
852 ) -> Result<PipelineData, ShellError> {
853 positional
855 .iter_mut()
856 .try_for_each(PluginCustomValue::serialize_custom_values_in)?;
857
858 let call = EngineCall::EvalClosure {
859 closure: closure.clone(),
860 positional,
861 input,
862 redirect_stdout,
863 redirect_stderr,
864 };
865
866 match self.engine_call(call)? {
867 EngineCallResponse::Error(error) => Err(error),
868 EngineCallResponse::PipelineData(data) => Ok(data),
869 _ => Err(ShellError::PluginFailedToDecode {
870 msg: "Received unexpected response type for EngineCall::EvalClosure".into(),
871 }),
872 }
873 }
874
875 pub fn eval_closure(
916 &self,
917 closure: &Spanned<Closure>,
918 positional: Vec<Value>,
919 input: Option<Value>,
920 ) -> Result<Value, ShellError> {
921 let input = input.map_or_else(PipelineData::empty, |v| PipelineData::value(v, None));
922 let output = self.eval_closure_with_stream(closure, positional, input, true, false)?;
923 match output.into_value(closure.span)? {
925 Value::Error { error, .. } => Err(*error),
926 value => Ok(value),
927 }
928 }
929
930 pub fn find_decl(&self, name: impl Into<String>) -> Result<Option<DeclId>, ShellError> {
935 let call = EngineCall::FindDecl(name.into());
936
937 match self.engine_call(call)? {
938 EngineCallResponse::Error(err) => Err(err),
939 EngineCallResponse::Identifier(id) => Ok(Some(id)),
940 EngineCallResponse::PipelineData(PipelineData::Empty) => Ok(None),
941 _ => Err(ShellError::PluginFailedToDecode {
942 msg: "Received unexpected response type for EngineCall::FindDecl".into(),
943 }),
944 }
945 }
946
947 pub fn call_decl(
971 &self,
972 decl_id: DeclId,
973 call: EvaluatedCall,
974 input: PipelineData,
975 redirect_stdout: bool,
976 redirect_stderr: bool,
977 ) -> Result<PipelineData, ShellError> {
978 let call = EngineCall::CallDecl {
979 decl_id,
980 call,
981 input,
982 redirect_stdout,
983 redirect_stderr,
984 };
985
986 match self.engine_call(call)? {
987 EngineCallResponse::Error(err) => Err(err),
988 EngineCallResponse::PipelineData(data) => Ok(data),
989 _ => Err(ShellError::PluginFailedToDecode {
990 msg: "Received unexpected response type for EngineCall::CallDecl".into(),
991 }),
992 }
993 }
994
995 pub fn set_gc_disabled(&self, disabled: bool) -> Result<(), ShellError> {
1003 self.write(PluginOutput::Option(PluginOption::GcDisabled(disabled)))?;
1004 self.flush()
1005 }
1006
1007 pub(crate) fn write_ordering(
1009 &self,
1010 ordering: Option<impl Into<Ordering>>,
1011 ) -> Result<(), ShellError> {
1012 let response = PluginCallResponse::Ordering(ordering.map(|o| o.into()));
1013 self.write(PluginOutput::CallResponse(self.context()?, response))?;
1014 self.flush()
1015 }
1016
1017 pub fn signals(&self) -> &Signals {
1018 &self.state.signals
1019 }
1020}
1021
1022impl Interface for EngineInterface {
1023 type Output = PluginOutput;
1024 type DataContext = ();
1025
1026 fn write(&self, output: PluginOutput) -> Result<(), ShellError> {
1027 log::trace!("to engine: {output:?}");
1028 self.state.writer.write(&output)
1029 }
1030
1031 fn flush(&self) -> Result<(), ShellError> {
1032 self.state.writer.flush()
1033 }
1034
1035 fn stream_id_sequence(&self) -> &Sequence {
1036 &self.state.stream_id_sequence
1037 }
1038
1039 fn stream_manager_handle(&self) -> &StreamManagerHandle {
1040 &self.stream_manager_handle
1041 }
1042
1043 fn prepare_pipeline_data(
1044 &self,
1045 mut data: PipelineData,
1046 _context: &(),
1047 ) -> Result<PipelineData, ShellError> {
1048 match data {
1050 PipelineData::Value(ref mut value, _) => {
1051 PluginCustomValue::serialize_custom_values_in(value)?;
1052 Ok(data)
1053 }
1054 PipelineData::ListStream(stream, meta) => {
1055 let stream = stream.map(|mut value| {
1056 let span = value.span();
1057 PluginCustomValue::serialize_custom_values_in(&mut value)
1058 .map(|_| value)
1059 .unwrap_or_else(|err| Value::error(err, span))
1060 });
1061 Ok(PipelineData::list_stream(stream, meta))
1062 }
1063 PipelineData::Empty | PipelineData::ByteStream(..) => Ok(data),
1064 }
1065 }
1066}
1067
/// Guard returned by [`EngineInterface::enter_foreground`].
///
/// Holds the interface used to notify the engine; when the guard is dropped, or `leave()` is
/// called, the plugin leaves the foreground again. The inner option is emptied once that has
/// happened so it is only done once.
pub struct ForegroundGuard(Option<EngineInterface>);
1072
1073impl ForegroundGuard {
1074 fn leave_internal(&mut self) -> Result<(), ShellError> {
1076 if let Some(interface) = self.0.take() {
1077 #[cfg(unix)]
1079 {
1080 use nix::unistd::{Pid, setpgid};
1081 setpgid(Pid::from_raw(0), Pid::from_raw(0)).map_err(|err| {
1083 nu_protocol::shell_error::io::IoError::new_internal(
1084 std::io::Error::from(err),
1085 "Could not set pgid",
1086 nu_protocol::location!(),
1087 )
1088 })?;
1089 }
1090 interface.leave_foreground()?;
1091 }
1092 Ok(())
1093 }
1094
1095 pub fn leave(mut self) -> Result<(), ShellError> {
1097 let result = self.leave_internal();
1098 std::mem::forget(self);
1099 result
1100 }
1101}
1102
1103impl Drop for ForegroundGuard {
1104 fn drop(&mut self) {
1105 let _ = self.leave_internal();
1106 }
1107}
1108
1109#[cfg(unix)]
1110fn set_pgrp_from_enter_foreground(pgrp: i64) -> Result<(), ShellError> {
1111 use nix::unistd::{Pid, setpgid};
1112 if let Ok(pgrp) = pgrp.try_into() {
1113 setpgid(Pid::from_raw(0), Pid::from_raw(pgrp)).map_err(|err| ShellError::GenericError {
1114 error: "Failed to set process group for foreground".into(),
1115 msg: "".into(),
1116 span: None,
1117 help: Some(err.to_string()),
1118 inner: vec![],
1119 })
1120 } else {
1121 Err(ShellError::NushellFailed {
1122 msg: "Engine returned an invalid process group ID".into(),
1123 })
1124 }
1125}
1126
/// Non-Unix: joining a process group is not supported, so this always fails with
/// `NushellFailed`.
#[cfg(not(unix))]
fn set_pgrp_from_enter_foreground(_pgrp: i64) -> Result<(), ShellError> {
    let msg = String::from(
        "EnterForeground asked plugin to join process group, but this is not supported on non UNIX platforms.",
    );
    Err(ShellError::NushellFailed { msg })
}