web_transport_quinn/
send.rs1use std::{
2 io,
3 pin::Pin,
4 task::{Context, Poll},
5};
6
7use bytes::Bytes;
8
9use crate::{ClosedStream, SessionError, WriteError};
10
11#[derive(Debug)]
16pub struct SendStream {
17 stream: quinn::SendStream,
18}
19
20impl SendStream {
21 pub(crate) fn new(stream: quinn::SendStream) -> Self {
22 Self { stream }
23 }
24
25 pub fn reset(&mut self, code: u32) -> Result<(), ClosedStream> {
28 let code = web_transport_proto::error_to_http3(code);
29 let code = quinn::VarInt::try_from(code).unwrap();
30 self.stream.reset(code).map_err(Into::into)
31 }
32
33 pub async fn stopped(&mut self) -> Result<Option<u32>, SessionError> {
38 match self.stream.stopped().await {
39 Ok(Some(code)) => Ok(web_transport_proto::error_from_http3(code.into_inner())),
40 Ok(None) => Ok(None),
41 Err(quinn::StoppedError::ConnectionLost(e)) => Err(e.into()),
42 Err(quinn::StoppedError::ZeroRttRejected) => unreachable!("0-RTT not supported"),
43 }
44 }
45
46 pub async fn write(&mut self, buf: &[u8]) -> Result<usize, WriteError> {
50 self.stream.write(buf).await.map_err(Into::into)
51 }
52
53 pub async fn write_all(&mut self, buf: &[u8]) -> Result<(), WriteError> {
55 self.stream.write_all(buf).await.map_err(Into::into)
56 }
57
58 pub async fn write_chunks(
60 &mut self,
61 bufs: &mut [Bytes],
62 ) -> Result<quinn_proto::Written, WriteError> {
63 self.stream.write_chunks(bufs).await.map_err(Into::into)
64 }
65
66 pub async fn write_chunk(&mut self, buf: Bytes) -> Result<(), WriteError> {
68 self.stream.write_chunk(buf).await.map_err(Into::into)
69 }
70
71 pub async fn write_all_chunks(&mut self, bufs: &mut [Bytes]) -> Result<(), WriteError> {
73 self.stream.write_all_chunks(bufs).await.map_err(Into::into)
74 }
75
76 pub fn finish(&mut self) -> Result<(), ClosedStream> {
78 self.stream.finish().map_err(Into::into)
79 }
80
81 pub fn set_priority(&self, order: i32) -> Result<(), ClosedStream> {
82 self.stream.set_priority(order).map_err(Into::into)
83 }
84
85 pub fn priority(&self) -> Result<i32, ClosedStream> {
86 self.stream.priority().map_err(Into::into)
87 }
88}
89
90impl tokio::io::AsyncWrite for SendStream {
91 fn poll_write(
92 mut self: Pin<&mut Self>,
93 cx: &mut Context<'_>,
94 buf: &[u8],
95 ) -> Poll<io::Result<usize>> {
96 tokio::io::AsyncWrite::poll_write(Pin::new(&mut self.stream), cx, buf)
98 }
99
100 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
101 Pin::new(&mut self.stream).poll_flush(cx)
102 }
103
104 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
105 Pin::new(&mut self.stream).poll_shutdown(cx)
106 }
107}