xitca_http/util/buffered/
buffered_io.rs1use core::{
2 future::{Future, poll_fn},
3 pin::Pin,
4};
5
6use std::io;
7
8use tracing::trace;
9use xitca_io::io::{AsyncIo, Interest};
10
11use crate::bytes::BufInterest;
12
13use super::buffer::{BufWrite, ReadBuf};
14
15pub struct BufferedIo<'a, St, W, const READ_BUF_LIMIT: usize> {
17 pub io: &'a mut St,
19 pub read_buf: ReadBuf<READ_BUF_LIMIT>,
21 pub write_buf: W,
23}
24
25impl<'a, St, W, const READ_BUF_LIMIT: usize> BufferedIo<'a, St, W, READ_BUF_LIMIT>
26where
27 St: AsyncIo,
28 W: BufWrite,
29{
30 pub fn new(io: &'a mut St, write_buf: W) -> Self {
32 Self {
33 io,
34 read_buf: ReadBuf::new(),
35 write_buf,
36 }
37 }
38
39 #[inline]
41 pub fn try_read(&mut self) -> io::Result<Option<usize>> {
42 let mut read = 0;
45 loop {
46 match xitca_unsafe_collection::bytes::read_buf(self.io, &mut self.read_buf.0) {
47 Ok(0) => break,
48 Ok(n) => {
49 read += n;
50 if !self.read_buf.want_write_buf() {
51 trace!(
52 "READ_BUF_LIMIT: {READ_BUF_LIMIT} bytes reached. Entering backpressure(no log event for recovery)."
53 );
54 break;
55 }
56 }
57 Err(_) if read != 0 => break,
58 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None),
59 Err(e) => return Err(e),
60 }
61 }
62 Ok(Some(read))
63 }
64
65 #[inline]
67 pub fn try_write(&mut self) -> io::Result<()> {
68 BufWrite::do_io(&mut self.write_buf, self.io)
69 }
70
71 pub async fn read(&mut self) -> io::Result<usize> {
73 loop {
74 self.io.ready(Interest::READABLE).await?;
75 if let Some(read) = self.try_read()? {
76 return Ok(read);
77 }
78 }
79 }
80
81 pub async fn drain_write(&mut self) -> io::Result<()> {
83 while self.write_buf.want_write_io() {
84 self.io.ready(Interest::WRITABLE).await?;
85 self.try_write()?;
86 }
87 Ok(())
88 }
89
90 pub fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
92 poll_fn(|cx| Pin::new(&mut *self.io).poll_shutdown(cx))
93 }
94}