moq_lite/coding/
writer.rs

1use std::{fmt::Debug, sync::Arc};
2
3use crate::{Error, coding::*};
4
5/// A wrapper around a [web_transport_trait::SendStream] that will reset on Drop.
6pub struct Writer<S: web_transport_trait::SendStream, V> {
7	stream: Option<S>,
8	buffer: bytes::BytesMut,
9	version: V,
10}
11
12impl<S: web_transport_trait::SendStream, V> Writer<S, V> {
13	/// Create a new writer for the given stream and version.
14	pub fn new(stream: S, version: V) -> Self {
15		Self {
16			stream: Some(stream),
17			buffer: Default::default(),
18			version,
19		}
20	}
21
22	/// Encode the given message to the stream.
23	pub async fn encode<T: Encode<V> + Debug>(&mut self, msg: &T) -> Result<(), Error>
24	where
25		V: Clone,
26	{
27		self.buffer.clear();
28		msg.encode(&mut self.buffer, self.version.clone());
29
30		while !self.buffer.is_empty() {
31			self.stream
32				.as_mut()
33				.unwrap()
34				.write_buf(&mut self.buffer)
35				.await
36				.map_err(|e| Error::Transport(Arc::new(e)))?;
37		}
38
39		Ok(())
40	}
41
42	// Not public to avoid accidental partial writes.
43	async fn write<Buf: bytes::Buf + Send>(&mut self, buf: &mut Buf) -> Result<usize, Error> {
44		self.stream
45			.as_mut()
46			.unwrap()
47			.write_buf(buf)
48			.await
49			.map_err(|e| Error::Transport(Arc::new(e)))
50	}
51
52	/// Write the entire [Buf] to the stream.
53	///
54	/// NOTE: This can avoid performing a copy when using [Bytes].
55	pub async fn write_all<Buf: bytes::Buf + Send>(&mut self, buf: &mut Buf) -> Result<(), Error> {
56		while buf.has_remaining() {
57			self.write(buf).await?;
58		}
59		Ok(())
60	}
61
62	/// Mark the stream as finished.
63	pub fn finish(&mut self) -> Result<(), Error> {
64		self.stream
65			.as_mut()
66			.unwrap()
67			.finish()
68			.map_err(|e| Error::Transport(Arc::new(e)))
69	}
70
71	/// Abort the stream with the given error.
72	pub fn abort(&mut self, err: &Error) {
73		self.stream.as_mut().unwrap().reset(err.to_code());
74	}
75
76	/// Wait for the stream to be closed, or the [Self::finish] to be acknowledged by the peer.
77	pub async fn closed(&mut self) -> Result<(), Error> {
78		self.stream
79			.as_mut()
80			.unwrap()
81			.closed()
82			.await
83			.map_err(|e| Error::Transport(Arc::new(e)))?;
84		Ok(())
85	}
86
87	/// Set the priority of the stream.
88	pub fn set_priority(&mut self, priority: u8) {
89		self.stream.as_mut().unwrap().set_priority(priority);
90	}
91
92	/// Cast the writer to a different version, used during version negotiation.
93	pub fn with_version<O>(mut self, version: O) -> Writer<S, O> {
94		Writer {
95			// We need to use an Option so Drop doesn't reset the stream.
96			stream: self.stream.take(),
97			buffer: std::mem::take(&mut self.buffer),
98			version,
99		}
100	}
101}
102
103impl<S: web_transport_trait::SendStream, V> Drop for Writer<S, V> {
104	fn drop(&mut self) {
105		if let Some(mut stream) = self.stream.take() {
106			// Unlike the Quinn default, we abort the stream on drop.
107			stream.reset(Error::Cancel.to_code());
108		}
109	}
110}