1use core::fmt;
2use core::pin::Pin;
3use core::task::{ready, Context, Poll};
4
5use alloc::vec::Vec;
6
7use pin_project::pin_project;
8
9use crate::buf::Cursor;
10use crate::framed::Decoder;
11use crate::source::{AsyncSource, Source};
12use crate::util::slice_uninit_assume_init_mut;
13use crate::util::Error;
14use crate::{AsyncRead, AsyncReadExt, Read, ReadExt};
15
16#[pin_project]
31#[derive(Debug)]
32pub struct FramedRead<R, D> {
33 #[pin]
34 reader: R,
35 decoder: D,
36 buf: Vec<u8>,
37}
38
39impl<R, D> FramedRead<R, D> {
40 #[inline]
42 #[must_use]
43 pub const fn new(reader: R, decoder: D) -> Self {
44 Self { reader, decoder, buf: Vec::new() }
45 }
46
47 #[inline]
50 #[must_use]
51 pub fn with_capacity(
52 reader: R,
53 decoder: D,
54 capacity: usize,
55 ) -> Self {
56 Self { reader, decoder, buf: Vec::with_capacity(capacity) }
57 }
58
59 #[inline]
61 #[must_use]
62 pub fn reader(&self) -> &R {
63 &self.reader
64 }
65
66 #[inline]
68 #[must_use]
69 pub fn reader_mut(&mut self) -> &mut R {
70 &mut self.reader
71 }
72
73 #[inline]
75 #[must_use]
76 pub fn reader_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
77 self.project().reader
78 }
79
80 #[inline]
82 #[must_use]
83 pub fn decoder(&self) -> &D {
84 &self.decoder
85 }
86
87 #[inline]
89 #[must_use]
90 pub fn decoder_mut(&mut self) -> &mut D {
91 &mut self.decoder
92 }
93
94 #[inline]
96 #[must_use]
97 pub fn decoder_pin_mut(self: Pin<&mut Self>) -> &mut D {
98 self.project().decoder
99 }
100
101 #[inline]
103 #[must_use]
104 pub fn read_buffer(&self) -> &Vec<u8> {
105 &self.buf
106 }
107
108 #[inline]
110 #[must_use]
111 pub fn read_buffer_mut(&mut self) -> &mut Vec<u8> {
112 &mut self.buf
113 }
114
115 #[inline]
117 #[must_use]
118 pub fn map_decoder<T, F>(self, f: F) -> FramedRead<R, T>
119 where
120 T: Decoder,
121 F: FnOnce(D) -> T,
122 {
123 FramedRead {
124 reader: self.reader,
125 decoder: f(self.decoder),
126 buf: self.buf,
127 }
128 }
129
130 #[inline]
132 #[must_use]
133 pub fn into_reader(self) -> R {
134 self.reader
135 }
136
137 #[inline]
139 #[must_use]
140 pub fn into_decoder(self) -> D {
141 self.decoder
142 }
143
144 #[inline]
146 #[must_use]
147 pub fn into_inner(self) -> (R, D) {
148 (self.reader, self.decoder)
149 }
150}
151
/// Error produced while pulling frames out of a [`FramedRead`].
///
/// `T` is the decoder's error type and `Io` is the reader's error type.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum FramedReadError<T, Io> {
    /// The decoder reported an error for the buffered bytes.
    Decode(T),
    /// Reading more bytes from the underlying reader failed.
    Io(Io),
}
160
161impl<T, Io> fmt::Display for FramedReadError<T, Io>
162where
163 T: fmt::Display,
164 Io: fmt::Display,
165{
166 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
167 match self {
168 Self::Decode(e) => e.fmt(f),
169 Self::Io(e) => e.fmt(f),
170 }
171 }
172}
173
174impl<T, Io> Error for FramedReadError<T, Io> where
175 Self: fmt::Debug + fmt::Display
176{
177}
178
179impl<R, D> AsyncSource for FramedRead<R, D>
180where
181 R: AsyncRead,
182 D: Decoder,
183{
184 type Item =
185 Result<D::Output, FramedReadError<D::Error, R::Error>>;
186
187 fn poll_next(
188 self: Pin<&mut Self>,
189 cx: &mut Context,
190 ) -> Poll<Self::Item> {
191 let mut this = self.project();
192
193 loop {
194 match this.decoder.decode(this.buf) {
195 Ok(Some(x)) => return Poll::Ready(Ok(x)),
196 Ok(None) => {
197 if this.buf.spare_capacity_mut().is_empty() {
198 this.buf.reserve(1);
199 }
200
201 let n = {
202 let mut buf =
203 Cursor::new(get_vec_spare_cap(this.buf));
204
205 ready!(this
206 .reader
207 .as_mut()
208 .poll_read_buf(cx, &mut buf))
209 .map_err(FramedReadError::Io)?;
210
211 buf.pos()
212 };
213
214 unsafe {
215 this.buf.set_len(this.buf.len() + n);
216 }
217 },
218 Err(e) => {
219 return Poll::Ready(Err(
220 FramedReadError::Decode(e),
221 ));
222 },
223 }
224 }
225 }
226}
227
228impl<R, D> Source for FramedRead<R, D>
229where
230 R: Read,
231 D: Decoder,
232{
233 type Item =
234 Result<D::Output, FramedReadError<D::Error, R::Error>>;
235
236 fn next(&mut self) -> Self::Item {
237 loop {
238 match self.decoder.decode(&mut self.buf) {
239 Ok(Some(x)) => return Ok(x),
240 Ok(None) => {
241 if self.buf.spare_capacity_mut().is_empty() {
242 self.buf.reserve(1);
243 }
244
245 let n = {
246 let mut buf = Cursor::new(get_vec_spare_cap(
247 &mut self.buf,
248 ));
249
250 self.reader
251 .read_buf(&mut buf)
252 .map_err(FramedReadError::Io)?;
253
254 buf.pos()
255 };
256
257 unsafe {
258 self.buf.set_len(self.buf.len() + n);
259 }
260 },
261 Err(e) => return Err(FramedReadError::Decode(e)),
262 }
263 }
264 }
265}
266
267fn get_vec_spare_cap(vec: &mut Vec<u8>) -> &mut [u8] {
268 unsafe { slice_uninit_assume_init_mut(vec.spare_capacity_mut()) }
269}
270
#[cfg(test)]
mod tests {
    use super::*;

    use core::convert::Infallible;

    use crate::buf::{Buf, Cursor};
    use crate::source::Source;

    /// Test decoder that splits the input into 4-byte big-endian `u32`
    /// frames.
    struct U32Decoder;

    impl Decoder for U32Decoder {
        // The output type matches the decoder's name: unsigned 32-bit frames.
        type Output = u32;
        type Error = Infallible;

        fn decode(
            &mut self,
            buf: &mut Vec<u8>,
        ) -> Result<Option<Self::Output>, Self::Error> {
            // With fewer than four buffered bytes there is no frame yet, so
            // ask the caller for more input.
            let bytes: [u8; 4] = match buf.get(..4) {
                Some(head) => head
                    .try_into()
                    .expect("a 4-byte slice always converts to [u8; 4]"),
                None => return Ok(None),
            };

            let value = u32::from_be_bytes(bytes);

            // Consume the frame so the next call starts at the next one.
            buf.drain(..4);
            Ok(Some(value))
        }
    }

    /// One complete frame followed by a truncated one: the first decodes and
    /// the second call surfaces the reader's error.
    #[test]
    fn test_framed_read_eof() {
        let reader = Cursor::new([0u8, 0, 0, 42, 0, 0]).reader();
        let mut framed = FramedRead::new(reader, U32Decoder);

        assert_eq!(
            Source::next(&mut framed).expect("first frame should decode"),
            42
        );
        assert!(matches!(
            Source::next(&mut framed),
            Err(FramedReadError::Io(_))
        ));
    }

    /// Input that ends exactly on a frame boundary: both frames decode and
    /// the third call surfaces the reader's error.
    #[test]
    fn test_framed_read_eof_exact() {
        let reader =
            Cursor::new([0u8, 0, 0, 42, 0, 0, 0, 62]).reader();
        let mut framed = FramedRead::new(reader, U32Decoder);

        assert_eq!(
            Source::next(&mut framed).expect("first frame should decode"),
            42
        );
        assert_eq!(
            Source::next(&mut framed).expect("second frame should decode"),
            62
        );
        assert!(matches!(
            Source::next(&mut framed),
            Err(FramedReadError::Io(_))
        ));
    }
}