moq_lite/coding/
writer.rs1use std::{fmt::Debug, sync::Arc};
2
3use crate::{coding::*, Error};
4
5pub struct Writer<S: web_transport_trait::SendStream, V> {
7 stream: Option<S>,
8 buffer: bytes::BytesMut,
9 version: V,
10}
11
12impl<S: web_transport_trait::SendStream, V> Writer<S, V> {
13 pub fn new(stream: S, version: V) -> Self {
14 Self {
15 stream: Some(stream),
16 buffer: Default::default(),
17 version,
18 }
19 }
20
21 pub async fn encode<T: Encode<V> + Debug>(&mut self, msg: &T) -> Result<(), Error>
22 where
23 V: Clone,
24 {
25 self.buffer.clear();
26 msg.encode(&mut self.buffer, self.version.clone());
27
28 while !self.buffer.is_empty() {
29 self.stream
30 .as_mut()
31 .unwrap()
32 .write_buf(&mut self.buffer)
33 .await
34 .map_err(|e| Error::Transport(Arc::new(e)))?;
35 }
36
37 Ok(())
38 }
39
40 async fn write<Buf: bytes::Buf + Send>(&mut self, buf: &mut Buf) -> Result<usize, Error> {
42 self.stream
43 .as_mut()
44 .unwrap()
45 .write_buf(buf)
46 .await
47 .map_err(|e| Error::Transport(Arc::new(e)))
48 }
49
50 pub async fn write_all<Buf: bytes::Buf + Send>(&mut self, buf: &mut Buf) -> Result<(), Error> {
52 while buf.has_remaining() {
53 self.write(buf).await?;
54 }
55 Ok(())
56 }
57
58 pub fn finish(&mut self) -> Result<(), Error> {
60 self.stream
61 .as_mut()
62 .unwrap()
63 .finish()
64 .map_err(|e| Error::Transport(Arc::new(e)))
65 }
66
67 pub fn abort(&mut self, err: &Error) {
68 self.stream.as_mut().unwrap().reset(err.to_code());
69 }
70
71 pub async fn closed(&mut self) -> Result<(), Error> {
72 self.stream
73 .as_mut()
74 .unwrap()
75 .closed()
76 .await
77 .map_err(|e| Error::Transport(Arc::new(e)))?;
78 Ok(())
79 }
80
81 pub fn set_priority(&mut self, priority: u8) {
82 self.stream.as_mut().unwrap().set_priority(priority);
83 }
84
85 pub fn with_version<O>(mut self, version: O) -> Writer<S, O> {
86 Writer {
87 stream: self.stream.take(),
89 buffer: std::mem::take(&mut self.buffer),
90 version,
91 }
92 }
93}
94
95impl<S: web_transport_trait::SendStream, V> Drop for Writer<S, V> {
96 fn drop(&mut self) {
97 if let Some(mut stream) = self.stream.take() {
98 stream.reset(Error::Cancel.to_code());
100 }
101 }
102}