xitca_http/util/buffered/
buffered_io.rs

1use core::{
2    future::{Future, poll_fn},
3    pin::Pin,
4};
5
6use std::io;
7
8use xitca_io::io::{AsyncIo, Interest};
9
10use super::buffer::{BufRead, BufWrite, ReadBuf};
11
12/// Io type with internal buffering.
13pub struct BufferedIo<'a, St, W, const READ_BUF_LIMIT: usize> {
14    /// mut reference of Io type that impl [AsyncIo] trait.
15    pub io: &'a mut St,
16    /// read buffer with const generic usize as capacity limit.
17    pub read_buf: ReadBuf<READ_BUF_LIMIT>,
18    /// generic type impl [BufWrite] trait as write buffer.
19    pub write_buf: W,
20}
21
22impl<'a, St, W, const READ_BUF_LIMIT: usize> BufferedIo<'a, St, W, READ_BUF_LIMIT>
23where
24    St: AsyncIo,
25    W: BufWrite,
26{
27    /// construct a new buffered io with given Io and buf writer.
28    pub fn new(io: &'a mut St, write_buf: W) -> Self {
29        Self {
30            io,
31            read_buf: ReadBuf::new(),
32            write_buf,
33        }
34    }
35
36    /// read until io blocked or read buffer is full and advance the length of it(read buffer).
37    #[inline]
38    pub fn try_read(&mut self) -> io::Result<()> {
39        BufRead::do_io(&mut self.read_buf, self.io)
40    }
41
42    /// write until write buffer is emptied or io blocked.
43    #[inline]
44    pub fn try_write(&mut self) -> io::Result<()> {
45        BufWrite::do_io(&mut self.write_buf, self.io)
46    }
47
48    /// check for io read readiness in async and do [Self::try_read].
49    pub async fn read(&mut self) -> io::Result<()> {
50        self.io.ready(Interest::READABLE).await?;
51        self.try_read()
52    }
53
54    /// drain write buffer and flush the io.
55    pub async fn drain_write(&mut self) -> io::Result<()> {
56        while self.write_buf.want_write_io() {
57            self.io.ready(Interest::WRITABLE).await?;
58            self.try_write()?;
59        }
60        Ok(())
61    }
62
63    /// shutdown Io gracefully.
64    pub fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> + '_ {
65        poll_fn(|cx| Pin::new(&mut *self.io).poll_shutdown(cx))
66    }
67}