nu_plugin_protocol/
lib.rs

1//! Type definitions, including full `Serialize` and `Deserialize` implementations, for the protocol
2//! used for communication between the engine and a plugin.
3//!
4//! See the [plugin protocol reference](https://www.nushell.sh/contributor-book/plugin_protocol_reference.html)
5//! for more details on what exactly is being specified here.
6//!
7//! Plugins accept messages of [`PluginInput`] and send messages back of [`PluginOutput`]. This
8//! crate explicitly avoids implementing any functionality that depends on I/O, so the exact
9//! byte-level encoding scheme is not implemented here. See the protocol ref or `nu_plugin_core` for
10//! more details on how that works.
11
12mod evaluated_call;
13mod plugin_custom_value;
14mod protocol_info;
15
16#[cfg(test)]
17mod tests;
18
19/// Things that can help with protocol-related tests. Not part of the public API, just used by other
20/// nushell crates.
21#[doc(hidden)]
22pub mod test_util;
23
24use nu_protocol::{
25    ByteStreamType, Config, DeclId, LabeledError, PipelineData, PipelineMetadata, PluginMetadata,
26    PluginSignature, ShellError, SignalAction, Span, Spanned, Value, ast::Operator, casing::Casing,
27    engine::Closure,
28};
29use nu_utils::SharedCow;
30use serde::{Deserialize, Serialize};
31use std::{collections::HashMap, path::PathBuf};
32
33pub use evaluated_call::EvaluatedCall;
34pub use plugin_custom_value::PluginCustomValue;
35#[allow(unused_imports)] // may be unused by compile flags
36pub use protocol_info::{Feature, Protocol, ProtocolInfo};
37
38/// A sequential identifier for a stream
39pub type StreamId = usize;
40
41/// A sequential identifier for a [`PluginCall`]
42pub type PluginCallId = usize;
43
44/// A sequential identifier for an [`EngineCall`]
45pub type EngineCallId = usize;
46
47/// Information about a plugin command invocation. This includes an [`EvaluatedCall`] as a
48/// serializable representation of [`nu_protocol::ast::Call`]. The type parameter determines
49/// the input type.
50#[derive(Serialize, Deserialize, Debug, Clone)]
51pub struct CallInfo<D> {
52    /// The name of the command to be run
53    pub name: String,
54    /// Information about the invocation, including arguments
55    pub call: EvaluatedCall,
56    /// Pipeline input. This is usually [`nu_protocol::PipelineData`] or [`PipelineDataHeader`]
57    pub input: D,
58}
59
60impl<D> CallInfo<D> {
61    /// Convert the type of `input` from `D` to `T`.
62    pub fn map_data<T>(
63        self,
64        f: impl FnOnce(D) -> Result<T, ShellError>,
65    ) -> Result<CallInfo<T>, ShellError> {
66        Ok(CallInfo {
67            name: self.name,
68            call: self.call,
69            input: f(self.input)?,
70        })
71    }
72}
73
74/// The initial (and perhaps only) part of any [`nu_protocol::PipelineData`] sent over the wire.
75///
76/// This may contain a single value, or may initiate a stream with a [`StreamId`].
77#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
78pub enum PipelineDataHeader {
79    /// No input
80    Empty,
81    /// A single value
82    Value(Value, Option<PipelineMetadata>),
83    /// Initiate [`nu_protocol::PipelineData::ListStream`].
84    ///
85    /// Items are sent via [`StreamData`]
86    ListStream(ListStreamInfo),
87    /// Initiate [`nu_protocol::PipelineData::byte_stream`].
88    ///
89    /// Items are sent via [`StreamData`]
90    ByteStream(ByteStreamInfo),
91}
92
93impl PipelineDataHeader {
94    /// Return the stream ID, if any, embedded in the header
95    pub fn stream_id(&self) -> Option<StreamId> {
96        match self {
97            PipelineDataHeader::Empty => None,
98            PipelineDataHeader::Value(_, _) => None,
99            PipelineDataHeader::ListStream(info) => Some(info.id),
100            PipelineDataHeader::ByteStream(info) => Some(info.id),
101        }
102    }
103
104    pub fn value(value: Value) -> Self {
105        PipelineDataHeader::Value(value, None)
106    }
107
108    pub fn list_stream(info: ListStreamInfo) -> Self {
109        PipelineDataHeader::ListStream(info)
110    }
111
112    pub fn byte_stream(info: ByteStreamInfo) -> Self {
113        PipelineDataHeader::ByteStream(info)
114    }
115}
116
117/// Additional information about list (value) streams
118#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
119pub struct ListStreamInfo {
120    pub id: StreamId,
121    pub span: Span,
122    pub metadata: Option<PipelineMetadata>,
123}
124
125impl ListStreamInfo {
126    /// Create a new `ListStreamInfo` with a unique ID
127    pub fn new(id: StreamId, span: Span) -> Self {
128        ListStreamInfo {
129            id,
130            span,
131            metadata: None,
132        }
133    }
134}
135
136/// Additional information about byte streams
137#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
138pub struct ByteStreamInfo {
139    pub id: StreamId,
140    pub span: Span,
141    #[serde(rename = "type")]
142    pub type_: ByteStreamType,
143    pub metadata: Option<PipelineMetadata>,
144}
145
146impl ByteStreamInfo {
147    /// Create a new `ByteStreamInfo` with a unique ID
148    pub fn new(id: StreamId, span: Span, type_: ByteStreamType) -> Self {
149        ByteStreamInfo {
150            id,
151            span,
152            type_,
153            metadata: None,
154        }
155    }
156}
157
158/// Calls that a plugin can execute. The type parameter determines the input type.
159#[derive(Serialize, Deserialize, Debug, Clone)]
160pub enum PluginCall<D> {
161    Metadata,
162    Signature,
163    Run(CallInfo<D>),
164    CustomValueOp(Spanned<PluginCustomValue>, CustomValueOp),
165}
166
167impl<D> PluginCall<D> {
168    /// Convert the data type from `D` to `T`. The function will not be called if the variant does
169    /// not contain data.
170    pub fn map_data<T>(
171        self,
172        f: impl FnOnce(D) -> Result<T, ShellError>,
173    ) -> Result<PluginCall<T>, ShellError> {
174        Ok(match self {
175            PluginCall::Metadata => PluginCall::Metadata,
176            PluginCall::Signature => PluginCall::Signature,
177            PluginCall::Run(call) => PluginCall::Run(call.map_data(f)?),
178            PluginCall::CustomValueOp(custom_value, op) => {
179                PluginCall::CustomValueOp(custom_value, op)
180            }
181        })
182    }
183
184    /// The span associated with the call.
185    pub fn span(&self) -> Option<Span> {
186        match self {
187            PluginCall::Metadata => None,
188            PluginCall::Signature => None,
189            PluginCall::Run(CallInfo { call, .. }) => Some(call.head),
190            PluginCall::CustomValueOp(val, _) => Some(val.span),
191        }
192    }
193}
194
195/// Operations supported for custom values.
196#[derive(Serialize, Deserialize, Debug, Clone)]
197pub enum CustomValueOp {
198    /// [`to_base_value()`](nu_protocol::CustomValue::to_base_value)
199    ToBaseValue,
200    /// [`follow_path_int()`](nu_protocol::CustomValue::follow_path_int)
201    FollowPathInt {
202        index: Spanned<usize>,
203        optional: bool,
204    },
205    /// [`follow_path_string()`](nu_protocol::CustomValue::follow_path_string)
206    FollowPathString {
207        column_name: Spanned<String>,
208        optional: bool,
209        casing: Casing,
210    },
211    /// [`partial_cmp()`](nu_protocol::CustomValue::partial_cmp)
212    PartialCmp(Value),
213    /// [`operation()`](nu_protocol::CustomValue::operation)
214    Operation(Spanned<Operator>, Value),
215    /// [`save()`](nu_protocol::CustomValue::save)
216    Save {
217        path: Spanned<PathBuf>,
218        save_call_span: Span,
219    },
220    /// Notify that the custom value has been dropped, if
221    /// [`notify_plugin_on_drop()`](nu_protocol::CustomValue::notify_plugin_on_drop) is true
222    Dropped,
223}
224
225impl CustomValueOp {
226    /// Get the name of the op, for error messages.
227    pub fn name(&self) -> &'static str {
228        match self {
229            CustomValueOp::ToBaseValue => "to_base_value",
230            CustomValueOp::FollowPathInt { .. } => "follow_path_int",
231            CustomValueOp::FollowPathString { .. } => "follow_path_string",
232            CustomValueOp::PartialCmp(_) => "partial_cmp",
233            CustomValueOp::Operation(_, _) => "operation",
234            CustomValueOp::Save { .. } => "save",
235            CustomValueOp::Dropped => "dropped",
236        }
237    }
238}
239
240/// Any data sent to the plugin
241#[derive(Serialize, Deserialize, Debug, Clone)]
242pub enum PluginInput {
243    /// This must be the first message. Indicates supported protocol
244    Hello(ProtocolInfo),
245    /// Execute a [`PluginCall`], such as `Run` or `Signature`. The ID should not have been used
246    /// before.
247    Call(PluginCallId, PluginCall<PipelineDataHeader>),
248    /// Don't expect any more plugin calls. Exit after all currently executing plugin calls are
249    /// finished.
250    Goodbye,
251    /// Response to an [`EngineCall`]. The ID should be the same one sent with the engine call this
252    /// is responding to
253    EngineCallResponse(EngineCallId, EngineCallResponse<PipelineDataHeader>),
254    /// See [`StreamMessage::Data`].
255    Data(StreamId, StreamData),
256    /// See [`StreamMessage::End`].
257    End(StreamId),
258    /// See [`StreamMessage::Drop`].
259    Drop(StreamId),
260    /// See [`StreamMessage::Ack`].
261    Ack(StreamId),
262    /// Relay signals to the plugin
263    Signal(SignalAction),
264}
265
266impl TryFrom<PluginInput> for StreamMessage {
267    type Error = PluginInput;
268
269    fn try_from(msg: PluginInput) -> Result<StreamMessage, PluginInput> {
270        match msg {
271            PluginInput::Data(id, data) => Ok(StreamMessage::Data(id, data)),
272            PluginInput::End(id) => Ok(StreamMessage::End(id)),
273            PluginInput::Drop(id) => Ok(StreamMessage::Drop(id)),
274            PluginInput::Ack(id) => Ok(StreamMessage::Ack(id)),
275            _ => Err(msg),
276        }
277    }
278}
279
280impl From<StreamMessage> for PluginInput {
281    fn from(stream_msg: StreamMessage) -> PluginInput {
282        match stream_msg {
283            StreamMessage::Data(id, data) => PluginInput::Data(id, data),
284            StreamMessage::End(id) => PluginInput::End(id),
285            StreamMessage::Drop(id) => PluginInput::Drop(id),
286            StreamMessage::Ack(id) => PluginInput::Ack(id),
287        }
288    }
289}
290
291/// A single item of stream data for a stream.
292#[derive(Serialize, Deserialize, Debug, Clone)]
293pub enum StreamData {
294    List(Value),
295    Raw(Result<Vec<u8>, LabeledError>),
296}
297
298impl From<Value> for StreamData {
299    fn from(value: Value) -> Self {
300        StreamData::List(value)
301    }
302}
303
304impl From<Result<Vec<u8>, LabeledError>> for StreamData {
305    fn from(value: Result<Vec<u8>, LabeledError>) -> Self {
306        StreamData::Raw(value)
307    }
308}
309
310impl From<Result<Vec<u8>, ShellError>> for StreamData {
311    fn from(value: Result<Vec<u8>, ShellError>) -> Self {
312        value.map_err(LabeledError::from).into()
313    }
314}
315
316impl TryFrom<StreamData> for Value {
317    type Error = ShellError;
318
319    fn try_from(data: StreamData) -> Result<Value, ShellError> {
320        match data {
321            StreamData::List(value) => Ok(value),
322            StreamData::Raw(_) => Err(ShellError::PluginFailedToDecode {
323                msg: "expected list stream data, found raw data".into(),
324            }),
325        }
326    }
327}
328
329impl TryFrom<StreamData> for Result<Vec<u8>, LabeledError> {
330    type Error = ShellError;
331
332    fn try_from(data: StreamData) -> Result<Result<Vec<u8>, LabeledError>, ShellError> {
333        match data {
334            StreamData::Raw(value) => Ok(value),
335            StreamData::List(_) => Err(ShellError::PluginFailedToDecode {
336                msg: "expected raw stream data, found list data".into(),
337            }),
338        }
339    }
340}
341
342impl TryFrom<StreamData> for Result<Vec<u8>, ShellError> {
343    type Error = ShellError;
344
345    fn try_from(value: StreamData) -> Result<Result<Vec<u8>, ShellError>, ShellError> {
346        Result::<Vec<u8>, LabeledError>::try_from(value).map(|res| res.map_err(ShellError::from))
347    }
348}
349
350/// A stream control or data message.
351#[derive(Serialize, Deserialize, Debug, Clone)]
352pub enum StreamMessage {
353    /// Append data to the stream. Sent by the stream producer.
354    Data(StreamId, StreamData),
355    /// End of stream. Sent by the stream producer.
356    End(StreamId),
357    /// Notify that the read end of the stream has closed, and further messages should not be
358    /// sent. Sent by the stream consumer.
359    Drop(StreamId),
360    /// Acknowledge that a message has been consumed. This is used to implement flow control by
361    /// the stream producer. Sent by the stream consumer.
362    Ack(StreamId),
363}
364
365/// Response to a [`PluginCall`]. The type parameter determines the output type for pipeline data.
366#[derive(Serialize, Deserialize, Debug, Clone)]
367pub enum PluginCallResponse<D> {
368    Ok,
369    Error(LabeledError),
370    Metadata(PluginMetadata),
371    Signature(Vec<PluginSignature>),
372    Ordering(Option<Ordering>),
373    PipelineData(D),
374}
375
376impl<D> PluginCallResponse<D> {
377    /// Convert the data type from `D` to `T`. The function will not be called if the variant does
378    /// not contain data.
379    pub fn map_data<T>(
380        self,
381        f: impl FnOnce(D) -> Result<T, ShellError>,
382    ) -> Result<PluginCallResponse<T>, ShellError> {
383        Ok(match self {
384            PluginCallResponse::Ok => PluginCallResponse::Ok,
385            PluginCallResponse::Error(err) => PluginCallResponse::Error(err),
386            PluginCallResponse::Metadata(meta) => PluginCallResponse::Metadata(meta),
387            PluginCallResponse::Signature(sigs) => PluginCallResponse::Signature(sigs),
388            PluginCallResponse::Ordering(ordering) => PluginCallResponse::Ordering(ordering),
389            PluginCallResponse::PipelineData(input) => PluginCallResponse::PipelineData(f(input)?),
390        })
391    }
392}
393
394impl PluginCallResponse<PipelineDataHeader> {
395    /// Construct a plugin call response with a single value
396    pub fn value(value: Value) -> PluginCallResponse<PipelineDataHeader> {
397        if value.is_nothing() {
398            PluginCallResponse::PipelineData(PipelineDataHeader::Empty)
399        } else {
400            PluginCallResponse::PipelineData(PipelineDataHeader::value(value))
401        }
402    }
403}
404
405impl PluginCallResponse<PipelineData> {
406    /// Does this response have a stream?
407    pub fn has_stream(&self) -> bool {
408        match self {
409            PluginCallResponse::PipelineData(data) => match data {
410                PipelineData::Empty => false,
411                PipelineData::Value(..) => false,
412                PipelineData::ListStream(..) => true,
413                PipelineData::ByteStream(..) => true,
414            },
415            _ => false,
416        }
417    }
418}
419
420/// Options that can be changed to affect how the engine treats the plugin
421#[derive(Serialize, Deserialize, Debug, Clone)]
422pub enum PluginOption {
423    /// Send `GcDisabled(true)` to stop the plugin from being automatically garbage collected, or
424    /// `GcDisabled(false)` to enable it again.
425    ///
426    /// See `EngineInterface::set_gc_disabled()` in `nu-plugin` for more information.
427    GcDisabled(bool),
428}
429
430/// This is just a serializable version of [`std::cmp::Ordering`], and can be converted 1:1
431#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
432pub enum Ordering {
433    Less,
434    Equal,
435    Greater,
436}
437
438impl From<std::cmp::Ordering> for Ordering {
439    fn from(value: std::cmp::Ordering) -> Self {
440        match value {
441            std::cmp::Ordering::Less => Ordering::Less,
442            std::cmp::Ordering::Equal => Ordering::Equal,
443            std::cmp::Ordering::Greater => Ordering::Greater,
444        }
445    }
446}
447
448impl From<Ordering> for std::cmp::Ordering {
449    fn from(value: Ordering) -> Self {
450        match value {
451            Ordering::Less => std::cmp::Ordering::Less,
452            Ordering::Equal => std::cmp::Ordering::Equal,
453            Ordering::Greater => std::cmp::Ordering::Greater,
454        }
455    }
456}
457
458/// Information received from the plugin
459#[derive(Serialize, Deserialize, Debug, Clone)]
460pub enum PluginOutput {
461    /// This must be the first message. Indicates supported protocol
462    Hello(ProtocolInfo),
463    /// Set option. No response expected
464    Option(PluginOption),
465    /// A response to a [`PluginCall`]. The ID should be the same sent with the plugin call this
466    /// is a response to
467    CallResponse(PluginCallId, PluginCallResponse<PipelineDataHeader>),
468    /// Execute an [`EngineCall`]. Engine calls must be executed within the `context` of a plugin
469    /// call, and the `id` should not have been used before
470    EngineCall {
471        /// The plugin call (by ID) to execute in the context of
472        context: PluginCallId,
473        /// A new identifier for this engine call. The response will reference this ID
474        id: EngineCallId,
475        call: EngineCall<PipelineDataHeader>,
476    },
477    /// See [`StreamMessage::Data`].
478    Data(StreamId, StreamData),
479    /// See [`StreamMessage::End`].
480    End(StreamId),
481    /// See [`StreamMessage::Drop`].
482    Drop(StreamId),
483    /// See [`StreamMessage::Ack`].
484    Ack(StreamId),
485}
486
487impl TryFrom<PluginOutput> for StreamMessage {
488    type Error = PluginOutput;
489
490    fn try_from(msg: PluginOutput) -> Result<StreamMessage, PluginOutput> {
491        match msg {
492            PluginOutput::Data(id, data) => Ok(StreamMessage::Data(id, data)),
493            PluginOutput::End(id) => Ok(StreamMessage::End(id)),
494            PluginOutput::Drop(id) => Ok(StreamMessage::Drop(id)),
495            PluginOutput::Ack(id) => Ok(StreamMessage::Ack(id)),
496            _ => Err(msg),
497        }
498    }
499}
500
501impl From<StreamMessage> for PluginOutput {
502    fn from(stream_msg: StreamMessage) -> PluginOutput {
503        match stream_msg {
504            StreamMessage::Data(id, data) => PluginOutput::Data(id, data),
505            StreamMessage::End(id) => PluginOutput::End(id),
506            StreamMessage::Drop(id) => PluginOutput::Drop(id),
507            StreamMessage::Ack(id) => PluginOutput::Ack(id),
508        }
509    }
510}
511
512/// A remote call back to the engine during the plugin's execution.
513///
514/// The type parameter determines the input type, for calls that take pipeline data.
515#[derive(Serialize, Deserialize, Debug, Clone)]
516pub enum EngineCall<D> {
517    /// Get the full engine configuration
518    GetConfig,
519    /// Get the plugin-specific configuration (`$env.config.plugins.NAME`)
520    GetPluginConfig,
521    /// Get an environment variable
522    GetEnvVar(String),
523    /// Get all environment variables
524    GetEnvVars,
525    /// Get current working directory
526    GetCurrentDir,
527    /// Set an environment variable in the caller's scope
528    AddEnvVar(String, Value),
529    /// Get help for the current command
530    GetHelp,
531    /// Move the plugin into the foreground for terminal interaction
532    EnterForeground,
533    /// Move the plugin out of the foreground once terminal interaction has finished
534    LeaveForeground,
535    /// Get the contents of a span. Response is a binary which may not parse to UTF-8
536    GetSpanContents(Span),
537    /// Evaluate a closure with stream input/output
538    EvalClosure {
539        /// The closure to call.
540        ///
541        /// This may come from a [`Value::Closure`] passed in as an argument to the plugin.
542        closure: Spanned<Closure>,
543        /// Positional arguments to add to the closure call
544        positional: Vec<Value>,
545        /// Input to the closure
546        input: D,
547        /// Whether to redirect stdout from external commands
548        redirect_stdout: bool,
549        /// Whether to redirect stderr from external commands
550        redirect_stderr: bool,
551    },
552    /// Find a declaration by name
553    FindDecl(String),
554    /// Call a declaration with args
555    CallDecl {
556        /// The id of the declaration to be called (can be found with `FindDecl`)
557        decl_id: DeclId,
558        /// Information about the call (head span, arguments, etc.)
559        call: EvaluatedCall,
560        /// Pipeline input to the call
561        input: D,
562        /// Whether to redirect stdout from external commands
563        redirect_stdout: bool,
564        /// Whether to redirect stderr from external commands
565        redirect_stderr: bool,
566    },
567}
568
569impl<D> EngineCall<D> {
570    /// Get the name of the engine call so it can be embedded in things like error messages
571    pub fn name(&self) -> &'static str {
572        match self {
573            EngineCall::GetConfig => "GetConfig",
574            EngineCall::GetPluginConfig => "GetPluginConfig",
575            EngineCall::GetEnvVar(_) => "GetEnv",
576            EngineCall::GetEnvVars => "GetEnvs",
577            EngineCall::GetCurrentDir => "GetCurrentDir",
578            EngineCall::AddEnvVar(..) => "AddEnvVar",
579            EngineCall::GetHelp => "GetHelp",
580            EngineCall::EnterForeground => "EnterForeground",
581            EngineCall::LeaveForeground => "LeaveForeground",
582            EngineCall::GetSpanContents(_) => "GetSpanContents",
583            EngineCall::EvalClosure { .. } => "EvalClosure",
584            EngineCall::FindDecl(_) => "FindDecl",
585            EngineCall::CallDecl { .. } => "CallDecl",
586        }
587    }
588
589    /// Convert the data type from `D` to `T`. The function will not be called if the variant does
590    /// not contain data.
591    pub fn map_data<T>(
592        self,
593        f: impl FnOnce(D) -> Result<T, ShellError>,
594    ) -> Result<EngineCall<T>, ShellError> {
595        Ok(match self {
596            EngineCall::GetConfig => EngineCall::GetConfig,
597            EngineCall::GetPluginConfig => EngineCall::GetPluginConfig,
598            EngineCall::GetEnvVar(name) => EngineCall::GetEnvVar(name),
599            EngineCall::GetEnvVars => EngineCall::GetEnvVars,
600            EngineCall::GetCurrentDir => EngineCall::GetCurrentDir,
601            EngineCall::AddEnvVar(name, value) => EngineCall::AddEnvVar(name, value),
602            EngineCall::GetHelp => EngineCall::GetHelp,
603            EngineCall::EnterForeground => EngineCall::EnterForeground,
604            EngineCall::LeaveForeground => EngineCall::LeaveForeground,
605            EngineCall::GetSpanContents(span) => EngineCall::GetSpanContents(span),
606            EngineCall::EvalClosure {
607                closure,
608                positional,
609                input,
610                redirect_stdout,
611                redirect_stderr,
612            } => EngineCall::EvalClosure {
613                closure,
614                positional,
615                input: f(input)?,
616                redirect_stdout,
617                redirect_stderr,
618            },
619            EngineCall::FindDecl(name) => EngineCall::FindDecl(name),
620            EngineCall::CallDecl {
621                decl_id,
622                call,
623                input,
624                redirect_stdout,
625                redirect_stderr,
626            } => EngineCall::CallDecl {
627                decl_id,
628                call,
629                input: f(input)?,
630                redirect_stdout,
631                redirect_stderr,
632            },
633        })
634    }
635}
636
637/// The response to an [`EngineCall`]. The type parameter determines the output type for pipeline
638/// data.
639#[derive(Serialize, Deserialize, Debug, Clone)]
640pub enum EngineCallResponse<D> {
641    Error(ShellError),
642    PipelineData(D),
643    Config(SharedCow<Config>),
644    ValueMap(HashMap<String, Value>),
645    Identifier(DeclId),
646}
647
648impl<D> EngineCallResponse<D> {
649    /// Convert the data type from `D` to `T`. The function will not be called if the variant does
650    /// not contain data.
651    pub fn map_data<T>(
652        self,
653        f: impl FnOnce(D) -> Result<T, ShellError>,
654    ) -> Result<EngineCallResponse<T>, ShellError> {
655        Ok(match self {
656            EngineCallResponse::Error(err) => EngineCallResponse::Error(err),
657            EngineCallResponse::PipelineData(data) => EngineCallResponse::PipelineData(f(data)?),
658            EngineCallResponse::Config(config) => EngineCallResponse::Config(config),
659            EngineCallResponse::ValueMap(map) => EngineCallResponse::ValueMap(map),
660            EngineCallResponse::Identifier(id) => EngineCallResponse::Identifier(id),
661        })
662    }
663}
664
665impl EngineCallResponse<PipelineData> {
666    /// Build an [`EngineCallResponse::PipelineData`] from a [`Value`]
667    pub fn value(value: Value) -> EngineCallResponse<PipelineData> {
668        EngineCallResponse::PipelineData(PipelineData::value(value, None))
669    }
670
671    /// An [`EngineCallResponse::PipelineData`] with [`PipelineData::empty()`]
672    pub const fn empty() -> EngineCallResponse<PipelineData> {
673        EngineCallResponse::PipelineData(PipelineData::empty())
674    }
675}