nu_plugin_core/interface/
mod.rs1use nu_plugin_protocol::{ByteStreamInfo, ListStreamInfo, PipelineDataHeader, StreamMessage};
4use nu_protocol::{
5 ByteStream, ListStream, PipelineData, Reader, ShellError, Signals, engine::Sequence,
6 shell_error::io::IoError,
7};
8use std::{
9 io::{Read, Write},
10 sync::Mutex,
11 thread,
12};
13
14pub mod stream;
15
16use crate::Encoder;
17
18use self::stream::{StreamManager, StreamManagerHandle, StreamWriter, WriteStreamMessage};
19
20pub mod test_util;
21
22#[cfg(test)]
23mod tests;
24
/// High-pressure mark passed to `write_stream` when an outgoing list stream is set up
/// (see `Interface::init_write_pipeline_data`).
// NOTE(review): presumably the number of unacknowledged messages allowed in flight before the
// writer blocks — confirm against `StreamWriter`.
const LIST_STREAM_HIGH_PRESSURE: i32 = 100;

/// High-pressure mark passed to `write_stream` when an outgoing byte stream is set up
/// (see `Interface::init_write_pipeline_data`).
// NOTE(review): byte streams are sent in buffers of up to 8192 bytes (see
// `PipelineDataWriter::write`), so this likely bounds buffered chunks, not bytes — confirm.
const RAW_STREAM_HIGH_PRESSURE: i32 = 50;
32
/// A source of decoded plugin messages of type `T`.
pub trait PluginRead<T> {
    /// Reads the next message from the underlying source.
    ///
    /// Returns `Ok(Some(message))` when a message was read and `Err` when reading or decoding
    /// failed.
    // NOTE(review): `Ok(None)` presumably signals end of input — confirm against
    // `Encoder::decode`.
    fn read(&mut self) -> Result<Option<T>, ShellError>;
}
38
39impl<R, E, T> PluginRead<T> for (R, E)
40where
41 R: std::io::BufRead,
42 E: Encoder<T>,
43{
44 fn read(&mut self) -> Result<Option<T>, ShellError> {
45 self.1.decode(&mut self.0)
46 }
47}
48
49impl<R, T> PluginRead<T> for &mut R
50where
51 R: PluginRead<T>,
52{
53 fn read(&mut self) -> Result<Option<T>, ShellError> {
54 (**self).read()
55 }
56}
57
/// A sink for plugin messages of type `T`.
///
/// All methods take `&self` and the trait requires `Send + Sync`, so implementations are expected
/// to provide their own interior synchronization (the implementations in this file lock stdout or
/// a `Mutex` on each call).
pub trait PluginWrite<T>: Send + Sync {
    /// Writes a single message to the underlying sink.
    ///
    /// The implementations in this file do not flush here; call [`flush`](Self::flush) to do so.
    fn write(&self, data: &T) -> Result<(), ShellError>;

    /// Flushes the underlying sink.
    fn flush(&self) -> Result<(), ShellError>;

    /// Reports whether this writer targets stdout. Defaults to `false`.
    fn is_stdout(&self) -> bool {
        false
    }
}
72
73impl<E, T> PluginWrite<T> for (std::io::Stdout, E)
74where
75 E: Encoder<T>,
76{
77 fn write(&self, data: &T) -> Result<(), ShellError> {
78 let mut lock = self.0.lock();
79 self.1.encode(data, &mut lock)
80 }
81
82 fn flush(&self) -> Result<(), ShellError> {
83 self.0.lock().flush().map_err(|err| {
84 ShellError::Io(IoError::new_internal(
85 err,
86 "PluginWrite could not flush",
87 nu_protocol::location!(),
88 ))
89 })
90 }
91
92 fn is_stdout(&self) -> bool {
93 true
94 }
95}
96
97impl<W, E, T> PluginWrite<T> for (Mutex<W>, E)
98where
99 W: std::io::Write + Send,
100 E: Encoder<T>,
101{
102 fn write(&self, data: &T) -> Result<(), ShellError> {
103 let mut lock = self.0.lock().map_err(|_| ShellError::NushellFailed {
104 msg: "writer mutex poisoned".into(),
105 })?;
106 self.1.encode(data, &mut *lock)
107 }
108
109 fn flush(&self) -> Result<(), ShellError> {
110 let mut lock = self.0.lock().map_err(|_| ShellError::NushellFailed {
111 msg: "writer mutex poisoned".into(),
112 })?;
113 lock.flush().map_err(|err| {
114 ShellError::Io(IoError::new_internal(
115 err,
116 "PluginWrite could not flush",
117 nu_protocol::location!(),
118 ))
119 })
120 }
121}
122
123impl<W, T> PluginWrite<T> for &W
124where
125 W: PluginWrite<T>,
126{
127 fn write(&self, data: &T) -> Result<(), ShellError> {
128 (**self).write(data)
129 }
130
131 fn flush(&self) -> Result<(), ShellError> {
132 (**self).flush()
133 }
134
135 fn is_stdout(&self) -> bool {
136 (**self).is_stdout()
137 }
138}
139
140pub trait InterfaceManager {
148 type Interface: Interface + 'static;
150
151 type Input;
153
154 fn get_interface(&self) -> Self::Interface;
156
157 fn consume(&mut self, input: Self::Input) -> Result<(), ShellError>;
162
163 fn stream_manager(&self) -> &StreamManager;
165
166 fn prepare_pipeline_data(&self, data: PipelineData) -> Result<PipelineData, ShellError>;
169
170 fn consume_stream_message(&mut self, message: StreamMessage) -> Result<(), ShellError> {
174 self.stream_manager().handle_message(message)
175 }
176
177 fn read_pipeline_data(
182 &self,
183 header: PipelineDataHeader,
184 signals: &Signals,
185 ) -> Result<PipelineData, ShellError> {
186 self.prepare_pipeline_data(match header {
187 PipelineDataHeader::Empty => PipelineData::empty(),
188 PipelineDataHeader::Value(value, metadata) => PipelineData::value(value, metadata),
189 PipelineDataHeader::ListStream(info) => {
190 let handle = self.stream_manager().get_handle();
191 let reader = handle.read_stream(info.id, self.get_interface())?;
192 let ls = ListStream::new(reader, info.span, signals.clone());
193 PipelineData::list_stream(ls, info.metadata)
194 }
195 PipelineDataHeader::ByteStream(info) => {
196 let handle = self.stream_manager().get_handle();
197 let reader = handle.read_stream(info.id, self.get_interface())?;
198 let bs =
199 ByteStream::from_result_iter(reader, info.span, signals.clone(), info.type_);
200 PipelineData::byte_stream(bs, info.metadata)
201 }
202 })
203 }
204}
205
206pub trait Interface: Clone + Send {
212 type Output: From<StreamMessage>;
214
215 type DataContext;
217
218 fn write(&self, output: Self::Output) -> Result<(), ShellError>;
220
221 fn flush(&self) -> Result<(), ShellError>;
223
224 fn stream_id_sequence(&self) -> &Sequence;
226
227 fn stream_manager_handle(&self) -> &StreamManagerHandle;
229
230 fn prepare_pipeline_data(
233 &self,
234 data: PipelineData,
235 context: &Self::DataContext,
236 ) -> Result<PipelineData, ShellError>;
237
238 fn init_write_pipeline_data(
247 &self,
248 data: PipelineData,
249 context: &Self::DataContext,
250 ) -> Result<(PipelineDataHeader, PipelineDataWriter<Self>), ShellError> {
251 let new_stream = |high_pressure_mark: i32| {
253 let id = self.stream_id_sequence().next()?;
255 let writer =
257 self.stream_manager_handle()
258 .write_stream(id, self.clone(), high_pressure_mark)?;
259 Ok::<_, ShellError>((id, writer))
260 };
261 match self.prepare_pipeline_data(data, context)? {
262 PipelineData::Value(value, metadata) => Ok((
263 PipelineDataHeader::Value(value, metadata),
264 PipelineDataWriter::None,
265 )),
266 PipelineData::Empty => Ok((PipelineDataHeader::Empty, PipelineDataWriter::None)),
267 PipelineData::ListStream(stream, metadata) => {
268 let (id, writer) = new_stream(LIST_STREAM_HIGH_PRESSURE)?;
269 Ok((
270 PipelineDataHeader::ListStream(ListStreamInfo {
271 id,
272 span: stream.span(),
273 metadata,
274 }),
275 PipelineDataWriter::ListStream(writer, stream),
276 ))
277 }
278 PipelineData::ByteStream(stream, metadata) => {
279 let span = stream.span();
280 let type_ = stream.type_();
281 if let Some(reader) = stream.reader() {
282 let (id, writer) = new_stream(RAW_STREAM_HIGH_PRESSURE)?;
283 let header = PipelineDataHeader::ByteStream(ByteStreamInfo {
284 id,
285 span,
286 type_,
287 metadata,
288 });
289 Ok((header, PipelineDataWriter::ByteStream(writer, reader)))
290 } else {
291 Ok((PipelineDataHeader::Empty, PipelineDataWriter::None))
292 }
293 }
294 }
295 }
296}
297
298impl<T> WriteStreamMessage for T
299where
300 T: Interface,
301{
302 fn write_stream_message(&mut self, msg: StreamMessage) -> Result<(), ShellError> {
303 self.write(msg.into())
304 }
305
306 fn flush(&mut self) -> Result<(), ShellError> {
307 <Self as Interface>::flush(self)
308 }
309}
310
/// The pending write half of a [`PipelineData`] produced by
/// `Interface::init_write_pipeline_data`.
///
/// The stream variants hold both the stream writer and the data still to be sent; nothing is
/// sent until `write()` or `write_background()` is called, hence `#[must_use]`.
#[derive(Default)]
#[must_use]
pub enum PipelineDataWriter<W: WriteStreamMessage> {
    /// There is no stream data to write.
    #[default]
    None,
    /// A list stream and the writer it should be sent through.
    ListStream(StreamWriter<W>, ListStream),
    /// A byte stream reader and the writer its contents should be sent through.
    ByteStream(StreamWriter<W>, Reader),
}
321
322impl<W> PipelineDataWriter<W>
323where
324 W: WriteStreamMessage + Send + 'static,
325{
326 pub fn write(self) -> Result<(), ShellError> {
328 match self {
329 PipelineDataWriter::None => Ok(()),
331 PipelineDataWriter::ListStream(mut writer, stream) => {
333 writer.write_all(stream)?;
334 Ok(())
335 }
336 PipelineDataWriter::ByteStream(mut writer, mut reader) => {
338 let span = reader.span();
339 let buf = &mut [0; 8192];
340 writer.write_all(std::iter::from_fn(move || match reader.read(buf) {
341 Ok(0) => None,
342 Ok(len) => Some(Ok(buf[..len].to_vec())),
343 Err(err) => Some(Err(ShellError::from(IoError::new(err, span, None)))),
344 }))?;
345 Ok(())
346 }
347 }
348 }
349
350 pub fn write_background(
353 self,
354 ) -> Result<Option<thread::JoinHandle<Result<(), ShellError>>>, ShellError> {
355 match self {
356 PipelineDataWriter::None => Ok(None),
357 _ => Ok(Some(
358 thread::Builder::new()
359 .name("plugin stream background writer".into())
360 .spawn(move || {
361 let result = self.write();
362 if let Err(ref err) = result {
363 log::warn!("Error while writing pipeline in background: {err}");
366 }
367 result
368 })
369 .map_err(|err| {
370 IoError::new_internal(
371 err,
372 "Could not spawn plugin stream background writer",
373 nu_protocol::location!(),
374 )
375 })?,
376 )),
377 }
378 }
379}