layered_io/
layered_duplexer.rs

1use crate::{
2    default_read, default_read_exact_using_status, default_read_to_end, default_read_to_string,
3    default_read_vectored, Bufferable, ReadLayered, Status, WriteLayered,
4};
5use duplex::Duplex;
6#[cfg(windows)]
7use io_extras::os::windows::{
8    AsRawReadWriteHandleOrSocket, AsReadWriteHandleOrSocket, BorrowedHandleOrSocket,
9    RawHandleOrSocket,
10};
11use std::fmt::{self, Arguments};
12use std::io::{self, IoSlice, IoSliceMut, Read, Write};
13#[cfg(feature = "terminal-io")]
14use terminal_io::DuplexTerminal;
15#[cfg(not(windows))]
16use {
17    io_extras::os::rustix::{AsRawReadWriteFd, AsReadWriteFd, RawFd},
18    std::os::fd::BorrowedFd,
19};
20
21/// Adapts an `Read` + `Write` to implement [`HalfDuplexLayered`].
22///
23/// [`HalfDuplexLayered`]: crate::HalfDuplexLayered
24pub struct LayeredDuplexer<Inner> {
25    inner: Option<Inner>,
26    eos_as_push: bool,
27    line_by_line: bool,
28}
29
30#[cfg(feature = "terminal-io")]
31impl<Inner: DuplexTerminal> LayeredDuplexer<Inner> {
32    /// Construct a new `LayeredDuplexer` which wraps `inner`, which implements
33    /// `DuplexTerminal`, and automatically sets the `line_by_line` setting if
34    /// appropriate.
35    pub fn maybe_terminal(inner: Inner) -> Self {
36        let line_by_line = inner.is_line_by_line();
37
38        if line_by_line {
39            Self::line_by_line(inner)
40        } else {
41            Self::new(inner)
42        }
43    }
44}
45
46impl<Inner: Read + Write> LayeredDuplexer<Inner> {
47    /// Construct a new `LayeredDuplexer` which wraps `inner` with default
48    /// settings.
49    pub fn new(inner: Inner) -> Self {
50        Self {
51            inner: Some(inner),
52            eos_as_push: false,
53            line_by_line: false,
54        }
55    }
56
57    /// Construct a new `LayeredDuplexer` which wraps `inner`. When `inner`
58    /// reports end of stream (by returning 0), report a push but keep the
59    /// stream open and continue to read data on it.
60    ///
61    /// For example, when reading a file, when the reader reaches the end of
62    /// the file it will report it, but consumers may wish to continue reading
63    /// in case additional data is appended to the file.
64    pub fn with_eos_as_push(inner: Inner) -> Self {
65        Self {
66            inner: Some(inner),
67            eos_as_push: true,
68            line_by_line: false,
69        }
70    }
71
72    /// Construct a new `LayeredDuplexer` which wraps an `inner` which reads
73    /// its input line-by-line, such as stdin on a terminal.
74    pub fn line_by_line(inner: Inner) -> Self {
75        Self {
76            inner: Some(inner),
77            eos_as_push: false,
78            line_by_line: true,
79        }
80    }
81
82    /// Close this `LayeredDuplexer` and return the inner stream.
83    pub fn close_into_inner(mut self) -> io::Result<Inner> {
84        match &mut self.inner {
85            Some(_) => {
86                let mut inner = self.inner.take().unwrap();
87                inner.flush()?;
88                Ok(inner)
89            }
90            None => Err(stream_already_ended()),
91        }
92    }
93
94    /// Consume this `LayeredDuplexer` and return the inner stream.
95    pub fn abandon_into_inner(mut self) -> Option<Inner> {
96        self.inner.take()
97    }
98}
99
100impl<Inner: Read + Write> ReadLayered for LayeredDuplexer<Inner> {
101    #[inline]
102    fn read_with_status(&mut self, buf: &mut [u8]) -> io::Result<(usize, Status)> {
103        if self.inner.is_none() {
104            return Ok((0, Status::End));
105        }
106        match self.inner.as_mut().unwrap().read(buf) {
107            Ok(0) if !buf.is_empty() => {
108                if self.eos_as_push {
109                    Ok((0, Status::push()))
110                } else {
111                    drop(self.inner.take().unwrap());
112                    Ok((0, Status::End))
113                }
114            }
115            Ok(size) => {
116                if self.line_by_line && buf[size - 1] == b'\n' {
117                    Ok((size, Status::push()))
118                } else {
119                    Ok((size, Status::active()))
120                }
121            }
122            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => Ok((0, Status::active())),
123            Err(e) => {
124                self.abandon();
125                Err(e)
126            }
127        }
128    }
129
130    #[inline]
131    fn read_vectored_with_status(
132        &mut self,
133        bufs: &mut [IoSliceMut<'_>],
134    ) -> io::Result<(usize, Status)> {
135        if self.inner.is_none() {
136            return Ok((0, Status::End));
137        }
138        match self.inner.as_mut().unwrap().read_vectored(bufs) {
139            Ok(0) if !bufs.iter().all(|b| b.is_empty()) => {
140                if self.eos_as_push {
141                    Ok((0, Status::push()))
142                } else {
143                    drop(self.inner.take().unwrap());
144                    Ok((0, Status::End))
145                }
146            }
147            Ok(size) => {
148                if self.line_by_line {
149                    let mut i = size;
150                    let mut saw_line = false;
151                    for buf in bufs.iter() {
152                        if i < buf.len() {
153                            saw_line = buf[i - 1] == b'\n';
154                            break;
155                        }
156                        i -= bufs.len();
157                    }
158                    if saw_line {
159                        return Ok((size, Status::push()));
160                    }
161                }
162
163                Ok((size, Status::active()))
164            }
165            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => Ok((0, Status::active())),
166            Err(e) => {
167                self.abandon();
168                Err(e)
169            }
170        }
171    }
172}
173
174impl<Inner: Read + Write> Read for LayeredDuplexer<Inner> {
175    #[inline]
176    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
177        default_read(self, buf).map_err(|e| {
178            drop(self.inner.take().unwrap());
179            e
180        })
181    }
182
183    #[inline]
184    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
185        default_read_vectored(self, bufs).map_err(|e| {
186            drop(self.inner.take().unwrap());
187            e
188        })
189    }
190
191    #[cfg(can_vector)]
192    #[inline]
193    fn is_read_vectored(&self) -> bool {
194        match &self.inner {
195            Some(inner) => inner.is_read_vectored(),
196            None => false,
197        }
198    }
199
200    #[inline]
201    fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
202        default_read_to_end(self, buf).map_err(|e| {
203            drop(self.inner.take().unwrap());
204            e
205        })
206    }
207
208    #[inline]
209    fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
210        default_read_to_string(self, buf).map_err(|e| {
211            drop(self.inner.take().unwrap());
212            e
213        })
214    }
215
216    #[inline]
217    fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
218        default_read_exact_using_status(self, buf)
219            .map(|_status| ())
220            .map_err(|e| {
221                drop(self.inner.take().unwrap());
222                e
223            })
224    }
225}
226
227impl<Inner: Read + Write> WriteLayered for LayeredDuplexer<Inner> {
228    #[inline]
229    fn close(&mut self) -> io::Result<()> {
230        match &mut self.inner {
231            Some(_) => self.inner.take().unwrap().flush(),
232            None => Err(stream_already_ended()),
233        }
234    }
235}
236
237impl<Inner: Read + Write> Write for LayeredDuplexer<Inner> {
238    #[inline]
239    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
240        match &mut self.inner {
241            Some(inner) => inner.write(buf).map_err(|e| {
242                drop(self.inner.take().unwrap());
243                e
244            }),
245            None => Err(stream_already_ended()),
246        }
247    }
248
249    #[inline]
250    fn flush(&mut self) -> io::Result<()> {
251        match &mut self.inner {
252            Some(inner) => inner.flush().map_err(|e| {
253                drop(self.inner.take().unwrap());
254                e
255            }),
256            None => Err(stream_already_ended()),
257        }
258    }
259
260    #[inline]
261    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
262        match &mut self.inner {
263            Some(inner) => inner.write_vectored(bufs).map_err(|e| {
264                drop(self.inner.take().unwrap());
265                e
266            }),
267            None => Err(stream_already_ended()),
268        }
269    }
270
271    #[cfg(can_vector)]
272    #[inline]
273    fn is_write_vectored(&self) -> bool {
274        match &self.inner {
275            Some(inner) => inner.is_write_vectored(),
276            None => false,
277        }
278    }
279
280    #[inline]
281    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
282        match &mut self.inner {
283            Some(inner) => inner.write_all(buf).map_err(|e| {
284                drop(self.inner.take().unwrap());
285                e
286            }),
287            None => Err(stream_already_ended()),
288        }
289    }
290
291    #[cfg(write_all_vectored)]
292    #[inline]
293    fn write_all_vectored(&mut self, bufs: &mut [IoSlice<'_>]) -> io::Result<()> {
294        match &mut self.inner {
295            Some(inner) => inner.write_all_vectored(bufs).map_err(|e| {
296                drop(self.inner.take().unwrap());
297                e
298            }),
299            None => Err(stream_already_ended()),
300        }
301    }
302
303    #[inline]
304    fn write_fmt(&mut self, fmt: Arguments<'_>) -> io::Result<()> {
305        match &mut self.inner {
306            Some(inner) => inner.write_fmt(fmt).map_err(|e| {
307                drop(self.inner.take().unwrap());
308                e
309            }),
310            None => Err(stream_already_ended()),
311        }
312    }
313}
314
315impl<Inner> Bufferable for LayeredDuplexer<Inner> {
316    #[inline]
317    fn abandon(&mut self) {
318        self.inner = None;
319    }
320}
321
322impl<Inner: Read + Write + Duplex> Duplex for LayeredDuplexer<Inner> {}
323
324#[cfg(feature = "terminal-io")]
325impl<RW: terminal_io::DuplexTerminal> terminal_io::Terminal for LayeredDuplexer<RW> {}
326
327#[cfg(feature = "terminal-io")]
328impl<RW: terminal_io::DuplexTerminal> terminal_io::ReadTerminal for LayeredDuplexer<RW> {
329    #[inline]
330    fn is_line_by_line(&self) -> bool {
331        self.inner
332            .as_ref()
333            .map(|c| c.is_line_by_line())
334            .unwrap_or(false)
335    }
336
337    #[inline]
338    fn is_input_terminal(&self) -> bool {
339        self.inner
340            .as_ref()
341            .map(|c| c.is_input_terminal())
342            .unwrap_or(false)
343    }
344}
345
346#[cfg(feature = "terminal-io")]
347impl<RW: terminal_io::DuplexTerminal> terminal_io::WriteTerminal for LayeredDuplexer<RW> {
348    #[inline]
349    fn color_support(&self) -> terminal_io::TerminalColorSupport {
350        self.inner.as_ref().unwrap().color_support()
351    }
352
353    #[inline]
354    fn color_preference(&self) -> bool {
355        self.inner.as_ref().unwrap().color_preference()
356    }
357
358    #[inline]
359    fn is_output_terminal(&self) -> bool {
360        self.inner
361            .as_ref()
362            .map(|c| c.is_output_terminal())
363            .unwrap_or(false)
364    }
365}
366
367#[cfg(feature = "terminal-io")]
368impl<RW: terminal_io::DuplexTerminal> terminal_io::DuplexTerminal for LayeredDuplexer<RW> {}
369
370#[cfg(not(windows))]
371impl<Inner: Duplex + AsRawReadWriteFd> AsRawReadWriteFd for LayeredDuplexer<Inner> {
372    #[inline]
373    fn as_raw_read_fd(&self) -> RawFd {
374        match &self.inner {
375            Some(inner) => inner.as_raw_read_fd(),
376            None => panic!("as_raw_read_fd() called on closed LayeredDuplexer"),
377        }
378    }
379
380    #[inline]
381    fn as_raw_write_fd(&self) -> RawFd {
382        match &self.inner {
383            Some(inner) => inner.as_raw_write_fd(),
384            None => panic!("as_raw_write_fd() called on closed LayeredDuplexer"),
385        }
386    }
387}
388
389#[cfg(not(windows))]
390impl<Inner: Duplex + AsReadWriteFd> AsReadWriteFd for LayeredDuplexer<Inner> {
391    #[inline]
392    fn as_read_fd(&self) -> BorrowedFd<'_> {
393        match &self.inner {
394            Some(inner) => inner.as_read_fd(),
395            None => panic!("as_read_fd() called on closed LayeredDuplexer"),
396        }
397    }
398
399    #[inline]
400    fn as_write_fd(&self) -> BorrowedFd<'_> {
401        match &self.inner {
402            Some(inner) => inner.as_write_fd(),
403            None => panic!("as_write_fd() called on closed LayeredDuplexer"),
404        }
405    }
406}
407
408#[cfg(windows)]
409impl<Inner: Duplex + AsRawReadWriteHandleOrSocket> AsRawReadWriteHandleOrSocket
410    for LayeredDuplexer<Inner>
411{
412    #[inline]
413    fn as_raw_read_handle_or_socket(&self) -> RawHandleOrSocket {
414        match &self.inner {
415            Some(inner) => inner.as_raw_read_handle_or_socket(),
416            None => panic!("as_raw_read_handle_or_socket() called on closed LayeredDuplexer"),
417        }
418    }
419
420    #[inline]
421    fn as_raw_write_handle_or_socket(&self) -> RawHandleOrSocket {
422        match &self.inner {
423            Some(inner) => inner.as_raw_write_handle_or_socket(),
424            None => panic!("as_raw_write_handle_or_socket() called on closed LayeredDuplexer"),
425        }
426    }
427}
428
429#[cfg(windows)]
430impl<Inner: Duplex + AsReadWriteHandleOrSocket> AsReadWriteHandleOrSocket
431    for LayeredDuplexer<Inner>
432{
433    #[inline]
434    fn as_read_handle_or_socket(&self) -> BorrowedHandleOrSocket<'_> {
435        match &self.inner {
436            Some(inner) => inner.as_read_handle_or_socket(),
437            None => panic!("as_read_handle_or_socket() called on closed LayeredDuplexer"),
438        }
439    }
440
441    #[inline]
442    fn as_write_handle_or_socket(&self) -> BorrowedHandleOrSocket<'_> {
443        match &self.inner {
444            Some(inner) => inner.as_write_handle_or_socket(),
445            None => panic!("as_write_handle_or_socket() called on closed LayeredDuplexer"),
446        }
447    }
448}
449
450impl<Inner: fmt::Debug> fmt::Debug for LayeredDuplexer<Inner> {
451    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
452        let mut b = f.debug_struct("LayeredDuplexer");
453        b.field("inner", &self.inner);
454        b.finish()
455    }
456}
457
458fn stream_already_ended() -> io::Error {
459    io::Error::new(io::ErrorKind::BrokenPipe, "stream has already ended")
460}
461
462impl<Inner> Drop for LayeredDuplexer<Inner> {
463    fn drop(&mut self) {
464        assert!(self.inner.is_none(), "stream was not closed or abandoned");
465    }
466}
467
468#[test]
469fn test_layered_duplexion() {
470    let mut input = io::Cursor::new(b"hello world".to_vec());
471    let mut reader = LayeredDuplexer::new(&mut input);
472    let mut s = String::new();
473    reader.read_to_string(&mut s).unwrap();
474    assert_eq!(s, "hello world");
475}