moq_lite/coding/
reader.rs

1use std::{cmp, io, sync::Arc};
2
3use bytes::{Buf, Bytes, BytesMut};
4
5use crate::{coding::*, Error};
6
7pub struct Reader<S: web_transport_trait::RecvStream> {
8	stream: S,
9	buffer: BytesMut,
10}
11
12impl<S: web_transport_trait::RecvStream> Reader<S> {
13	pub fn new(stream: S) -> Self {
14		Self {
15			stream,
16			buffer: Default::default(),
17		}
18	}
19
20	pub async fn decode<T: Decode>(&mut self) -> Result<T, Error> {
21		loop {
22			let mut cursor = io::Cursor::new(&self.buffer);
23			match T::decode(&mut cursor) {
24				Ok(msg) => {
25					self.buffer.advance(cursor.position() as usize);
26					return Ok(msg);
27				}
28				Err(DecodeError::Short) => {
29					// Try to read more data
30					if self
31						.stream
32						.read_buf(&mut self.buffer)
33						.await
34						.map_err(|e| Error::Transport(Arc::new(e)))?
35						.is_none()
36					{
37						// Stream closed while we still need more data
38						return Err(Error::Decode(DecodeError::Short));
39					}
40				}
41				Err(e) => return Err(Error::Decode(e)),
42			}
43		}
44	}
45
46	// Decode optional messages at the end of a stream
47	pub async fn decode_maybe<T: Decode>(&mut self) -> Result<Option<T>, Error> {
48		match self.closed().await {
49			Ok(()) => Ok(None),
50			Err(Error::Decode(DecodeError::ExpectedEnd)) => Ok(Some(self.decode().await?)),
51			Err(e) => Err(e),
52		}
53	}
54
55	// Returns a non-zero chunk of data, or None if the stream is closed
56	pub async fn read(&mut self, max: usize) -> Result<Option<Bytes>, Error> {
57		if !self.buffer.is_empty() {
58			let size = cmp::min(max, self.buffer.len());
59			let data = self.buffer.split_to(size).freeze();
60			return Ok(Some(data));
61		}
62
63		self.stream
64			.read_chunk(max)
65			.await
66			.map_err(|e| Error::Transport(Arc::new(e)))
67	}
68
69	/// Wait until the stream is closed, erroring if there are any additional bytes.
70	pub async fn closed(&mut self) -> Result<(), Error> {
71		if self.buffer.is_empty()
72			&& self
73				.stream
74				.read_buf(&mut self.buffer)
75				.await
76				.map_err(|e| Error::Transport(Arc::new(e)))?
77				.is_none()
78		{
79			return Ok(());
80		}
81
82		Err(DecodeError::ExpectedEnd.into())
83	}
84
85	pub fn abort(&mut self, err: &Error) {
86		self.stream.stop(err.to_code());
87	}
88}