moq_lite/coding/
reader.rs1use std::{cmp, fmt::Debug, io};
2
3use bytes::{Buf, BufMut, Bytes, BytesMut};
4
5use crate::{Error, coding::*};
6
7pub struct Reader<S: web_transport_trait::RecvStream, V> {
9 stream: S,
10 buffer: BytesMut,
11 version: V,
12}
13
14impl<S: web_transport_trait::RecvStream, V> Reader<S, V> {
15 pub fn new(stream: S, version: V) -> Self {
16 Self {
17 stream,
18 buffer: Default::default(),
19 version,
20 }
21 }
22
23 pub async fn decode<T: Decode<V> + Debug>(&mut self) -> Result<T, Error>
25 where
26 V: Clone,
27 {
28 loop {
29 let mut cursor = io::Cursor::new(&self.buffer);
30 match T::decode(&mut cursor, self.version.clone()) {
31 Ok(msg) => {
32 self.buffer.advance(cursor.position() as usize);
33 return Ok(msg);
34 }
35 Err(DecodeError::Short) => {
36 if !self.read_more().await? {
38 return Err(DecodeError::Short.into());
40 }
41 }
42 Err(e) => return Err(e.into()),
43 }
44 }
45 }
46
47 pub async fn decode_maybe<T: Decode<V> + Debug>(&mut self) -> Result<Option<T>, Error>
49 where
50 V: Clone,
51 {
52 if !self.has_more().await? {
53 return Ok(None);
54 }
55
56 Ok(Some(self.decode().await?))
57 }
58
59 pub async fn decode_peek<T: Decode<V> + Debug>(&mut self) -> Result<T, Error>
61 where
62 V: Clone,
63 {
64 loop {
65 let mut cursor = io::Cursor::new(&self.buffer);
66 match T::decode(&mut cursor, self.version.clone()) {
67 Ok(msg) => return Ok(msg),
68 Err(DecodeError::Short) => {
69 if !self.read_more().await? {
71 return Err(DecodeError::Short.into());
73 }
74 }
75 Err(e) => return Err(e.into()),
76 }
77 }
78 }
79
80 pub async fn read(&mut self, max: usize) -> Result<Option<Bytes>, Error> {
82 if !self.buffer.is_empty() {
83 let size = cmp::min(max, self.buffer.len());
84 let data = self.buffer.split_to(size).freeze();
85 return Ok(Some(data));
86 }
87
88 self.stream.read_chunk(max).await.map_err(Error::from_transport)
89 }
90
91 pub async fn read_exact(&mut self, size: usize) -> Result<Bytes, Error> {
93 if self.buffer.len() >= size {
95 return Ok(self.buffer.split_to(size).freeze());
96 }
97
98 let data = BytesMut::with_capacity(size.min(u16::MAX as usize));
99 let mut buf = data.limit(size);
100
101 let size = cmp::min(buf.remaining_mut(), self.buffer.len());
102 let data = self.buffer.split_to(size);
103 buf.put(data);
104
105 while buf.has_remaining_mut() {
106 match self.stream.read_buf(&mut buf).await {
107 Ok(Some(_)) => {}
108 Ok(None) => return Err(DecodeError::Short.into()),
109 Err(e) => return Err(Error::from_transport(e)),
110 }
111 }
112
113 Ok(buf.into_inner().freeze())
114 }
115
116 pub async fn skip(&mut self, mut size: usize) -> Result<(), Error> {
118 let buffered = self.buffer.len().min(size);
119 self.buffer.advance(buffered);
120 size -= buffered;
121
122 while size > 0 {
123 let chunk = self
124 .stream
125 .read_chunk(size)
126 .await
127 .map_err(Error::from_transport)?
128 .ok_or(DecodeError::Short)?;
129 size -= chunk.len();
130 }
131
132 Ok(())
133 }
134
135 pub async fn closed(&mut self) -> Result<(), Error> {
137 if self.has_more().await? {
138 return Err(DecodeError::Short.into());
139 }
140
141 Ok(())
142 }
143
144 async fn has_more(&mut self) -> Result<bool, Error> {
146 if !self.buffer.is_empty() {
147 return Ok(true);
148 }
149
150 self.read_more().await
151 }
152
153 async fn read_more(&mut self) -> Result<bool, Error> {
155 match self.stream.read_buf(&mut self.buffer).await {
156 Ok(Some(_)) => Ok(true),
157 Ok(None) => Ok(false),
158 Err(e) => Err(Error::from_transport(e)),
159 }
160 }
161
162 pub fn abort(&mut self, err: &Error) {
164 self.stream.stop(err.to_code());
165 }
166
167 pub fn with_version<V2>(self, version: V2) -> Reader<S, V2> {
169 Reader {
170 stream: self.stream,
171 buffer: self.buffer,
172 version,
173 }
174 }
175}