xitca_http/util/buffered/
buffer.rs

1use core::{
2    fmt,
3    mem::MaybeUninit,
4    ops::{Deref, DerefMut},
5};
6
7use std::io;
8
9use tracing::trace;
10use xitca_io::bytes::{Buf, BytesMut, PagedBytesMut};
11use xitca_unsafe_collection::bytes::{BufList, ChunkVectoredUninit, read_buf};
12
13pub use xitca_io::bytes::{BufInterest, BufRead, BufWrite};
14
/// A buffer that bytes are read into, with a const generic `LIMIT` guarding its fill level.
///
/// It is a newtype over `PagedBytesMut<4096>`. `LIMIT` is not enforced by the storage itself:
/// it is the threshold [`BufInterest::want_write_buf`] compares the buffered byte count against
/// to decide whether more bytes should be read in.
#[derive(Debug)]
pub struct ReadBuf<const LIMIT: usize>(pub(super) PagedBytesMut<4096>);
18
19impl<const LIMIT: usize> ReadBuf<LIMIT> {
20    #[inline(always)]
21    pub fn new() -> Self {
22        Self(PagedBytesMut::new())
23    }
24
25    #[inline(always)]
26    pub fn into_inner(self) -> BytesMut {
27        self.0.into_inner()
28    }
29}
30
31impl<const LIMIT: usize> From<BytesMut> for ReadBuf<LIMIT> {
32    fn from(bytes: BytesMut) -> Self {
33        Self(PagedBytesMut::from(bytes))
34    }
35}
36
37impl<const LIMIT: usize> Default for ReadBuf<LIMIT> {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl<const LIMIT: usize> Deref for ReadBuf<LIMIT> {
44    type Target = BytesMut;
45
46    #[inline]
47    fn deref(&self) -> &Self::Target {
48        self.0.get_ref()
49    }
50}
51
52impl<const LIMIT: usize> DerefMut for ReadBuf<LIMIT> {
53    #[inline]
54    fn deref_mut(&mut self) -> &mut Self::Target {
55        self.0.get_mut()
56    }
57}
58
59impl<const LIMIT: usize> BufInterest for ReadBuf<LIMIT> {
60    #[inline]
61    fn want_write_buf(&self) -> bool {
62        self.0.remaining() < LIMIT
63    }
64
65    fn want_write_io(&self) -> bool {
66        unimplemented!()
67    }
68}
69
70impl<const LIMIT: usize> BufRead for ReadBuf<LIMIT> {
71    fn do_io<Io>(&mut self, io: &mut Io) -> io::Result<()>
72    where
73        Io: io::Read,
74    {
75        let mut read = 0;
76        loop {
77            match read_buf(io, &mut self.0) {
78                Ok(0) => {
79                    if read == 0 {
80                        return Err(io::ErrorKind::UnexpectedEof.into());
81                    }
82                    break;
83                }
84                Ok(n) => {
85                    read += n;
86                    if !self.want_write_buf() {
87                        trace!(
88                            "READ_BUF_LIMIT: {LIMIT} bytes reached. Entering backpressure(no log event for recovery)."
89                        );
90                        break;
91                    }
92                }
93                Err(_) if read != 0 => break,
94                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
95                Err(e) => return Err(e),
96            }
97        }
98        Ok(())
99    }
100}
101
/// A write buffer with a const generic `LIMIT` guarding its fill level.
///
/// It is a newtype over `xitca_io::bytes::WriteBuf`. `LIMIT` is the threshold
/// [`BufInterest::want_write_buf`] compares the buffered length against.
#[derive(Default)]
pub struct WriteBuf<const LIMIT: usize>(xitca_io::bytes::WriteBuf);
104
105impl<const LIMIT: usize> WriteBuf<LIMIT> {
106    #[inline]
107    pub fn new() -> Self {
108        Self(xitca_io::bytes::WriteBuf::new())
109    }
110
111    #[cfg(test)]
112    pub fn buf(&self) -> &[u8] {
113        self.0.buf()
114    }
115}
116
117impl<const LIMIT: usize> BufInterest for WriteBuf<LIMIT> {
118    #[inline]
119    fn want_write_buf(&self) -> bool {
120        self.0.len() < LIMIT
121    }
122
123    #[inline]
124    fn want_write_io(&self) -> bool {
125        self.0.want_write_io()
126    }
127}
128
129impl<const LIMIT: usize> BufWrite for WriteBuf<LIMIT> {
130    #[inline]
131    fn write_buf<F, T, E>(&mut self, func: F) -> Result<T, E>
132    where
133        F: FnOnce(&mut BytesMut) -> Result<T, E>,
134    {
135        self.0.write_buf(func)
136    }
137
138    #[inline]
139    fn do_io<Io: io::Write>(&mut self, io: &mut Io) -> io::Result<()> {
140        self.0.do_io(io)
141    }
142}
143
/// An internal buffer that collects writes in a bounded list before they are written out and
/// flushed.
///
/// `LIMIT` caps the total bytes queued in the list before backpressure is signalled through
/// [`BufInterest::want_write_buf`].
pub struct ListWriteBuf<B, const LIMIT: usize> {
    // Re-usable scratch buffer that holds response head.
    // After head writing finished it's split off (see `split_buf`) and pushed to the list.
    buf: BytesMut,
    // Bounded list of queued buffers; holds at most `BUF_LIST_CNT` items.
    list: BufList<B, BUF_LIST_CNT>,
    // Set once the list has been written out completely, meaning the io still needs a flush.
    // Cleared after a successful flush or when a new buffer is queued.
    want_flush: bool,
}
153
154impl<B: Buf, const LIMIT: usize> Default for ListWriteBuf<B, LIMIT> {
155    fn default() -> Self {
156        Self {
157            buf: BytesMut::new(),
158            list: BufList::new(),
159            want_flush: false,
160        }
161    }
162}
163
164impl<B: Buf, const LIMIT: usize> ListWriteBuf<B, LIMIT> {
165    /// split buf field from Self.
166    /// this is often coupled with [BufWrite::write_buf] method to obtain what has been written to
167    /// the buf.
168    pub fn split_buf(&mut self) -> BytesMut {
169        self.buf.split()
170    }
171
172    /// add new buf to list.
173    ///
174    /// # Panics
175    /// when push more items to list than the capacity. ListWriteBuf is strictly bounded.
176    pub fn buffer<BB: Buf + Into<B>>(&mut self, buf: BB) {
177        self.list.push(buf.into());
178        // cross reference with <Self as BufWrite>::buf_write method.
179        self.want_flush = false;
180    }
181}
182
183impl<B: Buf, const LIMIT: usize> fmt::Debug for ListWriteBuf<B, LIMIT> {
184    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
185        f.debug_struct("ListBuf")
186            .field("remaining", &self.list.remaining())
187            .finish()
188    }
189}
190
// Capacity of the buf list. The list is forced to go into backpressure when it reaches this
// length.
// 32 is chosen for a max of 16 pipelined http requests with a single body item.
const BUF_LIST_CNT: usize = 32;
194
195impl<B, const LIMIT: usize> BufInterest for ListWriteBuf<B, LIMIT>
196where
197    B: Buf + ChunkVectoredUninit,
198{
199    #[inline]
200    fn want_write_buf(&self) -> bool {
201        self.list.remaining() < LIMIT && !self.list.is_full()
202    }
203
204    #[inline]
205    fn want_write_io(&self) -> bool {
206        self.list.remaining() != 0 || self.want_flush
207    }
208}
209
210impl<B, const LIMIT: usize> BufWrite for ListWriteBuf<B, LIMIT>
211where
212    B: Buf + ChunkVectoredUninit,
213{
214    fn write_buf<F, T, E>(&mut self, func: F) -> Result<T, E>
215    where
216        F: FnOnce(&mut BytesMut) -> Result<T, E>,
217    {
218        // in ListWriteBuf the BytesMut is only used as temporary storage of buffer.
219        // only when ListWriteBuf::buffer is called we set self.want_flush to false.
220        func(&mut self.buf).inspect_err(|_| self.buf.clear())
221    }
222
223    fn do_io<Io: io::Write>(&mut self, io: &mut Io) -> io::Result<()> {
224        let queue = &mut self.list;
225        loop {
226            if self.want_flush {
227                match io::Write::flush(io) {
228                    Ok(_) => self.want_flush = false,
229                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
230                    Err(e) => return Err(e),
231                }
232                break;
233            }
234
235            let mut buf = [const { MaybeUninit::uninit() }; BUF_LIST_CNT];
236            let slice = queue.chunks_vectored_uninit_into_init(&mut buf);
237            match io.write_vectored(slice) {
238                Ok(0) => return write_zero(self.want_write_io()),
239                Ok(n) => {
240                    queue.advance(n);
241                    if queue.is_empty() {
242                        self.want_flush = true;
243                    }
244                }
245                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
246                Err(e) => return Err(e),
247            }
248        }
249
250        Ok(())
251    }
252}
253
/// Cold path for a write that accepted zero bytes.
///
/// Returns an [`io::ErrorKind::WriteZero`] error when `want_write` is `true`.
///
/// # Panics
/// When `want_write` is `false`.
#[cold]
#[inline(never)]
fn write_zero(want_write: bool) -> io::Result<()> {
    if !want_write {
        panic!("BufWrite::write must be called after BufInterest::want_write return true.");
    }
    Err(io::Error::from(io::ErrorKind::WriteZero))
}