layered_io/
layered_reader.rs

1use crate::{
2    default_read, default_read_exact_using_status, default_read_to_end, default_read_to_string,
3    default_read_vectored, Bufferable, ReadLayered, Status,
4};
5#[cfg(windows)]
6use io_extras::os::windows::{
7    AsHandleOrSocket, AsRawHandleOrSocket, BorrowedHandleOrSocket, RawHandleOrSocket,
8};
9use std::fmt;
10use std::io::{self, IoSliceMut, Read};
11#[cfg(feature = "terminal-io")]
12use terminal_io::ReadTerminal;
13#[cfg(not(windows))]
14use {
15    io_extras::os::rustix::{AsRawFd, RawFd},
16    std::os::fd::{AsFd, BorrowedFd},
17};
18
19/// Adapts an `Read` to implement [`ReadLayered`].
20pub struct LayeredReader<Inner> {
21    inner: Option<Inner>,
22    eos_as_push: bool,
23    line_by_line: bool,
24}
25
26#[cfg(feature = "terminal-io")]
27impl<Inner: ReadTerminal> LayeredReader<Inner> {
28    /// Construct a new `LayeredReader` which wraps `inner`, which implements
29    /// `ReadTerminal`, and automatically sets the `line_by_line` setting if
30    /// appropriate.
31    pub fn maybe_terminal(inner: Inner) -> Self {
32        let line_by_line = inner.is_line_by_line();
33
34        if line_by_line {
35            Self::line_by_line(inner)
36        } else {
37            Self::new(inner)
38        }
39    }
40}
41
42impl<Inner: Read> LayeredReader<Inner> {
43    /// Construct a new `LayeredReader` which wraps `inner` with default
44    /// settings.
45    pub fn new(inner: Inner) -> Self {
46        Self {
47            inner: Some(inner),
48            eos_as_push: false,
49            line_by_line: false,
50        }
51    }
52
53    /// Construct a new `LayeredReader` which wraps `inner`. When `inner`
54    /// reports end of stream (by returning 0), report a push but keep the
55    /// stream open and continue to read data on it.
56    ///
57    /// For example, when reading a file, when the reader reaches the end of
58    /// the file it will report it, but consumers may wish to continue reading
59    /// in case additional data is appended to the file.
60    pub fn with_eos_as_push(inner: Inner) -> Self {
61        Self {
62            inner: Some(inner),
63            eos_as_push: true,
64            line_by_line: false,
65        }
66    }
67
68    /// Construct a new `LayeredReader` which wraps an `inner` which reads its
69    /// input line-by-line, such as stdin on a terminal.
70    pub fn line_by_line(inner: Inner) -> Self {
71        Self {
72            inner: Some(inner),
73            eos_as_push: false,
74            line_by_line: true,
75        }
76    }
77
78    /// Consume this `LayeredReader` and return the inner stream.
79    pub fn abandon_into_inner(self) -> Option<Inner> {
80        self.inner
81    }
82}
83
84impl<Inner: Read> ReadLayered for LayeredReader<Inner> {
85    #[inline]
86    fn read_with_status(&mut self, buf: &mut [u8]) -> io::Result<(usize, Status)> {
87        if self.inner.is_none() {
88            return Ok((0, Status::End));
89        }
90        match self.inner.as_mut().unwrap().read(buf) {
91            Ok(0) if !buf.is_empty() => {
92                if self.eos_as_push {
93                    Ok((0, Status::push()))
94                } else {
95                    drop(self.inner.take().unwrap());
96                    Ok((0, Status::End))
97                }
98            }
99            Ok(size) => {
100                if self.line_by_line && buf[size - 1] == b'\n' {
101                    Ok((size, Status::push()))
102                } else {
103                    Ok((size, Status::active()))
104                }
105            }
106            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => Ok((0, Status::active())),
107            Err(e) => {
108                self.abandon();
109                Err(e)
110            }
111        }
112    }
113
114    #[inline]
115    fn read_vectored_with_status(
116        &mut self,
117        bufs: &mut [IoSliceMut<'_>],
118    ) -> io::Result<(usize, Status)> {
119        if self.inner.is_none() {
120            return Ok((0, Status::End));
121        }
122        match self.inner.as_mut().unwrap().read_vectored(bufs) {
123            Ok(0) if !bufs.iter().all(|b| b.is_empty()) => {
124                if self.eos_as_push {
125                    Ok((0, Status::push()))
126                } else {
127                    drop(self.inner.take().unwrap());
128                    Ok((0, Status::End))
129                }
130            }
131            Ok(size) => {
132                if self.line_by_line {
133                    let mut i = size;
134                    let mut saw_line = false;
135                    for buf in bufs.iter() {
136                        if i < buf.len() {
137                            saw_line = buf[i - 1] == b'\n';
138                            break;
139                        }
140                        i -= bufs.len();
141                    }
142                    if saw_line {
143                        return Ok((size, Status::push()));
144                    }
145                }
146
147                Ok((size, Status::active()))
148            }
149            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => Ok((0, Status::active())),
150            Err(e) => {
151                self.abandon();
152                Err(e)
153            }
154        }
155    }
156}
157
158impl<Inner> Bufferable for LayeredReader<Inner> {
159    #[inline]
160    fn abandon(&mut self) {
161        self.inner = None;
162    }
163}
164
165impl<Inner: Read> Read for LayeredReader<Inner> {
166    #[inline]
167    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
168        default_read(self, buf).map_err(|e| {
169            drop(self.inner.take().unwrap());
170            e
171        })
172    }
173
174    #[inline]
175    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
176        default_read_vectored(self, bufs).map_err(|e| {
177            drop(self.inner.take().unwrap());
178            e
179        })
180    }
181
182    #[cfg(can_vector)]
183    #[inline]
184    fn is_read_vectored(&self) -> bool {
185        match &self.inner {
186            Some(inner) => inner.is_read_vectored(),
187            None => false,
188        }
189    }
190
191    #[inline]
192    fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
193        default_read_to_end(self, buf).map_err(|e| {
194            drop(self.inner.take().unwrap());
195            e
196        })
197    }
198
199    #[inline]
200    fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
201        default_read_to_string(self, buf).map_err(|e| {
202            drop(self.inner.take().unwrap());
203            e
204        })
205    }
206
207    #[inline]
208    fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
209        default_read_exact_using_status(self, buf)
210            .map(|_status| ())
211            .map_err(|e| {
212                drop(self.inner.take().unwrap());
213                e
214            })
215    }
216}
217
218#[cfg(feature = "terminal-io")]
219impl<RW: Read + terminal_io::Terminal> terminal_io::Terminal for LayeredReader<RW> {}
220
221#[cfg(feature = "terminal-io")]
222impl<RW: terminal_io::ReadTerminal> terminal_io::ReadTerminal for LayeredReader<RW> {
223    #[inline]
224    fn is_line_by_line(&self) -> bool {
225        self.inner
226            .as_ref()
227            .map(|c| c.is_line_by_line())
228            .unwrap_or(false)
229    }
230
231    #[inline]
232    fn is_input_terminal(&self) -> bool {
233        self.inner
234            .as_ref()
235            .map(|c| c.is_input_terminal())
236            .unwrap_or(false)
237    }
238}
239
240#[cfg(not(windows))]
241impl<Inner: Read + AsRawFd> AsRawFd for LayeredReader<Inner> {
242    #[inline]
243    fn as_raw_fd(&self) -> RawFd {
244        match &self.inner {
245            Some(inner) => inner.as_raw_fd(),
246            None => panic!("as_raw_fd() called on closed LayeredReader"),
247        }
248    }
249}
250
251#[cfg(not(windows))]
252impl<Inner: Read + AsFd> AsFd for LayeredReader<Inner> {
253    #[inline]
254    fn as_fd(&self) -> BorrowedFd<'_> {
255        match &self.inner {
256            Some(inner) => inner.as_fd(),
257            None => panic!("as_fd() called on closed LayeredReader"),
258        }
259    }
260}
261
262#[cfg(windows)]
263impl<Inner: Read + AsRawHandleOrSocket> AsRawHandleOrSocket for LayeredReader<Inner> {
264    #[inline]
265    fn as_raw_handle_or_socket(&self) -> RawHandleOrSocket {
266        match &self.inner {
267            Some(inner) => inner.as_raw_handle_or_socket(),
268            None => panic!("as_raw_handle_or_socket() called on closed LayeredReader"),
269        }
270    }
271}
272
273#[cfg(windows)]
274impl<Inner: Read + AsHandleOrSocket> AsHandleOrSocket for LayeredReader<Inner> {
275    #[inline]
276    fn as_handle_or_socket(&self) -> BorrowedHandleOrSocket<'_> {
277        match &self.inner {
278            Some(inner) => inner.as_handle_or_socket(),
279            None => panic!("as_handle_or_socket() called on closed LayeredReader"),
280        }
281    }
282}
283
284impl<Inner: fmt::Debug> fmt::Debug for LayeredReader<Inner> {
285    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
286        let mut b = f.debug_struct("LayeredReader");
287        b.field("inner", &self.inner);
288        b.finish()
289    }
290}
291
292#[test]
293fn test_layered_reader() {
294    let mut input = io::Cursor::new(b"hello world");
295    let mut reader = LayeredReader::new(&mut input);
296    let mut s = String::new();
297    reader.read_to_string(&mut s).unwrap();
298    assert_eq!(s, "hello world");
299}