moq_lite/coding/
writer.rs1use std::{fmt::Debug, sync::Arc};
2
3use crate::{Error, coding::*};
4
5pub struct Writer<S: web_transport_trait::SendStream, V> {
7 stream: Option<S>,
8 buffer: bytes::BytesMut,
9 version: V,
10}
11
12impl<S: web_transport_trait::SendStream, V> Writer<S, V> {
13 pub fn new(stream: S, version: V) -> Self {
15 Self {
16 stream: Some(stream),
17 buffer: Default::default(),
18 version,
19 }
20 }
21
22 pub async fn encode<T: Encode<V> + Debug>(&mut self, msg: &T) -> Result<(), Error>
24 where
25 V: Clone,
26 {
27 self.buffer.clear();
28 msg.encode(&mut self.buffer, self.version.clone());
29
30 while !self.buffer.is_empty() {
31 self.stream
32 .as_mut()
33 .unwrap()
34 .write_buf(&mut self.buffer)
35 .await
36 .map_err(|e| Error::Transport(Arc::new(e)))?;
37 }
38
39 Ok(())
40 }
41
42 async fn write<Buf: bytes::Buf + Send>(&mut self, buf: &mut Buf) -> Result<usize, Error> {
44 self.stream
45 .as_mut()
46 .unwrap()
47 .write_buf(buf)
48 .await
49 .map_err(|e| Error::Transport(Arc::new(e)))
50 }
51
52 pub async fn write_all<Buf: bytes::Buf + Send>(&mut self, buf: &mut Buf) -> Result<(), Error> {
56 while buf.has_remaining() {
57 self.write(buf).await?;
58 }
59 Ok(())
60 }
61
62 pub fn finish(&mut self) -> Result<(), Error> {
64 self.stream
65 .as_mut()
66 .unwrap()
67 .finish()
68 .map_err(|e| Error::Transport(Arc::new(e)))
69 }
70
71 pub fn abort(&mut self, err: &Error) {
73 self.stream.as_mut().unwrap().reset(err.to_code());
74 }
75
76 pub async fn closed(&mut self) -> Result<(), Error> {
78 self.stream
79 .as_mut()
80 .unwrap()
81 .closed()
82 .await
83 .map_err(|e| Error::Transport(Arc::new(e)))?;
84 Ok(())
85 }
86
87 pub fn set_priority(&mut self, priority: u8) {
89 self.stream.as_mut().unwrap().set_priority(priority);
90 }
91
92 pub fn with_version<O>(mut self, version: O) -> Writer<S, O> {
94 Writer {
95 stream: self.stream.take(),
97 buffer: std::mem::take(&mut self.buffer),
98 version,
99 }
100 }
101}
102
103impl<S: web_transport_trait::SendStream, V> Drop for Writer<S, V> {
104 fn drop(&mut self) {
105 if let Some(mut stream) = self.stream.take() {
106 stream.reset(Error::Cancel.to_code());
108 }
109 }
110}