xitca_http/util/buffered/
buffer.rs

1use core::{
2    fmt,
3    mem::MaybeUninit,
4    ops::{Deref, DerefMut},
5};
6
7use std::io;
8
9use tracing::trace;
10use xitca_io::bytes::{Buf, BytesMut};
11use xitca_unsafe_collection::bytes::{BufList, ChunkVectoredUninit, read_buf};
12
13pub use xitca_io::bytes::{BufInterest, BufRead, BufWrite};
14
15/// a writable buffer with const generic guarded max size limit.
16#[derive(Debug)]
17pub struct ReadBuf<const LIMIT: usize>(BytesMut);
18
19impl<const LIMIT: usize> ReadBuf<LIMIT> {
20    #[inline(always)]
21    pub fn new() -> Self {
22        Self(BytesMut::new())
23    }
24
25    #[inline(always)]
26    pub fn into_inner(self) -> BytesMut {
27        self.0
28    }
29}
30
31impl<const LIMIT: usize> From<BytesMut> for ReadBuf<LIMIT> {
32    fn from(bytes: BytesMut) -> Self {
33        Self(bytes)
34    }
35}
36
37impl<const LIMIT: usize> Default for ReadBuf<LIMIT> {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl<const LIMIT: usize> Deref for ReadBuf<LIMIT> {
44    type Target = BytesMut;
45
46    #[inline]
47    fn deref(&self) -> &Self::Target {
48        &self.0
49    }
50}
51
52impl<const LIMIT: usize> DerefMut for ReadBuf<LIMIT> {
53    #[inline]
54    fn deref_mut(&mut self) -> &mut Self::Target {
55        &mut self.0
56    }
57}
58
59impl<const LIMIT: usize> BufInterest for ReadBuf<LIMIT> {
60    #[inline]
61    fn want_write_buf(&self) -> bool {
62        self.0.remaining() < LIMIT
63    }
64
65    fn want_write_io(&self) -> bool {
66        unimplemented!()
67    }
68}
69
70impl<const LIMIT: usize> BufRead for ReadBuf<LIMIT> {
71    fn do_io<Io>(&mut self, io: &mut Io) -> io::Result<()>
72    where
73        Io: io::Read,
74    {
75        let len = self.0.len();
76        loop {
77            match read_buf(io, &mut self.0) {
78                Ok(0) => {
79                    if self.0.len() == len {
80                        return Err(io::ErrorKind::UnexpectedEof.into());
81                    }
82                    break;
83                }
84                Ok(_) => {
85                    if !self.want_write_buf() {
86                        trace!(
87                            "READ_BUF_LIMIT: {LIMIT} bytes reached. Entering backpressure(no log event for recovery)."
88                        );
89                        break;
90                    }
91                }
92                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
93                Err(e) => {
94                    if self.0.len() == len {
95                        return Err(e);
96                    }
97                    break;
98                }
99            }
100        }
101        Ok(())
102    }
103}
104
105#[derive(Default)]
106pub struct WriteBuf<const LIMIT: usize>(xitca_io::bytes::WriteBuf);
107
108impl<const LIMIT: usize> WriteBuf<LIMIT> {
109    #[inline]
110    pub fn new() -> Self {
111        Self(xitca_io::bytes::WriteBuf::new())
112    }
113
114    #[cfg(test)]
115    pub fn buf(&self) -> &[u8] {
116        self.0.buf()
117    }
118}
119
120impl<const LIMIT: usize> BufInterest for WriteBuf<LIMIT> {
121    #[inline]
122    fn want_write_buf(&self) -> bool {
123        self.0.len() < LIMIT
124    }
125
126    #[inline]
127    fn want_write_io(&self) -> bool {
128        self.0.want_write_io()
129    }
130}
131
132impl<const LIMIT: usize> BufWrite for WriteBuf<LIMIT> {
133    #[inline]
134    fn write_buf<F, T, E>(&mut self, func: F) -> Result<T, E>
135    where
136        F: FnOnce(&mut BytesMut) -> Result<T, E>,
137    {
138        self.0.write_buf(func)
139    }
140
141    #[inline]
142    fn do_io<Io: io::Write>(&mut self, io: &mut Io) -> io::Result<()> {
143        self.0.do_io(io)
144    }
145}
146
147// an internal buffer to collect writes before flushes
148pub struct ListWriteBuf<B, const LIMIT: usize> {
149    // Re-usable buffer that holds response head.
150    // After head writing finished it's split and pushed to list.
151    buf: BytesMut,
152    // Deque of user buffers if strategy is Queue
153    list: BufList<B, BUF_LIST_CNT>,
154    want_flush: bool,
155}
156
157impl<B: Buf, const LIMIT: usize> Default for ListWriteBuf<B, LIMIT> {
158    fn default() -> Self {
159        Self {
160            buf: BytesMut::new(),
161            list: BufList::new(),
162            want_flush: false,
163        }
164    }
165}
166
167impl<B: Buf, const LIMIT: usize> ListWriteBuf<B, LIMIT> {
168    /// split buf field from Self.
169    /// this is often coupled with [BufWrite::write_buf] method to obtain what has been written to
170    /// the buf.
171    pub fn split_buf(&mut self) -> BytesMut {
172        self.buf.split()
173    }
174
175    /// add new buf to list.
176    ///
177    /// # Panics
178    /// when push more items to list than the capacity. ListWriteBuf is strictly bounded.
179    pub fn buffer<BB: Buf + Into<B>>(&mut self, buf: BB) {
180        self.list.push(buf.into());
181        // cross reference with <Self as BufWrite>::buf_write method.
182        self.want_flush = false;
183    }
184}
185
186impl<B: Buf, const LIMIT: usize> fmt::Debug for ListWriteBuf<B, LIMIT> {
187    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188        f.debug_struct("ListBuf")
189            .field("remaining", &self.list.remaining())
190            .finish()
191    }
192}
193
194// buf list is forced to go in backpressure when it reaches this length.
195// 32 is chosen for max of 16 pipelined http requests with a single body item.
196const BUF_LIST_CNT: usize = 32;
197
198impl<B, const LIMIT: usize> BufInterest for ListWriteBuf<B, LIMIT>
199where
200    B: Buf + ChunkVectoredUninit,
201{
202    #[inline]
203    fn want_write_buf(&self) -> bool {
204        self.list.remaining() < LIMIT && !self.list.is_full()
205    }
206
207    #[inline]
208    fn want_write_io(&self) -> bool {
209        self.list.remaining() != 0 || self.want_flush
210    }
211}
212
213impl<B, const LIMIT: usize> BufWrite for ListWriteBuf<B, LIMIT>
214where
215    B: Buf + ChunkVectoredUninit,
216{
217    fn write_buf<F, T, E>(&mut self, func: F) -> Result<T, E>
218    where
219        F: FnOnce(&mut BytesMut) -> Result<T, E>,
220    {
221        // in ListWriteBuf the BytesMut is only used as temporary storage of buffer.
222        // only when ListWriteBuf::buffer is called we set self.want_flush to false.
223        func(&mut self.buf).inspect_err(|_| self.buf.clear())
224    }
225
226    fn do_io<Io: io::Write>(&mut self, io: &mut Io) -> io::Result<()> {
227        let queue = &mut self.list;
228        loop {
229            if self.want_flush {
230                match io::Write::flush(io) {
231                    Ok(_) => self.want_flush = false,
232                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
233                    Err(e) => return Err(e),
234                }
235                break;
236            }
237
238            let mut buf = [const { MaybeUninit::uninit() }; BUF_LIST_CNT];
239            let slice = queue.chunks_vectored_uninit_into_init(&mut buf);
240            match io.write_vectored(slice) {
241                Ok(0) => return write_zero(self.want_write_io()),
242                Ok(n) => {
243                    queue.advance(n);
244                    if queue.is_empty() {
245                        self.want_flush = true;
246                    }
247                }
248                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
249                Err(e) => return Err(e),
250            }
251        }
252
253        Ok(())
254    }
255}
256
/// Cold path for a write that accepted zero bytes.
///
/// `want_write` is the caller's `BufInterest::want_write_io` state at the time of the write.
///
/// # Panics
/// When `want_write` is `false`, i.e. the write io was driven without anything pending.
///
/// # Errors
/// Always returns [io::ErrorKind::WriteZero] when it does not panic.
#[cold]
#[inline(never)]
fn write_zero(want_write: bool) -> io::Result<()> {
    // the message names the trait methods as they are actually called in this module.
    assert!(
        want_write,
        "BufWrite::do_io must be called after BufInterest::want_write_io returns true."
    );
    Err(io::ErrorKind::WriteZero.into())
}