nu_plugin_core/interface/
mod.rs1use nu_plugin_protocol::{ByteStreamInfo, ListStreamInfo, PipelineDataHeader, StreamMessage};
4use nu_protocol::{
5 ByteStream, ListStream, PipelineData, Reader, ShellError, Signals, engine::Sequence,
6 shell_error::io::IoError,
7};
8use std::{
9 io::{Read, Write},
10 sync::Mutex,
11 thread,
12};
13
14pub mod stream;
15
16use crate::Encoder;
17
18use self::stream::{StreamManager, StreamManagerHandle, StreamWriter, WriteStreamMessage};
19
20pub mod test_util;
21
22#[cfg(test)]
23mod tests;
24
/// High-pressure mark passed to the stream writer that
/// `Interface::init_write_pipeline_data` creates for a list stream.
// NOTE(review): presumably the number of unacknowledged messages allowed in
// flight before the writer blocks — confirm against the `stream` module.
const LIST_STREAM_HIGH_PRESSURE: i32 = 100;

/// High-pressure mark passed to the stream writer that
/// `Interface::init_write_pipeline_data` creates for a byte stream.
// NOTE(review): presumably counts buffers rather than bytes — confirm against
// the `stream` module.
const RAW_STREAM_HIGH_PRESSURE: i32 = 50;
32
33pub trait PluginRead<T> {
35 fn read(&mut self) -> Result<Option<T>, ShellError>;
37}
38
39impl<R, E, T> PluginRead<T> for (R, E)
40where
41 R: std::io::BufRead,
42 E: Encoder<T>,
43{
44 fn read(&mut self) -> Result<Option<T>, ShellError> {
45 self.1.decode(&mut self.0)
46 }
47}
48
49impl<R, T> PluginRead<T> for &mut R
50where
51 R: PluginRead<T>,
52{
53 fn read(&mut self) -> Result<Option<T>, ShellError> {
54 (**self).read()
55 }
56}
57
58pub trait PluginWrite<T>: Send + Sync {
62 fn write(&self, data: &T) -> Result<(), ShellError>;
63
64 fn flush(&self) -> Result<(), ShellError>;
66
67 fn is_stdout(&self) -> bool {
69 false
70 }
71}
72
73impl<E, T> PluginWrite<T> for (std::io::Stdout, E)
74where
75 E: Encoder<T>,
76{
77 fn write(&self, data: &T) -> Result<(), ShellError> {
78 let mut lock = self.0.lock();
79 self.1.encode(data, &mut lock)
80 }
81
82 fn flush(&self) -> Result<(), ShellError> {
83 self.0.lock().flush().map_err(|err| {
84 ShellError::Io(IoError::new_internal(err, "PluginWrite could not flush"))
85 })
86 }
87
88 fn is_stdout(&self) -> bool {
89 true
90 }
91}
92
93impl<W, E, T> PluginWrite<T> for (Mutex<W>, E)
94where
95 W: std::io::Write + Send,
96 E: Encoder<T>,
97{
98 fn write(&self, data: &T) -> Result<(), ShellError> {
99 let mut lock = self.0.lock().map_err(|_| ShellError::NushellFailed {
100 msg: "writer mutex poisoned".into(),
101 })?;
102 self.1.encode(data, &mut *lock)
103 }
104
105 fn flush(&self) -> Result<(), ShellError> {
106 let mut lock = self.0.lock().map_err(|_| ShellError::NushellFailed {
107 msg: "writer mutex poisoned".into(),
108 })?;
109 lock.flush().map_err(|err| {
110 ShellError::Io(IoError::new_internal(err, "PluginWrite could not flush"))
111 })
112 }
113}
114
115impl<W, T> PluginWrite<T> for &W
116where
117 W: PluginWrite<T>,
118{
119 fn write(&self, data: &T) -> Result<(), ShellError> {
120 (**self).write(data)
121 }
122
123 fn flush(&self) -> Result<(), ShellError> {
124 (**self).flush()
125 }
126
127 fn is_stdout(&self) -> bool {
128 (**self).is_stdout()
129 }
130}
131
132pub trait InterfaceManager {
140 type Interface: Interface + 'static;
142
143 type Input;
145
146 fn get_interface(&self) -> Self::Interface;
148
149 fn consume(&mut self, input: Self::Input) -> Result<(), ShellError>;
154
155 fn stream_manager(&self) -> &StreamManager;
157
158 fn prepare_pipeline_data(&self, data: PipelineData) -> Result<PipelineData, ShellError>;
161
162 fn consume_stream_message(&mut self, message: StreamMessage) -> Result<(), ShellError> {
166 self.stream_manager().handle_message(message)
167 }
168
169 fn read_pipeline_data(
174 &self,
175 header: PipelineDataHeader,
176 signals: &Signals,
177 ) -> Result<PipelineData, ShellError> {
178 self.prepare_pipeline_data(match header {
179 PipelineDataHeader::Empty => PipelineData::empty(),
180 PipelineDataHeader::Value(value, metadata) => PipelineData::value(value, metadata),
181 PipelineDataHeader::ListStream(info) => {
182 let handle = self.stream_manager().get_handle();
183 let reader = handle.read_stream(info.id, self.get_interface())?;
184 let ls = ListStream::new(reader, info.span, signals.clone());
185 PipelineData::list_stream(ls, info.metadata)
186 }
187 PipelineDataHeader::ByteStream(info) => {
188 let handle = self.stream_manager().get_handle();
189 let reader = handle.read_stream(info.id, self.get_interface())?;
190 let bs =
191 ByteStream::from_result_iter(reader, info.span, signals.clone(), info.type_);
192 PipelineData::byte_stream(bs, info.metadata)
193 }
194 })
195 }
196}
197
198pub trait Interface: Clone + Send {
204 type Output: From<StreamMessage>;
206
207 type DataContext;
209
210 fn write(&self, output: Self::Output) -> Result<(), ShellError>;
212
213 fn flush(&self) -> Result<(), ShellError>;
215
216 fn stream_id_sequence(&self) -> &Sequence;
218
219 fn stream_manager_handle(&self) -> &StreamManagerHandle;
221
222 fn prepare_pipeline_data(
225 &self,
226 data: PipelineData,
227 context: &Self::DataContext,
228 ) -> Result<PipelineData, ShellError>;
229
230 fn init_write_pipeline_data(
239 &self,
240 data: PipelineData,
241 context: &Self::DataContext,
242 ) -> Result<(PipelineDataHeader, PipelineDataWriter<Self>), ShellError> {
243 let new_stream = |high_pressure_mark: i32| {
245 let id = self.stream_id_sequence().next()?;
247 let writer =
249 self.stream_manager_handle()
250 .write_stream(id, self.clone(), high_pressure_mark)?;
251 Ok::<_, ShellError>((id, writer))
252 };
253 match self.prepare_pipeline_data(data, context)? {
254 PipelineData::Value(value, metadata) => Ok((
255 PipelineDataHeader::Value(value, metadata),
256 PipelineDataWriter::None,
257 )),
258 PipelineData::Empty => Ok((PipelineDataHeader::Empty, PipelineDataWriter::None)),
259 PipelineData::ListStream(stream, metadata) => {
260 let (id, writer) = new_stream(LIST_STREAM_HIGH_PRESSURE)?;
261 Ok((
262 PipelineDataHeader::ListStream(ListStreamInfo {
263 id,
264 span: stream.span(),
265 metadata,
266 }),
267 PipelineDataWriter::ListStream(writer, stream),
268 ))
269 }
270 PipelineData::ByteStream(stream, metadata) => {
271 let span = stream.span();
272 let type_ = stream.type_();
273 if let Some(reader) = stream.reader() {
274 let (id, writer) = new_stream(RAW_STREAM_HIGH_PRESSURE)?;
275 let header = PipelineDataHeader::ByteStream(ByteStreamInfo {
276 id,
277 span,
278 type_,
279 metadata,
280 });
281 Ok((header, PipelineDataWriter::ByteStream(writer, reader)))
282 } else {
283 Ok((PipelineDataHeader::Empty, PipelineDataWriter::None))
284 }
285 }
286 }
287 }
288}
289
290impl<T> WriteStreamMessage for T
291where
292 T: Interface,
293{
294 fn write_stream_message(&mut self, msg: StreamMessage) -> Result<(), ShellError> {
295 self.write(msg.into())
296 }
297
298 fn flush(&mut self) -> Result<(), ShellError> {
299 <Self as Interface>::flush(self)
300 }
301}
302
303#[derive(Default)]
306#[must_use]
307pub enum PipelineDataWriter<W: WriteStreamMessage> {
308 #[default]
309 None,
310 ListStream(StreamWriter<W>, ListStream),
311 ByteStream(StreamWriter<W>, Reader),
312}
313
314impl<W> PipelineDataWriter<W>
315where
316 W: WriteStreamMessage + Send + 'static,
317{
318 pub fn write(self) -> Result<(), ShellError> {
320 match self {
321 PipelineDataWriter::None => Ok(()),
323 PipelineDataWriter::ListStream(mut writer, stream) => {
325 writer.write_all(stream)?;
326 Ok(())
327 }
328 PipelineDataWriter::ByteStream(mut writer, mut reader) => {
330 let span = reader.span();
331 let buf = &mut [0; 8192];
332 writer.write_all(std::iter::from_fn(move || match reader.read(buf) {
333 Ok(0) => None,
334 Ok(len) => Some(Ok(buf[..len].to_vec())),
335 Err(err) => Some(Err(ShellError::from(IoError::new(err, span, None)))),
336 }))?;
337 Ok(())
338 }
339 }
340 }
341
342 pub fn write_background(
345 self,
346 ) -> Result<Option<thread::JoinHandle<Result<(), ShellError>>>, ShellError> {
347 match self {
348 PipelineDataWriter::None => Ok(None),
349 _ => Ok(Some(
350 thread::Builder::new()
351 .name("plugin stream background writer".into())
352 .spawn(move || {
353 let result = self.write();
354 if let Err(ref err) = result {
355 log::warn!("Error while writing pipeline in background: {err}");
358 }
359 result
360 })
361 .map_err(|err| {
362 IoError::new_internal(
363 err,
364 "Could not spawn plugin stream background writer",
365 )
366 })?,
367 )),
368 }
369 }
370}