1use super::{IntoInnerError, DEFAULT_BUF_SIZE};
5use duplex::HalfDuplex;
6#[cfg(feature = "layered-io")]
7use layered_io::{default_suggested_buffer_size, Bufferable, HalfDuplexLayered};
8#[cfg(read_initializer)]
9use std::io::Initializer;
10use std::io::{self, BufRead, Error, ErrorKind, IoSlice, IoSliceMut, Read, Write};
11use std::{cmp, fmt};
12#[cfg(not(windows))]
13use {
14 io_extras::os::rustix::{AsRawFd, RawFd},
15 io_lifetimes::{AsFd, BorrowedFd},
16};
17#[cfg(windows)]
18use {
19 io_extras::os::windows::{
20 AsHandleOrSocket, AsRawHandleOrSocket, BorrowedHandleOrSocket, RawHandleOrSocket,
21 },
22 io_lifetimes::{AsHandle, AsSocket, BorrowedHandle, BorrowedSocket},
23 std::os::windows::io::{AsRawHandle, AsRawSocket, RawHandle, RawSocket},
24};
25
26pub struct BufDuplexer<Inner: HalfDuplex> {
120 inner: BufDuplexerBackend<Inner>,
121}
122
123pub(crate) struct BufDuplexerBackend<Inner: HalfDuplex> {
124 inner: Option<Inner>,
125
126 writer_buf: Vec<u8>,
128 panicked: bool,
132
133 reader_buf: Box<[u8]>,
135 pos: usize,
136 cap: usize,
137}
138
139impl<Inner: HalfDuplex> BufDuplexer<Inner> {
140 #[inline]
152 pub fn new(inner: Inner) -> Self {
153 Self {
154 inner: BufDuplexerBackend::new(inner),
155 }
156 }
157
158 #[inline]
174 pub fn with_capacities(reader_capacity: usize, writer_capacity: usize, inner: Inner) -> Self {
175 Self {
176 inner: BufDuplexerBackend::with_capacities(reader_capacity, writer_capacity, inner),
177 }
178 }
179
180 #[inline]
194 pub fn get_ref(&self) -> &Inner {
195 self.inner.get_ref()
196 }
197
198 #[inline]
214 pub fn get_mut(&mut self) -> &mut Inner {
215 self.inner.get_mut()
216 }
217
218 #[inline]
232 pub fn writer_buffer(&self) -> &[u8] {
233 self.inner.writer_buffer()
234 }
235
236 pub fn reader_buffer(&self) -> &[u8] {
263 self.inner.reader_buffer()
264 }
265
266 #[inline]
283 pub fn writer_capacity(&self) -> usize {
284 self.inner.writer_capacity()
285 }
286
287 pub fn reader_capacity(&self) -> usize {
309 self.inner.reader_capacity()
310 }
311
312 pub fn into_inner(self) -> Result<Inner, IntoInnerError<Self>> {
334 self.inner
335 .into_inner()
336 .map_err(|err| err.new_wrapped(|inner| Self { inner }))
337 }
338}
339
340impl<Inner: HalfDuplex> BufDuplexerBackend<Inner> {
341 pub fn new(inner: Inner) -> Self {
342 Self::with_capacities(DEFAULT_BUF_SIZE, DEFAULT_BUF_SIZE, inner)
343 }
344
345 pub fn with_capacities(reader_capacity: usize, writer_capacity: usize, inner: Inner) -> Self {
346 #[cfg(not(read_initializer))]
347 let buffer = vec![0; reader_capacity];
348
349 #[cfg(read_initializer)]
350 let buffer = unsafe {
351 let mut buffer = Vec::with_capacity(reader_capacity);
352 buffer.set_len(reader_capacity);
353 inner.initializer().initialize(&mut buffer);
354 buffer
355 };
356
357 Self {
358 inner: Some(inner),
359 writer_buf: Vec::with_capacity(writer_capacity),
360 panicked: false,
361 reader_buf: buffer.into_boxed_slice(),
362 pos: 0,
363 cap: 0,
364 }
365 }
366
367 pub(super) fn flush_buf(&mut self) -> io::Result<()> {
375 struct BufGuard<'a> {
379 buffer: &'a mut Vec<u8>,
380 written: usize,
381 }
382
383 impl<'a> BufGuard<'a> {
384 fn new(buffer: &'a mut Vec<u8>) -> Self {
385 Self { buffer, written: 0 }
386 }
387
388 fn remaining(&self) -> &[u8] {
390 &self.buffer[self.written..]
391 }
392
393 fn consume(&mut self, amt: usize) {
395 self.written += amt;
396 }
397
398 fn done(&self) -> bool {
400 self.written >= self.buffer.len()
401 }
402 }
403
404 impl Drop for BufGuard<'_> {
405 fn drop(&mut self) {
406 if self.written > 0 {
407 self.buffer.drain(..self.written);
408 }
409 }
410 }
411
412 let mut guard = BufGuard::new(&mut self.writer_buf);
413 let inner = self.inner.as_mut().unwrap();
414 while !guard.done() {
415 self.panicked = true;
416 let r = inner.write(guard.remaining());
417 self.panicked = false;
418
419 match r {
420 Ok(0) => {
421 return Err(Error::new(
422 ErrorKind::WriteZero,
423 "failed to write the buffered data",
424 ));
425 }
426 Ok(n) => guard.consume(n),
427 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
428 Err(e) => return Err(e),
429 }
430 }
431 Ok(())
432 }
433
434 pub(super) fn write_to_buf(&mut self, buf: &[u8]) -> usize {
438 let available = self.writer_buf.capacity() - self.writer_buf.len();
439 let amt_to_buffer = available.min(buf.len());
440 self.writer_buf.extend_from_slice(&buf[..amt_to_buffer]);
441 amt_to_buffer
442 }
443
444 #[inline]
445 pub fn get_ref(&self) -> &Inner {
446 self.inner.as_ref().unwrap()
447 }
448
449 #[inline]
450 pub fn get_mut(&mut self) -> &mut Inner {
451 self.inner.as_mut().unwrap()
452 }
453
454 #[inline]
455 pub fn writer_buffer(&self) -> &[u8] {
456 &self.writer_buf
457 }
458
459 pub fn reader_buffer(&self) -> &[u8] {
460 &self.reader_buf[self.pos..self.cap]
461 }
462
463 #[inline]
464 pub fn writer_capacity(&self) -> usize {
465 self.writer_buf.capacity()
466 }
467
468 pub fn reader_capacity(&self) -> usize {
469 self.reader_buf.len()
470 }
471
472 pub fn into_inner(mut self) -> Result<Inner, IntoInnerError<Self>> {
473 match self.flush_buf() {
474 Err(e) => Err(IntoInnerError::new(self, e)),
475 Ok(()) => Ok(self.inner.take().unwrap()),
476 }
477 }
478
479 #[inline]
481 fn discard_reader_buffer(&mut self) {
482 self.pos = 0;
483 self.cap = 0;
484 }
485}
486
487impl<Inner: HalfDuplex> Write for BufDuplexer<Inner> {
488 #[inline]
489 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
490 self.inner.write(buf)
491 }
492
493 #[inline]
494 fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
495 self.inner.write_all(buf)
496 }
497
498 #[inline]
499 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
500 self.inner.write_vectored(bufs)
501 }
502
503 #[cfg(can_vector)]
504 #[inline]
505 fn is_write_vectored(&self) -> bool {
506 self.inner.is_write_vectored()
507 }
508
509 #[inline]
510 fn flush(&mut self) -> io::Result<()> {
511 self.inner.flush()
512 }
513}
514
515impl<Inner: HalfDuplex> Write for BufDuplexerBackend<Inner> {
516 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
517 if self.writer_buf.len() + buf.len() > self.writer_buf.capacity() {
518 self.flush_buf()?;
519 }
520 if buf.len() >= self.writer_buf.capacity() {
522 self.panicked = true;
523 let r = self.get_mut().write(buf);
524 self.panicked = false;
525 r
526 } else {
527 self.writer_buf.extend_from_slice(buf);
528 Ok(buf.len())
529 }
530 }
531
532 fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
533 if self.writer_buf.len() + buf.len() > self.writer_buf.capacity() {
538 self.flush_buf()?;
539 }
540 if buf.len() >= self.writer_buf.capacity() {
542 self.panicked = true;
543 let r = self.get_mut().write_all(buf);
544 self.panicked = false;
545 r
546 } else {
547 self.writer_buf.extend_from_slice(buf);
548 Ok(())
549 }
550 }
551
552 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
553 let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
554 if self.writer_buf.len() + total_len > self.writer_buf.capacity() {
555 self.flush_buf()?;
556 }
557 if total_len >= self.writer_buf.capacity() {
559 self.panicked = true;
560 let r = self.get_mut().write_vectored(bufs);
561 self.panicked = false;
562 r
563 } else {
564 bufs.iter()
565 .for_each(|b| self.writer_buf.extend_from_slice(b));
566 Ok(total_len)
567 }
568 }
569
570 #[cfg(can_vector)]
571 #[inline]
572 fn is_write_vectored(&self) -> bool {
573 self.get_ref().is_write_vectored()
574 }
575
576 #[inline]
577 fn flush(&mut self) -> io::Result<()> {
578 self.flush_buf().and_then(|()| self.get_mut().flush())
579 }
580}
581
582impl<Inner: HalfDuplex> Read for BufDuplexer<Inner> {
583 #[inline]
584 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
585 self.inner.flush()?;
587
588 self.inner.read(buf)
589 }
590
591 #[inline]
592 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
593 self.inner.flush()?;
595
596 self.inner.read_vectored(bufs)
597 }
598
599 #[cfg(can_vector)]
600 #[inline]
601 fn is_read_vectored(&self) -> bool {
602 self.inner.is_read_vectored()
603 }
604
605 #[cfg(read_initializer)]
607 #[inline]
608 unsafe fn initializer(&self) -> Initializer {
609 self.inner.initializer()
610 }
611}
612
613impl<Inner: HalfDuplex> Read for BufDuplexerBackend<Inner> {
614 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
615 if self.pos == self.cap && buf.len() >= self.reader_buf.len() {
616 self.discard_reader_buffer();
617 return self.inner.as_mut().unwrap().read(buf);
618 }
619 let size = {
620 let mut rem = self.fill_buf()?;
621 rem.read(buf)?
622 };
623 self.consume(size);
624 Ok(size)
625 }
626
627 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
628 let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
629 if self.pos == self.cap && total_len >= self.reader_buf.len() {
630 self.discard_reader_buffer();
631 return self.inner.as_mut().unwrap().read_vectored(bufs);
632 }
633 let size = {
634 let mut rem = self.fill_buf()?;
635 rem.read_vectored(bufs)?
636 };
637 self.consume(size);
638 Ok(size)
639 }
640
641 #[cfg(can_vector)]
642 fn is_read_vectored(&self) -> bool {
643 self.inner.as_ref().unwrap().is_read_vectored()
644 }
645
646 #[cfg(read_initializer)]
648 unsafe fn initializer(&self) -> Initializer {
649 self.inner.as_ref().unwrap().initializer()
650 }
651}
652
653impl<Inner: HalfDuplex> BufRead for BufDuplexer<Inner> {
654 #[inline]
655 fn fill_buf(&mut self) -> io::Result<&[u8]> {
656 self.inner.fill_buf()
657 }
658
659 #[inline]
660 fn consume(&mut self, amt: usize) {
661 self.inner.consume(amt)
662 }
663
664 #[inline]
665 fn read_until(&mut self, byte: u8, buf: &mut Vec<u8>) -> io::Result<usize> {
666 self.inner.flush()?;
668
669 self.inner.read_until(byte, buf)
670 }
671
672 #[inline]
673 fn read_line(&mut self, buf: &mut String) -> io::Result<usize> {
674 self.inner.flush()?;
676
677 self.inner.read_line(buf)
678 }
679}
680
681impl<Inner: HalfDuplex> BufRead for BufDuplexerBackend<Inner> {
683 fn fill_buf(&mut self) -> io::Result<&[u8]> {
684 if self.pos >= self.cap {
689 debug_assert!(self.pos == self.cap);
690 self.cap = self.inner.as_mut().unwrap().read(&mut self.reader_buf)?;
691 self.pos = 0;
692 }
693 Ok(&self.reader_buf[self.pos..self.cap])
694 }
695
696 fn consume(&mut self, amt: usize) {
697 self.pos = cmp::min(self.pos + amt, self.cap);
698 }
699}
700
701impl<Inner: HalfDuplex> fmt::Debug for BufDuplexer<Inner>
702where
703 Inner: fmt::Debug,
704{
705 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
706 self.inner.fmt(fmt)
707 }
708}
709
710impl<Inner: HalfDuplex> fmt::Debug for BufDuplexerBackend<Inner>
711where
712 Inner: fmt::Debug,
713{
714 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
715 fmt.debug_struct("BufDuplexer")
716 .field("inner", &self.inner.as_ref().unwrap())
717 .field(
718 "reader_buffer",
719 &format_args!("{}/{}", self.cap - self.pos, self.reader_buf.len()),
720 )
721 .field(
722 "writer_buffer",
723 &format_args!("{}/{}", self.writer_buf.len(), self.writer_buf.capacity()),
724 )
725 .finish()
726 }
727}
728
729impl<Inner: HalfDuplex> Drop for BufDuplexerBackend<Inner> {
730 fn drop(&mut self) {
731 if self.inner.is_some() && !self.panicked {
732 let _r = self.flush_buf();
734 }
735 }
736}
737
738#[cfg(not(windows))]
739impl<Inner: HalfDuplex + AsRawFd> AsRawFd for BufDuplexer<Inner> {
740 #[inline]
741 fn as_raw_fd(&self) -> RawFd {
742 self.inner.as_raw_fd()
743 }
744}
745
746#[cfg(windows)]
747impl<Inner: HalfDuplex + AsRawHandle> AsRawHandle for BufDuplexer<Inner> {
748 #[inline]
749 fn as_raw_handle(&self) -> RawHandle {
750 self.inner.as_raw_handle()
751 }
752}
753
754#[cfg(windows)]
755impl<Inner: HalfDuplex + AsRawSocket> AsRawSocket for BufDuplexer<Inner> {
756 #[inline]
757 fn as_raw_socket(&self) -> RawSocket {
758 self.inner.as_raw_socket()
759 }
760}
761
762#[cfg(windows)]
763impl<Inner: HalfDuplex + AsRawHandleOrSocket> AsRawHandleOrSocket for BufDuplexer<Inner> {
764 #[inline]
765 fn as_raw_handle_or_socket(&self) -> RawHandleOrSocket {
766 self.inner.as_raw_handle_or_socket()
767 }
768}
769
770#[cfg(not(windows))]
771impl<Inner: HalfDuplex + AsRawFd> AsRawFd for BufDuplexerBackend<Inner> {
772 #[inline]
773 fn as_raw_fd(&self) -> RawFd {
774 self.inner.as_ref().unwrap().as_raw_fd()
775 }
776}
777
778#[cfg(windows)]
779impl<Inner: HalfDuplex + AsRawHandle> AsRawHandle for BufDuplexerBackend<Inner> {
780 #[inline]
781 fn as_raw_handle(&self) -> RawHandle {
782 self.inner.as_ref().unwrap().as_raw_handle()
783 }
784}
785
786#[cfg(windows)]
787impl<Inner: HalfDuplex + AsRawSocket> AsRawSocket for BufDuplexerBackend<Inner> {
788 #[inline]
789 fn as_raw_socket(&self) -> RawSocket {
790 self.inner.as_ref().unwrap().as_raw_socket()
791 }
792}
793
794#[cfg(windows)]
795impl<Inner: HalfDuplex + AsRawHandleOrSocket> AsRawHandleOrSocket for BufDuplexerBackend<Inner> {
796 #[inline]
797 fn as_raw_handle_or_socket(&self) -> RawHandleOrSocket {
798 self.inner.as_ref().unwrap().as_raw_handle_or_socket()
799 }
800}
801
802#[cfg(not(windows))]
803impl<Inner: HalfDuplex + AsFd> AsFd for BufDuplexer<Inner> {
804 #[inline]
805 fn as_fd(&self) -> BorrowedFd<'_> {
806 self.inner.as_fd()
807 }
808}
809
810#[cfg(windows)]
811impl<Inner: HalfDuplex + AsHandle> AsHandle for BufDuplexer<Inner> {
812 #[inline]
813 fn as_handle(&self) -> BorrowedHandle<'_> {
814 self.inner.as_handle()
815 }
816}
817
818#[cfg(windows)]
819impl<Inner: HalfDuplex + AsSocket> AsSocket for BufDuplexer<Inner> {
820 #[inline]
821 fn as_socket(&self) -> BorrowedSocket<'_> {
822 self.inner.as_socket()
823 }
824}
825
826#[cfg(windows)]
827impl<Inner: HalfDuplex + AsHandleOrSocket> AsHandleOrSocket for BufDuplexer<Inner> {
828 #[inline]
829 fn as_handle_or_socket(&self) -> BorrowedHandleOrSocket<'_> {
830 self.inner.as_handle_or_socket()
831 }
832}
833
834#[cfg(not(windows))]
835impl<Inner: HalfDuplex + AsFd> AsFd for BufDuplexerBackend<Inner> {
836 #[inline]
837 fn as_fd(&self) -> BorrowedFd<'_> {
838 self.inner.as_ref().unwrap().as_fd()
839 }
840}
841
842#[cfg(windows)]
843impl<Inner: HalfDuplex + AsHandle> AsHandle for BufDuplexerBackend<Inner> {
844 #[inline]
845 fn as_handle(&self) -> BorrowedHandle<'_> {
846 self.inner.as_ref().unwrap().as_handle()
847 }
848}
849
850#[cfg(windows)]
851impl<Inner: HalfDuplex + AsSocket> AsSocket for BufDuplexerBackend<Inner> {
852 #[inline]
853 fn as_socket(&self) -> BorrowedSocket<'_> {
854 self.inner.as_ref().unwrap().as_socket()
855 }
856}
857
858#[cfg(windows)]
859impl<Inner: HalfDuplex + AsHandleOrSocket> AsHandleOrSocket for BufDuplexerBackend<Inner> {
860 #[inline]
861 fn as_handle_or_socket(&self) -> BorrowedHandleOrSocket<'_> {
862 self.inner.as_ref().unwrap().as_handle_or_socket()
863 }
864}
865
866#[cfg(feature = "terminal-io")]
867impl<Inner: HalfDuplex + terminal_io::WriteTerminal> terminal_io::Terminal for BufDuplexer<Inner> {}
868
869#[cfg(feature = "terminal-io")]
870impl<Inner: HalfDuplex + terminal_io::WriteTerminal> terminal_io::Terminal
871 for BufDuplexerBackend<Inner>
872{
873}
874
875#[cfg(feature = "terminal-io")]
876impl<Inner: HalfDuplex + terminal_io::WriteTerminal> terminal_io::WriteTerminal
877 for BufDuplexer<Inner>
878{
879 #[inline]
880 fn color_support(&self) -> terminal_io::TerminalColorSupport {
881 self.inner.color_support()
882 }
883
884 #[inline]
885 fn color_preference(&self) -> bool {
886 self.inner.color_preference()
887 }
888
889 #[inline]
890 fn is_output_terminal(&self) -> bool {
891 self.inner.is_output_terminal()
892 }
893}
894
895#[cfg(feature = "terminal-io")]
896impl<Inner: HalfDuplex + terminal_io::WriteTerminal> terminal_io::WriteTerminal
897 for BufDuplexerBackend<Inner>
898{
899 #[inline]
900 fn color_support(&self) -> terminal_io::TerminalColorSupport {
901 self.inner.as_ref().unwrap().color_support()
902 }
903
904 #[inline]
905 fn color_preference(&self) -> bool {
906 self.inner.as_ref().unwrap().color_preference()
907 }
908
909 #[inline]
910 fn is_output_terminal(&self) -> bool {
911 match &self.inner {
912 Some(inner) => inner.is_output_terminal(),
913 None => false,
914 }
915 }
916}
917
918#[cfg(feature = "layered-io")]
919impl<Inner: HalfDuplexLayered> Bufferable for BufDuplexer<Inner> {
920 #[inline]
921 fn abandon(&mut self) {
922 self.inner.abandon()
923 }
924
925 #[inline]
926 fn suggested_buffer_size(&self) -> usize {
927 self.inner.suggested_buffer_size()
928 }
929}
930
931#[cfg(feature = "layered-io")]
932impl<Inner: HalfDuplexLayered> Bufferable for BufDuplexerBackend<Inner> {
933 #[inline]
934 fn abandon(&mut self) {
935 match &mut self.inner {
936 Some(inner) => inner.abandon(),
937 None => (),
938 }
939 }
940
941 #[inline]
942 fn suggested_buffer_size(&self) -> usize {
943 match &self.inner {
944 Some(inner) => {
945 std::cmp::max(inner.minimum_buffer_size(), inner.suggested_buffer_size())
946 }
947 None => default_suggested_buffer_size(self),
948 }
949 }
950}