xitca_http/util/buffered/buffer.rs

use core::{
    fmt,
    mem::MaybeUninit,
    ops::{Deref, DerefMut},
};

use std::io;

use tracing::trace;
use xitca_io::bytes::{Buf, BytesMut};
use xitca_unsafe_collection::bytes::{BufList, ChunkVectoredUninit, read_buf};

pub use xitca_io::bytes::{BufInterest, BufRead, BufWrite};

/// read buffer with a soft limit of `LIMIT` bytes. once the limit is reached
/// the buffer reports no further interest in being filled until it drains.
#[derive(Debug)]
pub struct ReadBuf<const LIMIT: usize>(BytesMut);

impl<const LIMIT: usize> ReadBuf<LIMIT> {
    #[inline(always)]
    pub fn new() -> Self {
        Self(BytesMut::new())
    }

    #[inline(always)]
    pub fn into_inner(self) -> BytesMut {
        self.0
    }
}

impl<const LIMIT: usize> From<BytesMut> for ReadBuf<LIMIT> {
    fn from(bytes: BytesMut) -> Self {
        Self(bytes)
    }
}

impl<const LIMIT: usize> Default for ReadBuf<LIMIT> {
    fn default() -> Self {
        Self::new()
    }
}

impl<const LIMIT: usize> Deref for ReadBuf<LIMIT> {
    type Target = BytesMut;

    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<const LIMIT: usize> DerefMut for ReadBuf<LIMIT> {
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl<const LIMIT: usize> BufInterest for ReadBuf<LIMIT> {
    #[inline]
    fn want_write_buf(&self) -> bool {
        self.0.remaining() < LIMIT
    }

    // ReadBuf only serves the read half of an io type, so the write readiness
    // query is never expected to be called.
    fn want_write_io(&self) -> bool {
        unimplemented!()
    }
}
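
// a minimal sketch of the backpressure threshold: `want_write_buf` stays true
// until the buffered byte count reaches `LIMIT`. the tiny limit is only for
// illustration.
#[cfg(test)]
mod read_buf_interest_sketch {
    use super::*;

    #[test]
    fn threshold() {
        let mut buf = ReadBuf::<8>::new();
        assert!(buf.want_write_buf());
        buf.extend_from_slice(b"12345678");
        assert!(!buf.want_write_buf());
    }
}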

impl<const LIMIT: usize> BufRead for ReadBuf<LIMIT> {
    fn do_io<Io>(&mut self, io: &mut Io) -> io::Result<()>
    where
        Io: io::Read,
    {
        let len = self.0.len();
        loop {
            match read_buf(io, &mut self.0) {
                // a read of zero bytes means EOF. it's only an error when this
                // call made no progress at all.
                Ok(0) => {
                    if self.0.len() == len {
                        return Err(io::ErrorKind::UnexpectedEof.into());
                    }
                    break;
                }
                Ok(_) => {
                    if !self.want_write_buf() {
                        trace!(
                            "READ_BUF_LIMIT: {LIMIT} bytes reached. Entering backpressure (no log event for recovery)."
                        );
                        break;
                    }
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
                Err(e) => {
                    // only surface the error when no progress was made;
                    // otherwise keep the bytes already read for the caller.
                    if self.0.len() == len {
                        return Err(e);
                    }
                    break;
                }
            }
        }
        Ok(())
    }
}
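
// a minimal usage sketch against an in-memory reader. `std::io::Cursor`
// returns Ok(0) once drained, so `do_io` reads everything, sees EOF after
// making progress and returns Ok.
#[cfg(test)]
mod read_buf_io_sketch {
    use super::*;

    #[test]
    fn read_to_eof() {
        let mut buf = ReadBuf::<64>::new();
        let mut io = std::io::Cursor::new(b"hello".to_vec());
        buf.do_io(&mut io).unwrap();
        assert_eq!(&buf[..], b"hello");
        // a reader that hits EOF without yielding any bytes is an error.
        assert!(buf.do_io(&mut io).is_err());
    }
}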

/// write buffer with a soft limit of `LIMIT` bytes, delegating to the plain
/// `WriteBuf` from xitca_io and layering the limit check on top.
#[derive(Default)]
pub struct WriteBuf<const LIMIT: usize>(xitca_io::bytes::WriteBuf);

impl<const LIMIT: usize> WriteBuf<LIMIT> {
    #[inline]
    pub fn new() -> Self {
        Self(xitca_io::bytes::WriteBuf::new())
    }

    #[cfg(test)]
    pub fn buf(&self) -> &[u8] {
        self.0.buf()
    }
}

impl<const LIMIT: usize> BufInterest for WriteBuf<LIMIT> {
    #[inline]
    fn want_write_buf(&self) -> bool {
        self.0.len() < LIMIT
    }

    #[inline]
    fn want_write_io(&self) -> bool {
        self.0.want_write_io()
    }
}

impl<const LIMIT: usize> BufWrite for WriteBuf<LIMIT> {
    #[inline]
    fn write_buf<F, T, E>(&mut self, func: F) -> Result<T, E>
    where
        F: FnOnce(&mut BytesMut) -> Result<T, E>,
    {
        self.0.write_buf(func)
    }

    #[inline]
    fn do_io<Io: io::Write>(&mut self, io: &mut Io) -> io::Result<()> {
        self.0.do_io(io)
    }
}
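
// a minimal usage sketch. bytes are staged through the closure passed to
// `write_buf` and drained with `do_io`; `Vec<u8>` is a writer that never
// blocks, so a single call is expected to drain and flush everything.
#[cfg(test)]
mod write_buf_sketch {
    use super::*;

    #[test]
    fn fill_and_drain() {
        let mut buf = WriteBuf::<64>::new();
        buf.write_buf::<_, _, core::convert::Infallible>(|b| {
            b.extend_from_slice(b"hello");
            Ok(())
        })
        .unwrap();
        assert!(buf.want_write_io());

        let mut io = Vec::new();
        buf.do_io(&mut io).unwrap();
        assert_eq!(&io[..], b"hello");
    }
}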

/// soft cap on how many queued buffers are submitted to one vectored write.
const BUF_LIST_CNT: usize = 32;

/// write buffer that queues whole owned buffers for vectored io rather than
/// copying them into one contiguous allocation.
pub struct ListWriteBuf<B, const LIMIT: usize> {
    /// scratch space for encoding bytes before they are queued.
    buf: BytesMut,
    /// owned buffers waiting to be written.
    list: BufList<B, BUF_LIST_CNT>,
    /// set when the queue drains so the next do_io call flushes the io.
    want_flush: bool,
}

impl<B: Buf, const LIMIT: usize> Default for ListWriteBuf<B, LIMIT> {
    fn default() -> Self {
        Self {
            buf: BytesMut::new(),
            list: BufList::new(),
            want_flush: false,
        }
    }
}

impl<B: Buf, const LIMIT: usize> ListWriteBuf<B, LIMIT> {
    /// take the encoded bytes out of the scratch buffer.
    pub fn split_buf(&mut self) -> BytesMut {
        self.buf.split()
    }

    /// queue an owned buffer for writing. any previously requested flush is
    /// superseded because there is pending data again.
    pub fn buffer<BB: Buf + Into<B>>(&mut self, buf: BB) {
        self.list.push(buf.into());
        self.want_flush = false;
    }
}
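
// a minimal queueing sketch. `Bytes` satisfies the `Buf + Into<B>` bound and
// is moved into the list without copying; the Debug impl below reports the
// total queued byte count.
#[cfg(test)]
mod list_buffer_sketch {
    use xitca_io::bytes::Bytes;

    use super::*;

    #[test]
    fn queue_chunks() {
        let mut buf = ListWriteBuf::<Bytes, 64>::default();
        buf.buffer(Bytes::from_static(b"hello"));
        buf.buffer(Bytes::from_static(b" world"));
        assert_eq!(format!("{buf:?}"), "ListBuf { remaining: 11 }");
    }
}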

impl<B: Buf, const LIMIT: usize> fmt::Debug for ListWriteBuf<B, LIMIT> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ListBuf")
            .field("remaining", &self.list.remaining())
            .finish()
    }
}

impl<B, const LIMIT: usize> BufInterest for ListWriteBuf<B, LIMIT>
where
    B: Buf + ChunkVectoredUninit,
{
    // accept more buffers while under the byte limit and the fixed-size list
    // still has a free slot.
    #[inline]
    fn want_write_buf(&self) -> bool {
        self.list.remaining() < LIMIT && !self.list.is_full()
    }

    #[inline]
    fn want_write_io(&self) -> bool {
        self.list.remaining() != 0 || self.want_flush
    }
}

impl<B, const LIMIT: usize> BufWrite for ListWriteBuf<B, LIMIT>
where
    B: Buf + ChunkVectoredUninit,
{
    fn write_buf<F, T, E>(&mut self, func: F) -> Result<T, E>
    where
        F: FnOnce(&mut BytesMut) -> Result<T, E>,
    {
        // drop partially written bytes on error so the scratch buffer is not
        // left in an inconsistent state.
        func(&mut self.buf).inspect_err(|_| self.buf.clear())
    }

    fn do_io<Io: io::Write>(&mut self, io: &mut Io) -> io::Result<()> {
        let queue = &mut self.list;
        loop {
            // the queue has fully drained; finish with a flush and stop.
            if self.want_flush {
                match io::Write::flush(io) {
                    Ok(_) => self.want_flush = false,
                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
                    Err(e) => return Err(e),
                }
                break;
            }

            // collect the queued chunks into io slices and submit them in one
            // vectored write.
            let mut buf = [const { MaybeUninit::uninit() }; BUF_LIST_CNT];
            let slice = queue.chunks_vectored_uninit_into_init(&mut buf);
            match io.write_vectored(slice) {
                Ok(0) => return write_zero(self.want_write_io()),
                Ok(n) => {
                    queue.advance(n);
                    if queue.is_empty() {
                        self.want_flush = true;
                    }
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
                Err(e) => return Err(e),
            }
        }

        Ok(())
    }
}
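
// a minimal end-to-end sketch, assuming `Bytes` implements
// `ChunkVectoredUninit` as the vectored write path requires. `Vec<u8>` never
// blocks, so one `do_io` call is expected to write the queue and flush.
#[cfg(test)]
mod list_write_io_sketch {
    use xitca_io::bytes::Bytes;

    use super::*;

    #[test]
    fn queue_and_drain() {
        let mut buf = ListWriteBuf::<Bytes, 64>::default();
        buf.buffer(Bytes::from_static(b"hello"));
        buf.buffer(Bytes::from_static(b" world"));
        assert!(buf.want_write_io());

        let mut io = Vec::new();
        buf.do_io(&mut io).unwrap();
        assert_eq!(&io[..], b"hello world");
        assert!(!buf.want_write_io());
    }
}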

#[cold]
#[inline(never)]
fn write_zero(want_write: bool) -> io::Result<()> {
    assert!(
        want_write,
        "BufWrite::do_io must only be called when BufInterest::want_write_io returns true."
    );
    Err(io::ErrorKind::WriteZero.into())
}