moq_lite/coding/
writer.rs

1use std::{fmt::Debug, sync::Arc};
2
3use crate::{coding::*, Error};
4
5// A wrapper around a SendStream that will reset on Drop
6pub struct Writer<S: web_transport_trait::SendStream, V> {
7	stream: Option<S>,
8	buffer: bytes::BytesMut,
9	version: V,
10}
11
12impl<S: web_transport_trait::SendStream, V> Writer<S, V> {
13	pub fn new(stream: S, version: V) -> Self {
14		Self {
15			stream: Some(stream),
16			buffer: Default::default(),
17			version,
18		}
19	}
20
21	pub async fn encode<T: Encode<V> + Debug>(&mut self, msg: &T) -> Result<(), Error>
22	where
23		V: Clone,
24	{
25		self.buffer.clear();
26		msg.encode(&mut self.buffer, self.version.clone());
27
28		while !self.buffer.is_empty() {
29			self.stream
30				.as_mut()
31				.unwrap()
32				.write_buf(&mut self.buffer)
33				.await
34				.map_err(|e| Error::Transport(Arc::new(e)))?;
35		}
36
37		Ok(())
38	}
39
40	// Not public to avoid accidental partial writes.
41	async fn write<Buf: bytes::Buf + Send>(&mut self, buf: &mut Buf) -> Result<usize, Error> {
42		self.stream
43			.as_mut()
44			.unwrap()
45			.write_buf(buf)
46			.await
47			.map_err(|e| Error::Transport(Arc::new(e)))
48	}
49
50	// NOTE: We use Buf so we don't perform a copy when using Quinn.
51	pub async fn write_all<Buf: bytes::Buf + Send>(&mut self, buf: &mut Buf) -> Result<(), Error> {
52		while buf.has_remaining() {
53			self.write(buf).await?;
54		}
55		Ok(())
56	}
57
58	/// A clean termination of the stream, waiting for the peer to close.
59	pub fn finish(&mut self) -> Result<(), Error> {
60		self.stream
61			.as_mut()
62			.unwrap()
63			.finish()
64			.map_err(|e| Error::Transport(Arc::new(e)))
65	}
66
67	pub fn abort(&mut self, err: &Error) {
68		self.stream.as_mut().unwrap().reset(err.to_code());
69	}
70
71	pub async fn closed(&mut self) -> Result<(), Error> {
72		self.stream
73			.as_mut()
74			.unwrap()
75			.closed()
76			.await
77			.map_err(|e| Error::Transport(Arc::new(e)))?;
78		Ok(())
79	}
80
81	pub fn set_priority(&mut self, priority: u8) {
82		self.stream.as_mut().unwrap().set_priority(priority);
83	}
84
85	pub fn with_version<O>(mut self, version: O) -> Writer<S, O> {
86		Writer {
87			// We need to use an Option so Drop doesn't reset the stream.
88			stream: self.stream.take(),
89			buffer: std::mem::take(&mut self.buffer),
90			version,
91		}
92	}
93}
94
95impl<S: web_transport_trait::SendStream, V> Drop for Writer<S, V> {
96	fn drop(&mut self) {
97		if let Some(mut stream) = self.stream.take() {
98			// Unlike the Quinn default, we abort the stream on drop.
99			stream.reset(Error::Cancel.to_code());
100		}
101	}
102}