1use crate::{
2 default_read, default_read_exact_using_status, default_read_to_end, default_read_to_string,
3 default_read_vectored, Bufferable, ReadLayered, Status, WriteLayered,
4};
5use duplex::Duplex;
6#[cfg(windows)]
7use io_extras::os::windows::{
8 AsRawReadWriteHandleOrSocket, AsReadWriteHandleOrSocket, BorrowedHandleOrSocket,
9 RawHandleOrSocket,
10};
11use std::fmt::{self, Arguments};
12use std::io::{self, IoSlice, IoSliceMut, Read, Write};
13#[cfg(feature = "terminal-io")]
14use terminal_io::DuplexTerminal;
15#[cfg(not(windows))]
16use {
17 io_extras::os::rustix::{AsRawReadWriteFd, AsReadWriteFd, RawFd},
18 std::os::fd::BorrowedFd,
19};
20
21pub struct LayeredDuplexer<Inner> {
25 inner: Option<Inner>,
26 eos_as_push: bool,
27 line_by_line: bool,
28}
29
30#[cfg(feature = "terminal-io")]
31impl<Inner: DuplexTerminal> LayeredDuplexer<Inner> {
32 pub fn maybe_terminal(inner: Inner) -> Self {
36 let line_by_line = inner.is_line_by_line();
37
38 if line_by_line {
39 Self::line_by_line(inner)
40 } else {
41 Self::new(inner)
42 }
43 }
44}
45
46impl<Inner: Read + Write> LayeredDuplexer<Inner> {
47 pub fn new(inner: Inner) -> Self {
50 Self {
51 inner: Some(inner),
52 eos_as_push: false,
53 line_by_line: false,
54 }
55 }
56
57 pub fn with_eos_as_push(inner: Inner) -> Self {
65 Self {
66 inner: Some(inner),
67 eos_as_push: true,
68 line_by_line: false,
69 }
70 }
71
72 pub fn line_by_line(inner: Inner) -> Self {
75 Self {
76 inner: Some(inner),
77 eos_as_push: false,
78 line_by_line: true,
79 }
80 }
81
82 pub fn close_into_inner(mut self) -> io::Result<Inner> {
84 match &mut self.inner {
85 Some(_) => {
86 let mut inner = self.inner.take().unwrap();
87 inner.flush()?;
88 Ok(inner)
89 }
90 None => Err(stream_already_ended()),
91 }
92 }
93
94 pub fn abandon_into_inner(mut self) -> Option<Inner> {
96 self.inner.take()
97 }
98}
99
100impl<Inner: Read + Write> ReadLayered for LayeredDuplexer<Inner> {
101 #[inline]
102 fn read_with_status(&mut self, buf: &mut [u8]) -> io::Result<(usize, Status)> {
103 if self.inner.is_none() {
104 return Ok((0, Status::End));
105 }
106 match self.inner.as_mut().unwrap().read(buf) {
107 Ok(0) if !buf.is_empty() => {
108 if self.eos_as_push {
109 Ok((0, Status::push()))
110 } else {
111 drop(self.inner.take().unwrap());
112 Ok((0, Status::End))
113 }
114 }
115 Ok(size) => {
116 if self.line_by_line && buf[size - 1] == b'\n' {
117 Ok((size, Status::push()))
118 } else {
119 Ok((size, Status::active()))
120 }
121 }
122 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => Ok((0, Status::active())),
123 Err(e) => {
124 self.abandon();
125 Err(e)
126 }
127 }
128 }
129
130 #[inline]
131 fn read_vectored_with_status(
132 &mut self,
133 bufs: &mut [IoSliceMut<'_>],
134 ) -> io::Result<(usize, Status)> {
135 if self.inner.is_none() {
136 return Ok((0, Status::End));
137 }
138 match self.inner.as_mut().unwrap().read_vectored(bufs) {
139 Ok(0) if !bufs.iter().all(|b| b.is_empty()) => {
140 if self.eos_as_push {
141 Ok((0, Status::push()))
142 } else {
143 drop(self.inner.take().unwrap());
144 Ok((0, Status::End))
145 }
146 }
147 Ok(size) => {
148 if self.line_by_line {
149 let mut i = size;
150 let mut saw_line = false;
151 for buf in bufs.iter() {
152 if i < buf.len() {
153 saw_line = buf[i - 1] == b'\n';
154 break;
155 }
156 i -= bufs.len();
157 }
158 if saw_line {
159 return Ok((size, Status::push()));
160 }
161 }
162
163 Ok((size, Status::active()))
164 }
165 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => Ok((0, Status::active())),
166 Err(e) => {
167 self.abandon();
168 Err(e)
169 }
170 }
171 }
172}
173
174impl<Inner: Read + Write> Read for LayeredDuplexer<Inner> {
175 #[inline]
176 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
177 default_read(self, buf).map_err(|e| {
178 drop(self.inner.take().unwrap());
179 e
180 })
181 }
182
183 #[inline]
184 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
185 default_read_vectored(self, bufs).map_err(|e| {
186 drop(self.inner.take().unwrap());
187 e
188 })
189 }
190
191 #[cfg(can_vector)]
192 #[inline]
193 fn is_read_vectored(&self) -> bool {
194 match &self.inner {
195 Some(inner) => inner.is_read_vectored(),
196 None => false,
197 }
198 }
199
200 #[inline]
201 fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
202 default_read_to_end(self, buf).map_err(|e| {
203 drop(self.inner.take().unwrap());
204 e
205 })
206 }
207
208 #[inline]
209 fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
210 default_read_to_string(self, buf).map_err(|e| {
211 drop(self.inner.take().unwrap());
212 e
213 })
214 }
215
216 #[inline]
217 fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
218 default_read_exact_using_status(self, buf)
219 .map(|_status| ())
220 .map_err(|e| {
221 drop(self.inner.take().unwrap());
222 e
223 })
224 }
225}
226
227impl<Inner: Read + Write> WriteLayered for LayeredDuplexer<Inner> {
228 #[inline]
229 fn close(&mut self) -> io::Result<()> {
230 match &mut self.inner {
231 Some(_) => self.inner.take().unwrap().flush(),
232 None => Err(stream_already_ended()),
233 }
234 }
235}
236
237impl<Inner: Read + Write> Write for LayeredDuplexer<Inner> {
238 #[inline]
239 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
240 match &mut self.inner {
241 Some(inner) => inner.write(buf).map_err(|e| {
242 drop(self.inner.take().unwrap());
243 e
244 }),
245 None => Err(stream_already_ended()),
246 }
247 }
248
249 #[inline]
250 fn flush(&mut self) -> io::Result<()> {
251 match &mut self.inner {
252 Some(inner) => inner.flush().map_err(|e| {
253 drop(self.inner.take().unwrap());
254 e
255 }),
256 None => Err(stream_already_ended()),
257 }
258 }
259
260 #[inline]
261 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
262 match &mut self.inner {
263 Some(inner) => inner.write_vectored(bufs).map_err(|e| {
264 drop(self.inner.take().unwrap());
265 e
266 }),
267 None => Err(stream_already_ended()),
268 }
269 }
270
271 #[cfg(can_vector)]
272 #[inline]
273 fn is_write_vectored(&self) -> bool {
274 match &self.inner {
275 Some(inner) => inner.is_write_vectored(),
276 None => false,
277 }
278 }
279
280 #[inline]
281 fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
282 match &mut self.inner {
283 Some(inner) => inner.write_all(buf).map_err(|e| {
284 drop(self.inner.take().unwrap());
285 e
286 }),
287 None => Err(stream_already_ended()),
288 }
289 }
290
291 #[cfg(write_all_vectored)]
292 #[inline]
293 fn write_all_vectored(&mut self, bufs: &mut [IoSlice<'_>]) -> io::Result<()> {
294 match &mut self.inner {
295 Some(inner) => inner.write_all_vectored(bufs).map_err(|e| {
296 drop(self.inner.take().unwrap());
297 e
298 }),
299 None => Err(stream_already_ended()),
300 }
301 }
302
303 #[inline]
304 fn write_fmt(&mut self, fmt: Arguments<'_>) -> io::Result<()> {
305 match &mut self.inner {
306 Some(inner) => inner.write_fmt(fmt).map_err(|e| {
307 drop(self.inner.take().unwrap());
308 e
309 }),
310 None => Err(stream_already_ended()),
311 }
312 }
313}
314
315impl<Inner> Bufferable for LayeredDuplexer<Inner> {
316 #[inline]
317 fn abandon(&mut self) {
318 self.inner = None;
319 }
320}
321
322impl<Inner: Read + Write + Duplex> Duplex for LayeredDuplexer<Inner> {}
323
324#[cfg(feature = "terminal-io")]
325impl<RW: terminal_io::DuplexTerminal> terminal_io::Terminal for LayeredDuplexer<RW> {}
326
327#[cfg(feature = "terminal-io")]
328impl<RW: terminal_io::DuplexTerminal> terminal_io::ReadTerminal for LayeredDuplexer<RW> {
329 #[inline]
330 fn is_line_by_line(&self) -> bool {
331 self.inner
332 .as_ref()
333 .map(|c| c.is_line_by_line())
334 .unwrap_or(false)
335 }
336
337 #[inline]
338 fn is_input_terminal(&self) -> bool {
339 self.inner
340 .as_ref()
341 .map(|c| c.is_input_terminal())
342 .unwrap_or(false)
343 }
344}
345
346#[cfg(feature = "terminal-io")]
347impl<RW: terminal_io::DuplexTerminal> terminal_io::WriteTerminal for LayeredDuplexer<RW> {
348 #[inline]
349 fn color_support(&self) -> terminal_io::TerminalColorSupport {
350 self.inner.as_ref().unwrap().color_support()
351 }
352
353 #[inline]
354 fn color_preference(&self) -> bool {
355 self.inner.as_ref().unwrap().color_preference()
356 }
357
358 #[inline]
359 fn is_output_terminal(&self) -> bool {
360 self.inner
361 .as_ref()
362 .map(|c| c.is_output_terminal())
363 .unwrap_or(false)
364 }
365}
366
367#[cfg(feature = "terminal-io")]
368impl<RW: terminal_io::DuplexTerminal> terminal_io::DuplexTerminal for LayeredDuplexer<RW> {}
369
370#[cfg(not(windows))]
371impl<Inner: Duplex + AsRawReadWriteFd> AsRawReadWriteFd for LayeredDuplexer<Inner> {
372 #[inline]
373 fn as_raw_read_fd(&self) -> RawFd {
374 match &self.inner {
375 Some(inner) => inner.as_raw_read_fd(),
376 None => panic!("as_raw_read_fd() called on closed LayeredDuplexer"),
377 }
378 }
379
380 #[inline]
381 fn as_raw_write_fd(&self) -> RawFd {
382 match &self.inner {
383 Some(inner) => inner.as_raw_write_fd(),
384 None => panic!("as_raw_write_fd() called on closed LayeredDuplexer"),
385 }
386 }
387}
388
389#[cfg(not(windows))]
390impl<Inner: Duplex + AsReadWriteFd> AsReadWriteFd for LayeredDuplexer<Inner> {
391 #[inline]
392 fn as_read_fd(&self) -> BorrowedFd<'_> {
393 match &self.inner {
394 Some(inner) => inner.as_read_fd(),
395 None => panic!("as_read_fd() called on closed LayeredDuplexer"),
396 }
397 }
398
399 #[inline]
400 fn as_write_fd(&self) -> BorrowedFd<'_> {
401 match &self.inner {
402 Some(inner) => inner.as_write_fd(),
403 None => panic!("as_write_fd() called on closed LayeredDuplexer"),
404 }
405 }
406}
407
408#[cfg(windows)]
409impl<Inner: Duplex + AsRawReadWriteHandleOrSocket> AsRawReadWriteHandleOrSocket
410 for LayeredDuplexer<Inner>
411{
412 #[inline]
413 fn as_raw_read_handle_or_socket(&self) -> RawHandleOrSocket {
414 match &self.inner {
415 Some(inner) => inner.as_raw_read_handle_or_socket(),
416 None => panic!("as_raw_read_handle_or_socket() called on closed LayeredDuplexer"),
417 }
418 }
419
420 #[inline]
421 fn as_raw_write_handle_or_socket(&self) -> RawHandleOrSocket {
422 match &self.inner {
423 Some(inner) => inner.as_raw_write_handle_or_socket(),
424 None => panic!("as_raw_write_handle_or_socket() called on closed LayeredDuplexer"),
425 }
426 }
427}
428
429#[cfg(windows)]
430impl<Inner: Duplex + AsReadWriteHandleOrSocket> AsReadWriteHandleOrSocket
431 for LayeredDuplexer<Inner>
432{
433 #[inline]
434 fn as_read_handle_or_socket(&self) -> BorrowedHandleOrSocket<'_> {
435 match &self.inner {
436 Some(inner) => inner.as_read_handle_or_socket(),
437 None => panic!("as_read_handle_or_socket() called on closed LayeredDuplexer"),
438 }
439 }
440
441 #[inline]
442 fn as_write_handle_or_socket(&self) -> BorrowedHandleOrSocket<'_> {
443 match &self.inner {
444 Some(inner) => inner.as_write_handle_or_socket(),
445 None => panic!("as_write_handle_or_socket() called on closed LayeredDuplexer"),
446 }
447 }
448}
449
450impl<Inner: fmt::Debug> fmt::Debug for LayeredDuplexer<Inner> {
451 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
452 let mut b = f.debug_struct("LayeredDuplexer");
453 b.field("inner", &self.inner);
454 b.finish()
455 }
456}
457
458fn stream_already_ended() -> io::Error {
459 io::Error::new(io::ErrorKind::BrokenPipe, "stream has already ended")
460}
461
462impl<Inner> Drop for LayeredDuplexer<Inner> {
463 fn drop(&mut self) {
464 assert!(self.inner.is_none(), "stream was not closed or abandoned");
465 }
466}
467
468#[test]
469fn test_layered_duplexion() {
470 let mut input = io::Cursor::new(b"hello world".to_vec());
471 let mut reader = LayeredDuplexer::new(&mut input);
472 let mut s = String::new();
473 reader.read_to_string(&mut s).unwrap();
474 assert_eq!(s, "hello world");
475}