xitca_io/
bytes.rs

//! re-export of [bytes] crate types, together with buffer traits and types built on top of them.
2
3pub use bytes::*;
4pub use xitca_unsafe_collection::bytes::{BytesStr, EitherBuf, PagedBytesMut};
5
6use core::fmt;
7
8use std::io;
9
/// A new type wrapping a mutable reference to a buffer `B`.
///
/// It exists so that [io::Write] and [fmt::Write] can be implemented for any `B: BufMut`;
/// both implementations copy the written bytes into the wrapped buffer.
pub struct BufMutWriter<'a, B>(pub &'a mut B);
12
13impl<B: BufMut> io::Write for BufMutWriter<'_, B> {
14    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
15        self.0.put_slice(buf);
16        Ok(buf.len())
17    }
18
19    fn flush(&mut self) -> io::Result<()> {
20        Ok(())
21    }
22}
23
24impl<B: BufMut> fmt::Write for BufMutWriter<'_, B> {
25    fn write_str(&mut self, s: &str) -> fmt::Result {
26        self.0.put_slice(s.as_bytes());
27        Ok(())
28    }
29}
30
/// trait generic over different types of buffer strategy.
///
/// The two flags report what kind of work the buffer is currently interested in.
pub trait BufInterest {
    /// flag if buffer wants more data to be filled in.
    fn want_write_buf(&self) -> bool;

    /// flag if buffer wants to write data to io.
    fn want_write_io(&self) -> bool;
}
39
40/// trait generic over different types of read buffer strategy.
41pub trait BufRead: BufInterest {
42    /// read from IO and write into buffer.
43    fn do_io<Io: io::Read>(&mut self, io: &mut Io) -> io::Result<()>;
44}
45
46/// trait generic over different types of write buffer strategy.
47pub trait BufWrite: BufInterest {
48    /// write into [BytesMut] with closure that output a Result type.
49    /// the result type is used to hint buffer to stop wanting to flush IO on [BufWrite::do_io]
50    /// or revert BytesMut to previous state before method was called.
51    fn write_buf<F, T, E>(&mut self, func: F) -> Result<T, E>
52    where
53        F: FnOnce(&mut BytesMut) -> Result<T, E>;
54
55    /// write into IO from buffer.
56    fn do_io<Io: io::Write>(&mut self, io: &mut Io) -> io::Result<()>;
57}
58
/// a bare [BytesMut] always accepts more data and wants io whenever it holds bytes.
impl BufInterest for BytesMut {
    /// always `true`: this implementation never refuses more data.
    #[inline]
    fn want_write_buf(&self) -> bool {
        true
    }

    /// `true` while the buffer is not empty.
    #[inline]
    fn want_write_io(&self) -> bool {
        !self.is_empty()
    }
}
70
impl BufRead for BytesMut {
    /// read from `io` into this buffer by delegating to the `buf_read` helper.
    #[inline]
    fn do_io<Io: io::Read>(&mut self, io: &mut Io) -> io::Result<()> {
        buf_read(self, io)
    }
}
77
78impl BufWrite for BytesMut {
79    #[inline]
80    fn write_buf<F, T, E>(&mut self, func: F) -> Result<T, E>
81    where
82        F: FnOnce(&mut Self) -> Result<T, E>,
83    {
84        let len = self.len();
85        func(self).inspect_err(|_| self.truncate(len))
86    }
87
88    #[cold]
89    #[inline(never)]
90    fn do_io<Io: io::Write>(&mut self, _: &mut Io) -> io::Result<()> {
91        unimplemented!()
92    }
93}
94
/// write buffer pairing a [BytesMut] with a flag that tracks whether the io still has
/// to be flushed.
pub struct WriteBuf {
    // bytes waiting to be written to io.
    buf: BytesMut,
    // set by [BufWrite::do_io] once a write leaves the buffer empty. reset after a
    // successful flush, a successful [BufWrite::write_buf] or [WriteBuf::clear].
    want_flush: bool,
}
99
100impl WriteBuf {
101    #[inline]
102    pub fn new() -> Self {
103        Self {
104            buf: BytesMut::new(),
105            want_flush: false,
106        }
107    }
108
109    #[inline]
110    pub fn len(&self) -> usize {
111        self.buf.len()
112    }
113
114    #[inline]
115    pub fn is_empty(&self) -> bool {
116        self.buf.is_empty()
117    }
118
119    #[inline]
120    pub fn into_inner(self) -> BytesMut {
121        self.buf
122    }
123
124    /// clear remaining bytes in buffer and set flush flag to false.
125    /// this would make following [BufInterest::want_write_io] call return false.
126    #[inline]
127    pub fn clear(&mut self) {
128        self.buf.clear();
129        self.want_flush = false;
130    }
131
132    #[inline]
133    pub fn buf(&self) -> &[u8] {
134        &self.buf
135    }
136}
137
impl Default for WriteBuf {
    /// same as [WriteBuf::new].
    fn default() -> Self {
        Self::new()
    }
}
143
144impl BufInterest for WriteBuf {
145    #[inline]
146    fn want_write_buf(&self) -> bool {
147        self.buf.want_write_buf()
148    }
149
150    #[inline]
151    fn want_write_io(&self) -> bool {
152        self.buf.want_write_io() || self.want_flush
153    }
154}
155
156impl BufWrite for WriteBuf {
157    #[inline]
158    fn write_buf<F, T, E>(&mut self, func: F) -> Result<T, E>
159    where
160        F: FnOnce(&mut BytesMut) -> Result<T, E>,
161    {
162        self.buf.write_buf(func).inspect(|_| self.want_flush = false)
163    }
164
165    fn do_io<Io: io::Write>(&mut self, io: &mut Io) -> io::Result<()> {
166        loop {
167            if self.want_flush {
168                match io::Write::flush(io) {
169                    Ok(_) => self.want_flush = false,
170                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
171                    Err(e) => return Err(e),
172                }
173                break;
174            }
175            match io::Write::write(io, &self.buf) {
176                Ok(0) => return Err(io::ErrorKind::WriteZero.into()),
177                Ok(n) => {
178                    self.buf.advance(n);
179                    self.want_flush = self.buf.is_empty();
180                }
181                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
182                Err(e) => return Err(e),
183            }
184        }
185        Ok(())
186    }
187}
188
189impl<const P: usize> BufInterest for PagedBytesMut<P> {
190    #[inline]
191    fn want_write_buf(&self) -> bool {
192        self.get_ref().want_write_buf()
193    }
194
195    #[inline]
196    fn want_write_io(&self) -> bool {
197        self.get_ref().want_write_io()
198    }
199}
200
impl<const P: usize> BufRead for PagedBytesMut<P> {
    /// read from `io` into this buffer by delegating to the `buf_read` helper.
    #[inline]
    fn do_io<Io: io::Read>(&mut self, io: &mut Io) -> io::Result<()> {
        buf_read(self, io)
    }
}
207
208fn buf_read<B, Io>(buf: &mut B, io: &mut Io) -> io::Result<()>
209where
210    Io: io::Read,
211    B: Buf + BufMut,
212{
213    let len = buf.remaining();
214    loop {
215        match xitca_unsafe_collection::bytes::read_buf(io, buf) {
216            Ok(0) => {
217                if buf.remaining() == len {
218                    return Err(io::ErrorKind::UnexpectedEof.into());
219                };
220                break;
221            }
222            Ok(_) => {}
223            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
224            Err(e) => {
225                if buf.remaining() == len {
226                    return Err(e);
227                }
228                break;
229            }
230        }
231    }
232    Ok(())
233}
234
235impl<L, R> BufInterest for EitherBuf<L, R>
236where
237    L: BufInterest,
238    R: BufInterest,
239{
240    #[inline]
241    fn want_write_buf(&self) -> bool {
242        match *self {
243            Self::Left(ref l) => l.want_write_buf(),
244            Self::Right(ref r) => r.want_write_buf(),
245        }
246    }
247
248    #[inline]
249    fn want_write_io(&self) -> bool {
250        match *self {
251            Self::Left(ref l) => l.want_write_io(),
252            Self::Right(ref r) => r.want_write_io(),
253        }
254    }
255}
256
257impl<L, R> BufWrite for EitherBuf<L, R>
258where
259    L: BufWrite,
260    R: BufWrite,
261{
262    #[inline]
263    fn write_buf<F, T, E>(&mut self, func: F) -> Result<T, E>
264    where
265        F: FnOnce(&mut BytesMut) -> Result<T, E>,
266    {
267        match *self {
268            Self::Left(ref mut l) => l.write_buf(func),
269            Self::Right(ref mut r) => r.write_buf(func),
270        }
271    }
272
273    #[inline]
274    fn do_io<Io>(&mut self, io: &mut Io) -> io::Result<()>
275    where
276        Io: io::Write,
277    {
278        match *self {
279            Self::Left(ref mut l) => l.do_io(io),
280            Self::Right(ref mut r) => r.do_io(io),
281        }
282    }
283}