quic_reverse_transport/
quinn_adapter.rs1use crate::{Connection, RecvStream, SendStream};
21use std::io;
22use std::pin::Pin;
23use std::sync::Arc;
24use std::task::{Context, Poll};
25use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
26
27#[derive(Clone)]
29pub struct QuinnConnection {
30 inner: quinn::Connection,
31}
32
33impl QuinnConnection {
34 #[must_use]
36 pub const fn new(connection: quinn::Connection) -> Self {
37 Self { inner: connection }
38 }
39
40 #[must_use]
42 pub const fn inner(&self) -> &quinn::Connection {
43 &self.inner
44 }
45
46 #[must_use]
48 pub fn into_inner(self) -> quinn::Connection {
49 self.inner
50 }
51}
52
53impl From<quinn::Connection> for QuinnConnection {
54 fn from(connection: quinn::Connection) -> Self {
55 Self::new(connection)
56 }
57}
58
59impl Connection for QuinnConnection {
60 type SendStream = QuinnSendStream;
61 type RecvStream = QuinnRecvStream;
62 type OpenError = quinn::ConnectionError;
63 type AcceptError = quinn::ConnectionError;
64
65 async fn open_bi(&self) -> Result<(Self::SendStream, Self::RecvStream), Self::OpenError> {
66 let (send, recv) = self.inner.open_bi().await?;
67 Ok((QuinnSendStream::new(send), QuinnRecvStream::new(recv)))
68 }
69
70 async fn accept_bi(
71 &self,
72 ) -> Result<Option<(Self::SendStream, Self::RecvStream)>, Self::AcceptError> {
73 match self.inner.accept_bi().await {
74 Ok((send, recv)) => Ok(Some((
75 QuinnSendStream::new(send),
76 QuinnRecvStream::new(recv),
77 ))),
78 Err(
79 quinn::ConnectionError::ApplicationClosed(_)
80 | quinn::ConnectionError::LocallyClosed,
81 ) => Ok(None),
82 Err(e) => Err(e),
83 }
84 }
85
86 fn close(&self, code: u32, reason: &[u8]) {
87 self.inner.close(quinn::VarInt::from_u32(code), reason);
88 }
89
90 fn is_open(&self) -> bool {
91 self.inner.close_reason().is_none()
92 }
93}
94
95pub struct QuinnSendStream {
97 inner: quinn::SendStream,
98}
99
100impl QuinnSendStream {
101 #[must_use]
103 pub const fn new(stream: quinn::SendStream) -> Self {
104 Self { inner: stream }
105 }
106
107 #[must_use]
109 pub const fn inner(&self) -> &quinn::SendStream {
110 &self.inner
111 }
112
113 pub fn inner_mut(&mut self) -> &mut quinn::SendStream {
115 &mut self.inner
116 }
117}
118
119impl AsyncWrite for QuinnSendStream {
120 fn poll_write(
121 mut self: Pin<&mut Self>,
122 cx: &mut Context<'_>,
123 buf: &[u8],
124 ) -> Poll<io::Result<usize>> {
125 <quinn::SendStream as AsyncWrite>::poll_write(Pin::new(&mut self.inner), cx, buf)
127 }
128
129 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
130 <quinn::SendStream as AsyncWrite>::poll_flush(Pin::new(&mut self.inner), cx)
131 }
132
133 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
134 <quinn::SendStream as AsyncWrite>::poll_shutdown(Pin::new(&mut self.inner), cx)
135 }
136}
137
138impl SendStream for QuinnSendStream {
139 type FinishError = quinn::ClosedStream;
140
141 async fn finish(&mut self) -> Result<(), Self::FinishError> {
142 self.inner.finish()
143 }
144
145 fn reset(&mut self, code: u32) {
146 let _ = self.inner.reset(quinn::VarInt::from_u32(code));
147 }
148}
149
150pub struct QuinnRecvStream {
152 inner: quinn::RecvStream,
153}
154
155impl QuinnRecvStream {
156 #[must_use]
158 pub const fn new(stream: quinn::RecvStream) -> Self {
159 Self { inner: stream }
160 }
161
162 #[must_use]
164 pub const fn inner(&self) -> &quinn::RecvStream {
165 &self.inner
166 }
167
168 pub fn inner_mut(&mut self) -> &mut quinn::RecvStream {
170 &mut self.inner
171 }
172}
173
174impl AsyncRead for QuinnRecvStream {
175 fn poll_read(
176 mut self: Pin<&mut Self>,
177 cx: &mut Context<'_>,
178 buf: &mut ReadBuf<'_>,
179 ) -> Poll<io::Result<()>> {
180 <quinn::RecvStream as AsyncRead>::poll_read(Pin::new(&mut self.inner), cx, buf)
181 }
182}
183
184impl RecvStream for QuinnRecvStream {
185 fn stop(&mut self, code: u32) {
186 let _ = self.inner.stop(quinn::VarInt::from_u32(code));
187 }
188}
189
190#[derive(Clone)]
195pub struct SharedQuinnConnection {
196 inner: Arc<QuinnConnection>,
197}
198
199impl SharedQuinnConnection {
200 #[must_use]
202 pub fn new(connection: quinn::Connection) -> Self {
203 Self {
204 inner: Arc::new(QuinnConnection::new(connection)),
205 }
206 }
207}
208
209impl Connection for SharedQuinnConnection {
210 type SendStream = QuinnSendStream;
211 type RecvStream = QuinnRecvStream;
212 type OpenError = quinn::ConnectionError;
213 type AcceptError = quinn::ConnectionError;
214
215 async fn open_bi(&self) -> Result<(Self::SendStream, Self::RecvStream), Self::OpenError> {
216 self.inner.open_bi().await
217 }
218
219 async fn accept_bi(
220 &self,
221 ) -> Result<Option<(Self::SendStream, Self::RecvStream)>, Self::AcceptError> {
222 self.inner.accept_bi().await
223 }
224
225 fn close(&self, code: u32, reason: &[u8]) {
226 self.inner.close(code, reason);
227 }
228
229 fn is_open(&self) -> bool {
230 self.inner.is_open()
231 }
232}
233
#[cfg(test)]
mod tests {
    use super::*;

    /// Compiles only when `T` is both `Send` and `Sync`.
    fn require_send_sync<T: Send + Sync>() {}

    /// Compiles only when `T` is `Send`.
    fn require_send<T: Send>() {}

    // These tests are compile-time checks: they pass as soon as the bounds hold.

    #[test]
    fn quinn_connection_is_send_sync() {
        require_send_sync::<QuinnConnection>();
        require_send_sync::<SharedQuinnConnection>();
    }

    #[test]
    fn quinn_streams_are_send() {
        require_send::<QuinnSendStream>();
        require_send::<QuinnRecvStream>();
    }
}