xitca_http/util/buffered/
buffered_io.rs

1use core::{
2    future::{Future, poll_fn},
3    pin::Pin,
4};
5
6use std::io;
7
8use tracing::trace;
9use xitca_io::io::{AsyncIo, Interest};
10
11use crate::bytes::BufInterest;
12
13use super::buffer::{BufWrite, ReadBuf};
14
15/// Io type with internal buffering.
16pub struct BufferedIo<'a, St, W, const READ_BUF_LIMIT: usize> {
17    /// mut reference of Io type that impl [AsyncIo] trait.
18    pub io: &'a mut St,
19    /// read buffer with const generic usize as capacity limit.
20    pub read_buf: ReadBuf<READ_BUF_LIMIT>,
21    /// generic type impl [BufWrite] trait as write buffer.
22    pub write_buf: W,
23}
24
25impl<'a, St, W, const READ_BUF_LIMIT: usize> BufferedIo<'a, St, W, READ_BUF_LIMIT>
26where
27    St: AsyncIo,
28    W: BufWrite,
29{
30    /// construct a new buffered io with given Io and buf writer.
31    pub fn new(io: &'a mut St, write_buf: W) -> Self {
32        Self {
33            io,
34            read_buf: ReadBuf::new(),
35            write_buf,
36        }
37    }
38
39    /// read until io blocked or read buffer is full and advance the length of it(read buffer).
40    #[inline]
41    pub fn try_read(&mut self) -> io::Result<Option<usize>> {
42        // TODO:
43        // BufRead::do_io should return io::Result<Option<usize>> and this function should forward reading logic to said method
44        let mut read = 0;
45        loop {
46            match xitca_unsafe_collection::bytes::read_buf(self.io, &mut self.read_buf.0) {
47                Ok(0) => break,
48                Ok(n) => {
49                    read += n;
50                    if !self.read_buf.want_write_buf() {
51                        trace!(
52                            "READ_BUF_LIMIT: {READ_BUF_LIMIT} bytes reached. Entering backpressure(no log event for recovery)."
53                        );
54                        break;
55                    }
56                }
57                Err(_) if read != 0 => break,
58                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None),
59                Err(e) => return Err(e),
60            }
61        }
62        Ok(Some(read))
63    }
64
65    /// write until write buffer is emptied or io blocked.
66    #[inline]
67    pub fn try_write(&mut self) -> io::Result<()> {
68        BufWrite::do_io(&mut self.write_buf, self.io)
69    }
70
71    /// check for io read readiness in async and do [Self::try_read].
72    pub async fn read(&mut self) -> io::Result<usize> {
73        loop {
74            self.io.ready(Interest::READABLE).await?;
75            if let Some(read) = self.try_read()? {
76                return Ok(read);
77            }
78        }
79    }
80
81    /// drain write buffer and flush the io.
82    pub async fn drain_write(&mut self) -> io::Result<()> {
83        while self.write_buf.want_write_io() {
84            self.io.ready(Interest::WRITABLE).await?;
85            self.try_write()?;
86        }
87        Ok(())
88    }
89
90    /// shutdown Io gracefully.
91    pub fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
92        poll_fn(|cx| Pin::new(&mut *self.io).poll_shutdown(cx))
93    }
94}