xitca_http/util/buffered/buffer.rs

use core::{
    fmt,
    mem::MaybeUninit,
    ops::{Deref, DerefMut},
};

use std::io;

use tracing::trace;
use xitca_io::bytes::{Buf, BytesMut, PagedBytesMut};
use xitca_unsafe_collection::bytes::{BufList, ChunkVectoredUninit, read_buf};

pub use xitca_io::bytes::{BufInterest, BufRead, BufWrite};

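/// Paged read buffer with a const generic LIMIT acting as a soft cap: once the
/// buffered bytes reach LIMIT the owner is expected to stop reading from io.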
#[derive(Debug)]
pub struct ReadBuf<const LIMIT: usize>(pub(super) PagedBytesMut<4096>);

impl<const LIMIT: usize> ReadBuf<LIMIT> {
    #[inline(always)]
    pub fn new() -> Self {
        Self(PagedBytesMut::new())
    }

    #[inline(always)]
    pub fn into_inner(self) -> BytesMut {
        self.0.into_inner()
    }
}

impl<const LIMIT: usize> From<BytesMut> for ReadBuf<LIMIT> {
    fn from(bytes: BytesMut) -> Self {
        Self(PagedBytesMut::from(bytes))
    }
}

impl<const LIMIT: usize> Default for ReadBuf<LIMIT> {
    fn default() -> Self {
        Self::new()
    }
}

impl<const LIMIT: usize> Deref for ReadBuf<LIMIT> {
    type Target = BytesMut;

    #[inline]
    fn deref(&self) -> &Self::Target {
        self.0.get_ref()
    }
}

impl<const LIMIT: usize> DerefMut for ReadBuf<LIMIT> {
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.0.get_mut()
    }
}

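// For a read buffer, "write interest" means writing into the buffer itself:
// want_write_buf gates how many bytes may be buffered, while want_write_io has
// no meaning here and is left unimplemented.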
impl<const LIMIT: usize> BufInterest for ReadBuf<LIMIT> {
    #[inline]
    fn want_write_buf(&self) -> bool {
        self.0.remaining() < LIMIT
    }

    fn want_write_io(&self) -> bool {
        unimplemented!()
    }
}

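// Drain the io source in a loop. Reading stops on EOF, on WouldBlock, or once
// the buffer reaches LIMIT. An error after a partial read is swallowed for now
// so already-buffered bytes are not lost; a persistent error surfaces on the
// next call.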
impl<const LIMIT: usize> BufRead for ReadBuf<LIMIT> {
    fn do_io<Io>(&mut self, io: &mut Io) -> io::Result<()>
    where
        Io: io::Read,
    {
        let mut read = 0;
        loop {
            match read_buf(io, &mut self.0) {
                // Ok(0) means EOF. Only report it as an error when nothing at
                // all was read in this call.
                Ok(0) => {
                    if read == 0 {
                        return Err(io::ErrorKind::UnexpectedEof.into());
                    }
                    break;
                }
                Ok(n) => {
                    read += n;
                    if !self.want_write_buf() {
                        trace!(
                            "READ_BUF_LIMIT: {LIMIT} bytes reached. Entering backpressure (no log event for recovery)."
                        );
                        break;
                    }
                }
                // keep what was read so far; the error will resurface on the next call.
                Err(_) if read != 0 => break,
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
                Err(e) => return Err(e),
            }
        }
        Ok(())
    }
}

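/// Flat write buffer delegating to xitca_io's WriteBuf, with LIMIT bounding how
/// many bytes may be queued before backpressure applies.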
#[derive(Default)]
pub struct WriteBuf<const LIMIT: usize>(xitca_io::bytes::WriteBuf);

impl<const LIMIT: usize> WriteBuf<LIMIT> {
    #[inline]
    pub fn new() -> Self {
        Self(xitca_io::bytes::WriteBuf::new())
    }

    #[cfg(test)]
    pub fn buf(&self) -> &[u8] {
        self.0.buf()
    }
}

impl<const LIMIT: usize> BufInterest for WriteBuf<LIMIT> {
    #[inline]
    fn want_write_buf(&self) -> bool {
        self.0.len() < LIMIT
    }

    #[inline]
    fn want_write_io(&self) -> bool {
        self.0.want_write_io()
    }
}

impl<const LIMIT: usize> BufWrite for WriteBuf<LIMIT> {
    #[inline]
    fn write_buf<F, T, E>(&mut self, func: F) -> Result<T, E>
    where
        F: FnOnce(&mut BytesMut) -> Result<T, E>,
    {
        self.0.write_buf(func)
    }

    #[inline]
    fn do_io<Io: io::Write>(&mut self, io: &mut Io) -> io::Result<()> {
        self.0.do_io(io)
    }
}

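/// Vectored write buffer: owned buffers are queued in a BufList and written to
/// io with write_vectored, avoiding copies for large payloads.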
pub struct ListWriteBuf<B, const LIMIT: usize> {
    // scratch buffer handed to write_buf callers before its contents are queued.
    buf: BytesMut,
    // owned buffers waiting to be written to io.
    list: BufList<B, BUF_LIST_CNT>,
    // set when all queued buffers were written and io still needs a flush.
    want_flush: bool,
}

impl<B: Buf, const LIMIT: usize> Default for ListWriteBuf<B, LIMIT> {
    fn default() -> Self {
        Self {
            buf: BytesMut::new(),
            list: BufList::new(),
            want_flush: false,
        }
    }
}

impl<B: Buf, const LIMIT: usize> ListWriteBuf<B, LIMIT> {
    // take the accumulated scratch bytes, leaving an empty buffer in place.
    pub fn split_buf(&mut self) -> BytesMut {
        self.buf.split()
    }

    // queue an owned buffer for vectored writing.
    pub fn buffer<BB: Buf + Into<B>>(&mut self, buf: BB) {
        self.list.push(buf.into());
        self.want_flush = false;
    }
}

impl<B: Buf, const LIMIT: usize> fmt::Debug for ListWriteBuf<B, LIMIT> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ListWriteBuf")
            .field("remaining", &self.list.remaining())
            .finish()
    }
}

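// max number of owned buffers queued at once; also sizes the uninit slice array
// used for vectored writes.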
const BUF_LIST_CNT: usize = 32;

impl<B, const LIMIT: usize> BufInterest for ListWriteBuf<B, LIMIT>
where
    B: Buf + ChunkVectoredUninit,
{
    #[inline]
    fn want_write_buf(&self) -> bool {
        self.list.remaining() < LIMIT && !self.list.is_full()
    }

    #[inline]
    fn want_write_io(&self) -> bool {
        self.list.remaining() != 0 || self.want_flush
    }
}

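// do_io alternates between two states: while buffers are queued they are
// written with vectored io; once the queue drains, a final flush is attempted
// before write interest is considered satisfied.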
impl<B, const LIMIT: usize> BufWrite for ListWriteBuf<B, LIMIT>
where
    B: Buf + ChunkVectoredUninit,
{
    fn write_buf<F, T, E>(&mut self, func: F) -> Result<T, E>
    where
        F: FnOnce(&mut BytesMut) -> Result<T, E>,
    {
        // on error drop any partially written bytes so the buffer stays consistent.
        func(&mut self.buf).inspect_err(|_| self.buf.clear())
    }

    fn do_io<Io: io::Write>(&mut self, io: &mut Io) -> io::Result<()> {
        let queue = &mut self.list;
        loop {
            if self.want_flush {
                match io::Write::flush(io) {
                    Ok(_) => self.want_flush = false,
                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
                    Err(e) => return Err(e),
                }
                break;
            }

            let mut buf = [const { MaybeUninit::uninit() }; BUF_LIST_CNT];
            let slice = queue.chunks_vectored_uninit_into_init(&mut buf);
            match io.write_vectored(slice) {
                Ok(0) => return write_zero(self.want_write_io()),
                Ok(n) => {
                    queue.advance(n);
                    // queue fully drained. schedule a flush for the next iteration.
                    if queue.is_empty() {
                        self.want_flush = true;
                    }
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
                Err(e) => return Err(e),
            }
        }

        Ok(())
    }
}

#[cold]
#[inline(never)]
fn write_zero(want_write: bool) -> io::Result<()> {
    assert!(
        want_write,
        "BufWrite::do_io must be called only when BufInterest::want_write_io returns true."
    );
    Err(io::ErrorKind::WriteZero.into())
}
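
// A minimal sketch (not part of the original file) exercising the flat WriteBuf
// round trip against an in-memory writer. It assumes the inner
// xitca_io::bytes::WriteBuf reports write interest once bytes are queued and
// that do_io drains fully against a writer that never returns WouldBlock.
#[cfg(test)]
mod write_buf_sketch {
    use super::*;

    #[test]
    fn write_buf_round_trip() {
        let mut buf = WriteBuf::<1024>::new();

        // stage bytes through the closure based api.
        buf.write_buf(|b| {
            b.extend_from_slice(b"hello");
            Ok::<_, std::convert::Infallible>(())
        })
        .unwrap();
        assert!(buf.want_write_io());

        // Vec<u8> implements io::Write and accepts everything in one call.
        let mut io = Vec::new();
        buf.do_io(&mut io).unwrap();
        assert_eq!(io, b"hello");
    }
}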