moq_lite/coding/
reader.rs

1use std::{cmp, io, sync::Arc};
2
3use bytes::{Buf, Bytes, BytesMut};
4
5use crate::{coding::*, Error};
6
7pub struct Reader<S: web_transport_trait::RecvStream> {
8	stream: S,
9	buffer: BytesMut,
10}
11
12impl<S: web_transport_trait::RecvStream> Reader<S> {
13	pub fn new(stream: S) -> Self {
14		Self {
15			stream,
16			buffer: Default::default(),
17		}
18	}
19
20	pub async fn decode<T: Decode>(&mut self) -> Result<T, Error> {
21		loop {
22			let mut cursor = io::Cursor::new(&self.buffer);
23			match T::decode(&mut cursor) {
24				Ok(msg) => {
25					self.buffer.advance(cursor.position() as usize);
26					return Ok(msg);
27				}
28				Err(DecodeError::Short) => {
29					// Try to read more data
30					if self
31						.stream
32						.read_buf(&mut self.buffer)
33						.await
34						.map_err(|e| Error::Transport(Arc::new(e)))?
35						.is_none()
36					{
37						// Stream closed while we still need more data
38						return Err(Error::Decode(DecodeError::Short));
39					}
40				}
41				Err(e) => return Err(Error::Decode(e)),
42			}
43		}
44	}
45
46	// Decode optional messages at the end of a stream
47	pub async fn decode_maybe<T: Decode>(&mut self) -> Result<Option<T>, Error> {
48		match self.closed().await {
49			Ok(()) => Ok(None),
50			Err(Error::Decode(DecodeError::ExpectedEnd)) => Ok(Some(self.decode().await?)),
51			Err(e) => Err(e),
52		}
53	}
54
55	// Returns a non-zero chunk of data, or None if the stream is closed
56	pub async fn read(&mut self, max: usize) -> Result<Option<Bytes>, Error> {
57		if !self.buffer.is_empty() {
58			let size = cmp::min(max, self.buffer.len());
59			let data = self.buffer.split_to(size).freeze();
60			return Ok(Some(data));
61		}
62
63		self.stream
64			.read_chunk(max)
65			.await
66			.map_err(|e| Error::Transport(Arc::new(e)))
67	}
68
69	pub async fn read_exact(&mut self, size: usize) -> Result<Bytes, Error> {
70		// An optimization to avoid a copy if we have enough data in the buffer
71		if self.buffer.len() >= size {
72			return Ok(self.buffer.split_to(size).freeze());
73		}
74
75		let data = BytesMut::with_capacity(size.min(u16::MAX as usize));
76		let mut buf = data.limit(size);
77
78		let size = cmp::min(buf.remaining_mut(), self.buffer.len());
79		let data = self.buffer.split_to(size);
80		buf.put(data);
81
82		while buf.has_remaining_mut() {
83			self.stream
84				.read_buf(&mut buf)
85				.await
86				.map_err(|e| Error::Transport(Arc::new(e)))?;
87		}
88
89		Ok(buf.into_inner().freeze())
90	}
91
92	pub async fn skip(&mut self, mut size: usize) -> Result<(), Error> {
93		let buffered = self.buffer.len();
94		self.buffer.advance(size.min(buffered));
95		size -= buffered;
96
97		while size > 0 {
98			let chunk = self
99				.stream
100				.read_chunk(size)
101				.await
102				.map_err(|e| Error::Transport(Arc::new(e)))?
103				.ok_or(Error::Decode(DecodeError::Short))?;
104			size -= chunk.len();
105		}
106
107		Ok(())
108	}
109
110	/// Wait until the stream is closed, erroring if there are any additional bytes.
111	pub async fn closed(&mut self) -> Result<(), Error> {
112		if self.buffer.is_empty()
113			&& self
114				.stream
115				.read_buf(&mut self.buffer)
116				.await
117				.map_err(|e| Error::Transport(Arc::new(e)))?
118				.is_none()
119		{
120			return Ok(());
121		}
122
123		Err(DecodeError::ExpectedEnd.into())
124	}
125
126	pub fn abort(&mut self, err: &Error) {
127		self.stream.stop(err.to_code());
128	}
129}