moq_lite/coding/
writer.rs

1use std::sync::Arc;
2
3use crate::{coding::*, Error};
4
5// A wrapper around a SendStream that will reset on Drop
6pub struct Writer<S: web_transport_trait::SendStream> {
7	stream: S,
8	buffer: bytes::BytesMut,
9}
10
11impl<S: web_transport_trait::SendStream> Writer<S> {
12	pub fn new(stream: S) -> Self {
13		Self {
14			stream,
15			buffer: Default::default(),
16		}
17	}
18
19	pub async fn encode<T: Encode>(&mut self, msg: &T) -> Result<(), Error> {
20		self.buffer.clear();
21		msg.encode(&mut self.buffer);
22
23		while !self.buffer.is_empty() {
24			self.stream
25				.write_buf(&mut self.buffer)
26				.await
27				.map_err(|e| Error::Transport(Arc::new(e)))?;
28		}
29
30		Ok(())
31	}
32
33	// Not public to avoid accidental partial writes.
34	async fn write<Buf: bytes::Buf + Send>(&mut self, buf: &mut Buf) -> Result<usize, Error> {
35		self.stream
36			.write_buf(buf)
37			.await
38			.map_err(|e| Error::Transport(Arc::new(e)))
39	}
40
41	// NOTE: We use Buf so we don't perform a copy when using Quinn.
42	pub async fn write_all<Buf: bytes::Buf + Send>(&mut self, buf: &mut Buf) -> Result<(), Error> {
43		while buf.has_remaining() {
44			self.write(buf).await?;
45		}
46		Ok(())
47	}
48
49	/// A clean termination of the stream, waiting for the peer to close.
50	pub async fn finish(&mut self) -> Result<(), Error> {
51		self.stream.finish().await.map_err(|e| Error::Transport(Arc::new(e)))?;
52		Ok(())
53	}
54
55	pub fn abort(&mut self, err: &Error) {
56		self.stream.reset(err.to_code());
57	}
58
59	pub async fn closed(&mut self) -> Result<(), Error> {
60		self.stream.closed().await.map_err(|e| Error::Transport(Arc::new(e)))?;
61		Ok(())
62	}
63}
64
65impl<S: web_transport_trait::SendStream> Drop for Writer<S> {
66	fn drop(&mut self) {
67		// Unlike the Quinn default, we abort the stream on drop.
68		self.stream.reset(Error::Cancel.to_code());
69	}
70}