Skip to main content

moq_lite/coding/
reader.rs

1use std::{cmp, fmt::Debug, io};
2
3use bytes::{Buf, BufMut, Bytes, BytesMut};
4
5use crate::{Error, coding::*};
6
7/// A reader for decoding messages from a stream.
8pub struct Reader<S: web_transport_trait::RecvStream, V> {
9	stream: S,
10	buffer: BytesMut,
11	version: V,
12}
13
14impl<S: web_transport_trait::RecvStream, V> Reader<S, V> {
15	pub fn new(stream: S, version: V) -> Self {
16		Self {
17			stream,
18			buffer: Default::default(),
19			version,
20		}
21	}
22
23	/// Decode the next message from the stream.
24	pub async fn decode<T: Decode<V> + Debug>(&mut self) -> Result<T, Error>
25	where
26		V: Clone,
27	{
28		loop {
29			let mut cursor = io::Cursor::new(&self.buffer);
30			match T::decode(&mut cursor, self.version.clone()) {
31				Ok(msg) => {
32					self.buffer.advance(cursor.position() as usize);
33					return Ok(msg);
34				}
35				Err(DecodeError::Short) => {
36					// Try to read more data
37					if !self.read_more().await? {
38						// Stream closed while we still need more data
39						return Err(DecodeError::Short.into());
40					}
41				}
42				Err(e) => return Err(e.into()),
43			}
44		}
45	}
46
47	/// Decode the next message unless the stream is closed.
48	pub async fn decode_maybe<T: Decode<V> + Debug>(&mut self) -> Result<Option<T>, Error>
49	where
50		V: Clone,
51	{
52		if !self.has_more().await? {
53			return Ok(None);
54		}
55
56		Ok(Some(self.decode().await?))
57	}
58
59	/// Decode the next message from the stream without consuming it.
60	pub async fn decode_peek<T: Decode<V> + Debug>(&mut self) -> Result<T, Error>
61	where
62		V: Clone,
63	{
64		loop {
65			let mut cursor = io::Cursor::new(&self.buffer);
66			match T::decode(&mut cursor, self.version.clone()) {
67				Ok(msg) => return Ok(msg),
68				Err(DecodeError::Short) => {
69					// Try to read more data
70					if !self.read_more().await? {
71						// Stream closed while we still need more data
72						return Err(DecodeError::Short.into());
73					}
74				}
75				Err(e) => return Err(e.into()),
76			}
77		}
78	}
79
80	/// Returns a non-zero chunk of data, or None if the stream is closed
81	pub async fn read(&mut self, max: usize) -> Result<Option<Bytes>, Error> {
82		if !self.buffer.is_empty() {
83			let size = cmp::min(max, self.buffer.len());
84			let data = self.buffer.split_to(size).freeze();
85			return Ok(Some(data));
86		}
87
88		self.stream.read_chunk(max).await.map_err(Error::from_transport)
89	}
90
91	/// Read exactly the given number of bytes from the stream.
92	pub async fn read_exact(&mut self, size: usize) -> Result<Bytes, Error> {
93		// An optimization to avoid a copy if we have enough data in the buffer
94		if self.buffer.len() >= size {
95			return Ok(self.buffer.split_to(size).freeze());
96		}
97
98		let data = BytesMut::with_capacity(size.min(u16::MAX as usize));
99		let mut buf = data.limit(size);
100
101		let size = cmp::min(buf.remaining_mut(), self.buffer.len());
102		let data = self.buffer.split_to(size);
103		buf.put(data);
104
105		while buf.has_remaining_mut() {
106			match self.stream.read_buf(&mut buf).await {
107				Ok(Some(_)) => {}
108				Ok(None) => return Err(DecodeError::Short.into()),
109				Err(e) => return Err(Error::from_transport(e)),
110			}
111		}
112
113		Ok(buf.into_inner().freeze())
114	}
115
116	/// Skip the given number of bytes from the stream.
117	pub async fn skip(&mut self, mut size: usize) -> Result<(), Error> {
118		let buffered = self.buffer.len().min(size);
119		self.buffer.advance(buffered);
120		size -= buffered;
121
122		while size > 0 {
123			let chunk = self
124				.stream
125				.read_chunk(size)
126				.await
127				.map_err(Error::from_transport)?
128				.ok_or(DecodeError::Short)?;
129			size -= chunk.len();
130		}
131
132		Ok(())
133	}
134
135	/// Wait until the stream is closed, erroring if there are any additional bytes.
136	pub async fn closed(&mut self) -> Result<(), Error> {
137		if self.has_more().await? {
138			return Err(DecodeError::Short.into());
139		}
140
141		Ok(())
142	}
143
144	/// Returns true if there is more data available in the buffer or stream.
145	async fn has_more(&mut self) -> Result<bool, Error> {
146		if !self.buffer.is_empty() {
147			return Ok(true);
148		}
149
150		self.read_more().await
151	}
152
153	/// Try to read more data from the stream. Returns true if data was read, false if stream closed.
154	async fn read_more(&mut self) -> Result<bool, Error> {
155		match self.stream.read_buf(&mut self.buffer).await {
156			Ok(Some(_)) => Ok(true),
157			Ok(None) => Ok(false),
158			Err(e) => Err(Error::from_transport(e)),
159		}
160	}
161
162	/// Abort the stream with the given error.
163	pub fn abort(&mut self, err: &Error) {
164		self.stream.stop(err.to_code());
165	}
166
167	/// Cast the reader to a different version, used during version negotiation.
168	pub fn with_version<V2>(self, version: V2) -> Reader<S, V2> {
169		Reader {
170			stream: self.stream,
171			buffer: self.buffer,
172			version,
173		}
174	}
175}