nu_plugin_core/interface/
mod.rs

1//! Implements the stream multiplexing interface for both the plugin side and the engine side.
2
3use nu_plugin_protocol::{ByteStreamInfo, ListStreamInfo, PipelineDataHeader, StreamMessage};
4use nu_protocol::{
5    ByteStream, ListStream, PipelineData, Reader, ShellError, Signals, engine::Sequence,
6    shell_error::io::IoError,
7};
8use std::{
9    io::{Read, Write},
10    sync::Mutex,
11    thread,
12};
13
14pub mod stream;
15
16use crate::Encoder;
17
18use self::stream::{StreamManager, StreamManagerHandle, StreamWriter, WriteStreamMessage};
19
20pub mod test_util;
21
22#[cfg(test)]
23mod tests;
24
25/// The maximum number of list stream values to send without acknowledgement. This should be tuned
26/// with consideration for memory usage.
27const LIST_STREAM_HIGH_PRESSURE: i32 = 100;
28
29/// The maximum number of raw stream buffers to send without acknowledgement. This should be tuned
30/// with consideration for memory usage.
31const RAW_STREAM_HIGH_PRESSURE: i32 = 50;
32
33/// Read input/output from the stream.
34pub trait PluginRead<T> {
35    /// Returns `Ok(None)` on end of stream.
36    fn read(&mut self) -> Result<Option<T>, ShellError>;
37}
38
39impl<R, E, T> PluginRead<T> for (R, E)
40where
41    R: std::io::BufRead,
42    E: Encoder<T>,
43{
44    fn read(&mut self) -> Result<Option<T>, ShellError> {
45        self.1.decode(&mut self.0)
46    }
47}
48
49impl<R, T> PluginRead<T> for &mut R
50where
51    R: PluginRead<T>,
52{
53    fn read(&mut self) -> Result<Option<T>, ShellError> {
54        (**self).read()
55    }
56}
57
58/// Write input/output to the stream.
59///
60/// The write should be atomic, without interference from other threads.
61pub trait PluginWrite<T>: Send + Sync {
62    fn write(&self, data: &T) -> Result<(), ShellError>;
63
64    /// Flush any internal buffers, if applicable.
65    fn flush(&self) -> Result<(), ShellError>;
66
67    /// True if this output is stdout, so that plugins can avoid using stdout for their own purpose
68    fn is_stdout(&self) -> bool {
69        false
70    }
71}
72
73impl<E, T> PluginWrite<T> for (std::io::Stdout, E)
74where
75    E: Encoder<T>,
76{
77    fn write(&self, data: &T) -> Result<(), ShellError> {
78        let mut lock = self.0.lock();
79        self.1.encode(data, &mut lock)
80    }
81
82    fn flush(&self) -> Result<(), ShellError> {
83        self.0.lock().flush().map_err(|err| {
84            ShellError::Io(IoError::new_internal(
85                err,
86                "PluginWrite could not flush",
87                nu_protocol::location!(),
88            ))
89        })
90    }
91
92    fn is_stdout(&self) -> bool {
93        true
94    }
95}
96
97impl<W, E, T> PluginWrite<T> for (Mutex<W>, E)
98where
99    W: std::io::Write + Send,
100    E: Encoder<T>,
101{
102    fn write(&self, data: &T) -> Result<(), ShellError> {
103        let mut lock = self.0.lock().map_err(|_| ShellError::NushellFailed {
104            msg: "writer mutex poisoned".into(),
105        })?;
106        self.1.encode(data, &mut *lock)
107    }
108
109    fn flush(&self) -> Result<(), ShellError> {
110        let mut lock = self.0.lock().map_err(|_| ShellError::NushellFailed {
111            msg: "writer mutex poisoned".into(),
112        })?;
113        lock.flush().map_err(|err| {
114            ShellError::Io(IoError::new_internal(
115                err,
116                "PluginWrite could not flush",
117                nu_protocol::location!(),
118            ))
119        })
120    }
121}
122
123impl<W, T> PluginWrite<T> for &W
124where
125    W: PluginWrite<T>,
126{
127    fn write(&self, data: &T) -> Result<(), ShellError> {
128        (**self).write(data)
129    }
130
131    fn flush(&self) -> Result<(), ShellError> {
132        (**self).flush()
133    }
134
135    fn is_stdout(&self) -> bool {
136        (**self).is_stdout()
137    }
138}
139
140/// An interface manager handles I/O and state management for communication between a plugin and
141/// the engine. See `PluginInterfaceManager` in `nu-plugin-engine` for communication from the engine
142/// side to a plugin, or `EngineInterfaceManager` in `nu-plugin` for communication from the plugin
143/// side to the engine.
144///
145/// There is typically one [`InterfaceManager`] consuming input from a background thread, and
146/// managing shared state.
147pub trait InterfaceManager {
148    /// The corresponding interface type.
149    type Interface: Interface + 'static;
150
151    /// The input message type.
152    type Input;
153
154    /// Make a new interface that communicates with this [`InterfaceManager`].
155    fn get_interface(&self) -> Self::Interface;
156
157    /// Consume an input message.
158    ///
159    /// When implementing, call [`.consume_stream_message()`](Self::consume_stream_message) for any encapsulated
160    /// [`StreamMessage`]s received.
161    fn consume(&mut self, input: Self::Input) -> Result<(), ShellError>;
162
163    /// Get the [`StreamManager`] for handling operations related to stream messages.
164    fn stream_manager(&self) -> &StreamManager;
165
166    /// Prepare [`PipelineData`] after reading. This is called by `read_pipeline_data()` as
167    /// a hook so that values that need special handling can be taken care of.
168    fn prepare_pipeline_data(&self, data: PipelineData) -> Result<PipelineData, ShellError>;
169
170    /// Consume an input stream message.
171    ///
172    /// This method is provided for implementors to use.
173    fn consume_stream_message(&mut self, message: StreamMessage) -> Result<(), ShellError> {
174        self.stream_manager().handle_message(message)
175    }
176
177    /// Generate `PipelineData` for reading a stream, given a [`PipelineDataHeader`] that was
178    /// received from the other side.
179    ///
180    /// This method is provided for implementors to use.
181    fn read_pipeline_data(
182        &self,
183        header: PipelineDataHeader,
184        signals: &Signals,
185    ) -> Result<PipelineData, ShellError> {
186        self.prepare_pipeline_data(match header {
187            PipelineDataHeader::Empty => PipelineData::empty(),
188            PipelineDataHeader::Value(value, metadata) => PipelineData::value(value, metadata),
189            PipelineDataHeader::ListStream(info) => {
190                let handle = self.stream_manager().get_handle();
191                let reader = handle.read_stream(info.id, self.get_interface())?;
192                let ls = ListStream::new(reader, info.span, signals.clone());
193                PipelineData::list_stream(ls, info.metadata)
194            }
195            PipelineDataHeader::ByteStream(info) => {
196                let handle = self.stream_manager().get_handle();
197                let reader = handle.read_stream(info.id, self.get_interface())?;
198                let bs =
199                    ByteStream::from_result_iter(reader, info.span, signals.clone(), info.type_);
200                PipelineData::byte_stream(bs, info.metadata)
201            }
202        })
203    }
204}
205
206/// An interface provides an API for communicating with a plugin or the engine and facilitates
207/// stream I/O. See `PluginInterface` in `nu-plugin-engine` for the API from the engine side to a
208/// plugin, or `EngineInterface` in `nu-plugin` for the API from the plugin side to the engine.
209///
210/// There can be multiple copies of the interface managed by a single [`InterfaceManager`].
211pub trait Interface: Clone + Send {
212    /// The output message type, which must be capable of encapsulating a [`StreamMessage`].
213    type Output: From<StreamMessage>;
214
215    /// Any context required to construct [`PipelineData`]. Can be `()` if not needed.
216    type DataContext;
217
218    /// Write an output message.
219    fn write(&self, output: Self::Output) -> Result<(), ShellError>;
220
221    /// Flush the output buffer, so messages are visible to the other side.
222    fn flush(&self) -> Result<(), ShellError>;
223
224    /// Get the sequence for generating new [`StreamId`](nu_plugin_protocol::StreamId)s.
225    fn stream_id_sequence(&self) -> &Sequence;
226
227    /// Get the [`StreamManagerHandle`] for doing stream operations.
228    fn stream_manager_handle(&self) -> &StreamManagerHandle;
229
230    /// Prepare [`PipelineData`] to be written. This is called by `init_write_pipeline_data()` as
231    /// a hook so that values that need special handling can be taken care of.
232    fn prepare_pipeline_data(
233        &self,
234        data: PipelineData,
235        context: &Self::DataContext,
236    ) -> Result<PipelineData, ShellError>;
237
238    /// Initialize a write for [`PipelineData`]. This returns two parts: the header, which can be
239    /// embedded in the particular message that references the stream, and a writer, which will
240    /// write out all of the data in the pipeline when `.write()` is called.
241    ///
242    /// Note that not all [`PipelineData`] starts a stream. You should call `write()` anyway, as
243    /// it will automatically handle this case.
244    ///
245    /// This method is provided for implementors to use.
246    fn init_write_pipeline_data(
247        &self,
248        data: PipelineData,
249        context: &Self::DataContext,
250    ) -> Result<(PipelineDataHeader, PipelineDataWriter<Self>), ShellError> {
251        // Allocate a stream id and a writer
252        let new_stream = |high_pressure_mark: i32| {
253            // Get a free stream id
254            let id = self.stream_id_sequence().next()?;
255            // Create the writer
256            let writer =
257                self.stream_manager_handle()
258                    .write_stream(id, self.clone(), high_pressure_mark)?;
259            Ok::<_, ShellError>((id, writer))
260        };
261        match self.prepare_pipeline_data(data, context)? {
262            PipelineData::Value(value, metadata) => Ok((
263                PipelineDataHeader::Value(value, metadata),
264                PipelineDataWriter::None,
265            )),
266            PipelineData::Empty => Ok((PipelineDataHeader::Empty, PipelineDataWriter::None)),
267            PipelineData::ListStream(stream, metadata) => {
268                let (id, writer) = new_stream(LIST_STREAM_HIGH_PRESSURE)?;
269                Ok((
270                    PipelineDataHeader::ListStream(ListStreamInfo {
271                        id,
272                        span: stream.span(),
273                        metadata,
274                    }),
275                    PipelineDataWriter::ListStream(writer, stream),
276                ))
277            }
278            PipelineData::ByteStream(stream, metadata) => {
279                let span = stream.span();
280                let type_ = stream.type_();
281                if let Some(reader) = stream.reader() {
282                    let (id, writer) = new_stream(RAW_STREAM_HIGH_PRESSURE)?;
283                    let header = PipelineDataHeader::ByteStream(ByteStreamInfo {
284                        id,
285                        span,
286                        type_,
287                        metadata,
288                    });
289                    Ok((header, PipelineDataWriter::ByteStream(writer, reader)))
290                } else {
291                    Ok((PipelineDataHeader::Empty, PipelineDataWriter::None))
292                }
293            }
294        }
295    }
296}
297
298impl<T> WriteStreamMessage for T
299where
300    T: Interface,
301{
302    fn write_stream_message(&mut self, msg: StreamMessage) -> Result<(), ShellError> {
303        self.write(msg.into())
304    }
305
306    fn flush(&mut self) -> Result<(), ShellError> {
307        <Self as Interface>::flush(self)
308    }
309}
310
311/// Completes the write operation for a [`PipelineData`]. You must call
312/// [`PipelineDataWriter::write()`] to write all of the data contained within the streams.
313#[derive(Default)]
314#[must_use]
315pub enum PipelineDataWriter<W: WriteStreamMessage> {
316    #[default]
317    None,
318    ListStream(StreamWriter<W>, ListStream),
319    ByteStream(StreamWriter<W>, Reader),
320}
321
322impl<W> PipelineDataWriter<W>
323where
324    W: WriteStreamMessage + Send + 'static,
325{
326    /// Write all of the data in each of the streams. This method waits for completion.
327    pub fn write(self) -> Result<(), ShellError> {
328        match self {
329            // If no stream was contained in the PipelineData, do nothing.
330            PipelineDataWriter::None => Ok(()),
331            // Write a list stream.
332            PipelineDataWriter::ListStream(mut writer, stream) => {
333                writer.write_all(stream)?;
334                Ok(())
335            }
336            // Write a byte stream.
337            PipelineDataWriter::ByteStream(mut writer, mut reader) => {
338                let span = reader.span();
339                let buf = &mut [0; 8192];
340                writer.write_all(std::iter::from_fn(move || match reader.read(buf) {
341                    Ok(0) => None,
342                    Ok(len) => Some(Ok(buf[..len].to_vec())),
343                    Err(err) => Some(Err(ShellError::from(IoError::new(err, span, None)))),
344                }))?;
345                Ok(())
346            }
347        }
348    }
349
350    /// Write all of the data in each of the streams. This method returns immediately; any necessary
351    /// write will happen in the background. If a thread was spawned, its handle is returned.
352    pub fn write_background(
353        self,
354    ) -> Result<Option<thread::JoinHandle<Result<(), ShellError>>>, ShellError> {
355        match self {
356            PipelineDataWriter::None => Ok(None),
357            _ => Ok(Some(
358                thread::Builder::new()
359                    .name("plugin stream background writer".into())
360                    .spawn(move || {
361                        let result = self.write();
362                        if let Err(ref err) = result {
363                            // Assume that the background thread error probably won't be handled and log it
364                            // here just in case.
365                            log::warn!("Error while writing pipeline in background: {err}");
366                        }
367                        result
368                    })
369                    .map_err(|err| {
370                        IoError::new_internal(
371                            err,
372                            "Could not spawn plugin stream background writer",
373                            nu_protocol::location!(),
374                        )
375                    })?,
376            )),
377        }
378    }
379}