moq_lite/coding/
reader.rs1use std::{cmp, io, sync::Arc};
2
3use bytes::{Buf, Bytes, BytesMut};
4
5use crate::{coding::*, Error};
6
7pub struct Reader<S: web_transport_trait::RecvStream> {
8 stream: S,
9 buffer: BytesMut,
10}
11
12impl<S: web_transport_trait::RecvStream> Reader<S> {
13 pub fn new(stream: S) -> Self {
14 Self {
15 stream,
16 buffer: Default::default(),
17 }
18 }
19
20 pub async fn decode<T: Decode>(&mut self) -> Result<T, Error> {
21 loop {
22 let mut cursor = io::Cursor::new(&self.buffer);
23 match T::decode(&mut cursor) {
24 Ok(msg) => {
25 self.buffer.advance(cursor.position() as usize);
26 return Ok(msg);
27 }
28 Err(DecodeError::Short) => {
29 if self
31 .stream
32 .read_buf(&mut self.buffer)
33 .await
34 .map_err(|e| Error::Transport(Arc::new(e)))?
35 .is_none()
36 {
37 return Err(Error::Decode(DecodeError::Short));
39 }
40 }
41 Err(e) => return Err(Error::Decode(e)),
42 }
43 }
44 }
45
46 pub async fn decode_maybe<T: Decode>(&mut self) -> Result<Option<T>, Error> {
48 match self.closed().await {
49 Ok(()) => Ok(None),
50 Err(Error::Decode(DecodeError::ExpectedEnd)) => Ok(Some(self.decode().await?)),
51 Err(e) => Err(e),
52 }
53 }
54
55 pub async fn read(&mut self, max: usize) -> Result<Option<Bytes>, Error> {
57 if !self.buffer.is_empty() {
58 let size = cmp::min(max, self.buffer.len());
59 let data = self.buffer.split_to(size).freeze();
60 return Ok(Some(data));
61 }
62
63 self.stream
64 .read_chunk(max)
65 .await
66 .map_err(|e| Error::Transport(Arc::new(e)))
67 }
68
69 pub async fn closed(&mut self) -> Result<(), Error> {
71 if self.buffer.is_empty()
72 && self
73 .stream
74 .read_buf(&mut self.buffer)
75 .await
76 .map_err(|e| Error::Transport(Arc::new(e)))?
77 .is_none()
78 {
79 return Ok(());
80 }
81
82 Err(DecodeError::ExpectedEnd.into())
83 }
84
85 pub fn abort(&mut self, err: &Error) {
86 self.stream.stop(err.to_code());
87 }
88}