moq_lite/coding/
reader.rs1use std::{cmp, io, sync::Arc};
2
3use bytes::{Buf, Bytes, BytesMut};
4
5use crate::{coding::*, Error};
6
7pub struct Reader<S: web_transport_trait::RecvStream> {
8 stream: S,
9 buffer: BytesMut,
10}
11
12impl<S: web_transport_trait::RecvStream> Reader<S> {
13 pub fn new(stream: S) -> Self {
14 Self {
15 stream,
16 buffer: Default::default(),
17 }
18 }
19
20 pub async fn decode<T: Decode>(&mut self) -> Result<T, Error> {
21 loop {
22 let mut cursor = io::Cursor::new(&self.buffer);
23 match T::decode(&mut cursor) {
24 Ok(msg) => {
25 self.buffer.advance(cursor.position() as usize);
26 return Ok(msg);
27 }
28 Err(DecodeError::Short) => {
29 if self
31 .stream
32 .read_buf(&mut self.buffer)
33 .await
34 .map_err(|e| Error::Transport(Arc::new(e)))?
35 .is_none()
36 {
37 return Err(Error::Decode(DecodeError::Short));
39 }
40 }
41 Err(e) => return Err(Error::Decode(e)),
42 }
43 }
44 }
45
46 pub async fn decode_maybe<T: Decode>(&mut self) -> Result<Option<T>, Error> {
48 match self.closed().await {
49 Ok(()) => Ok(None),
50 Err(Error::Decode(DecodeError::ExpectedEnd)) => Ok(Some(self.decode().await?)),
51 Err(e) => Err(e),
52 }
53 }
54
55 pub async fn read(&mut self, max: usize) -> Result<Option<Bytes>, Error> {
57 if !self.buffer.is_empty() {
58 let size = cmp::min(max, self.buffer.len());
59 let data = self.buffer.split_to(size).freeze();
60 return Ok(Some(data));
61 }
62
63 self.stream
64 .read_chunk(max)
65 .await
66 .map_err(|e| Error::Transport(Arc::new(e)))
67 }
68
69 pub async fn read_exact(&mut self, size: usize) -> Result<Bytes, Error> {
70 if self.buffer.len() >= size {
72 return Ok(self.buffer.split_to(size).freeze());
73 }
74
75 let data = BytesMut::with_capacity(size.min(u16::MAX as usize));
76 let mut buf = data.limit(size);
77
78 let size = cmp::min(buf.remaining_mut(), self.buffer.len());
79 let data = self.buffer.split_to(size);
80 buf.put(data);
81
82 while buf.has_remaining_mut() {
83 self.stream
84 .read_buf(&mut buf)
85 .await
86 .map_err(|e| Error::Transport(Arc::new(e)))?;
87 }
88
89 Ok(buf.into_inner().freeze())
90 }
91
92 pub async fn skip(&mut self, mut size: usize) -> Result<(), Error> {
93 let buffered = self.buffer.len();
94 self.buffer.advance(size.min(buffered));
95 size -= buffered;
96
97 while size > 0 {
98 let chunk = self
99 .stream
100 .read_chunk(size)
101 .await
102 .map_err(|e| Error::Transport(Arc::new(e)))?
103 .ok_or(Error::Decode(DecodeError::Short))?;
104 size -= chunk.len();
105 }
106
107 Ok(())
108 }
109
110 pub async fn closed(&mut self) -> Result<(), Error> {
112 if self.buffer.is_empty()
113 && self
114 .stream
115 .read_buf(&mut self.buffer)
116 .await
117 .map_err(|e| Error::Transport(Arc::new(e)))?
118 .is_none()
119 {
120 return Ok(());
121 }
122
123 Err(DecodeError::ExpectedEnd.into())
124 }
125
126 pub fn abort(&mut self, err: &Error) {
127 self.stream.stop(err.to_code());
128 }
129}