1use bytes::{Buf, BufMut, Bytes};
2use url::Url;
3
4pub use web_transport_quinn as quinn;
6
7pub use web_transport_quinn::CongestionControl;
8
9#[derive(Default, Clone)]
11pub struct ClientBuilder {
12 inner: quinn::ClientBuilder,
13}
14
15impl ClientBuilder {
16 pub fn new() -> Self {
17 Self::default()
18 }
19
20 pub fn with_congestion_control(self, cc: CongestionControl) -> Self {
22 Self {
23 inner: self.inner.with_congestion_control(cc),
24 }
25 }
26
27 pub fn with_server_certificate_hashes(self, hashes: Vec<Vec<u8>>) -> Result<Client, Error> {
29 Ok(Client {
30 inner: self.inner.with_server_certificate_hashes(hashes)?,
31 })
32 }
33
34 pub fn with_system_roots(self) -> Result<Client, Error> {
36 Ok(Client {
37 inner: self.inner.with_system_roots()?,
38 })
39 }
40}
41
42#[derive(Clone, Debug)]
44pub struct Client {
45 inner: quinn::Client,
46}
47
48impl Client {
49 pub async fn connect(&self, url: Url) -> Result<Session, Error> {
51 Ok(self.inner.connect(url).await?.into())
52 }
53}
54
55pub struct Server {
62 inner: quinn::Server,
63}
64
65impl From<quinn::Server> for Server {
66 fn from(server: quinn::Server) -> Self {
67 Self { inner: server }
68 }
69}
70
71impl Server {
72 pub async fn accept(&mut self) -> Result<Option<Session>, Error> {
74 match self.inner.accept().await {
75 Some(session) => Ok(Some(session.ok().await?.into())),
77 None => Ok(None),
78 }
79 }
80}
81
82#[derive(Clone, PartialEq, Eq)]
87pub struct Session {
88 inner: quinn::Session,
89}
90
91impl Session {
92 pub async fn accept_uni(&self) -> Result<RecvStream, Error> {
96 let stream = self.inner.accept_uni().await?;
97 Ok(RecvStream::new(stream))
98 }
99
100 pub async fn accept_bi(&self) -> Result<(SendStream, RecvStream), Error> {
102 let (s, r) = self.inner.accept_bi().await?;
103 Ok((SendStream::new(s), RecvStream::new(r)))
104 }
105
106 pub async fn open_bi(&self) -> Result<(SendStream, RecvStream), Error> {
108 Ok(self
109 .inner
110 .open_bi()
111 .await
112 .map(|(s, r)| (SendStream::new(s), RecvStream::new(r)))?)
113 }
114
115 pub async fn open_uni(&self) -> Result<SendStream, Error> {
117 Ok(self.inner.open_uni().await.map(SendStream::new)?)
118 }
119
120 pub async fn send_datagram(&self, payload: Bytes) -> Result<(), Error> {
130 Ok(self.inner.send_datagram(payload)?)
132 }
133
134 pub async fn max_datagram_size(&self) -> usize {
136 self.inner.max_datagram_size()
137 }
138
139 pub async fn recv_datagram(&self) -> Result<Bytes, Error> {
141 Ok(self.inner.read_datagram().await?)
142 }
143
144 pub fn close(&self, code: u32, reason: &str) {
146 self.inner.close(code, reason.as_bytes())
147 }
148
149 pub async fn closed(&self) -> Error {
151 self.inner.closed().await.into()
152 }
153
154 pub fn url(&self) -> &Url {
156 &self.inner.request().url
157 }
158
159 pub fn protocol(&self) -> Option<&str> {
161 self.inner.response().protocol.as_deref()
162 }
163}
164
165impl From<quinn::Session> for Session {
167 fn from(session: quinn::Session) -> Self {
168 Session { inner: session }
169 }
170}
171
172pub struct SendStream {
177 inner: quinn::SendStream,
178}
179
180impl SendStream {
181 fn new(inner: quinn::SendStream) -> Self {
182 Self { inner }
183 }
184
185 #[must_use = "returns the number of bytes written"]
187 pub async fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
188 self.inner.write(buf).await.map_err(Into::into)
189 }
190
191 pub async fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Result<usize, Error> {
193 let size = buf.chunk().len();
195 let chunk = buf.copy_to_bytes(size);
196 self.inner.write_chunk(chunk).await?;
197 Ok(size)
198 }
199
200 pub fn set_priority(&mut self, order: i32) {
204 self.inner.set_priority(order).ok();
205 }
206
207 pub fn reset(&mut self, code: u32) {
209 self.inner.reset(code).ok();
210 }
211
212 pub fn finish(&mut self) -> Result<(), Error> {
216 self.inner
217 .finish()
218 .map_err(|_| Error::Write(quinn::WriteError::ClosedStream))?;
219 Ok(())
220 }
221
222 pub async fn closed(&mut self) -> Result<Option<u8>, Error> {
227 match self.inner.stopped().await {
228 Ok(None) => Ok(None),
229 Ok(Some(code)) => Ok(Some(code as u8)),
230 Err(e) => Err(Error::Session(e)),
231 }
232 }
233}
234
235pub struct RecvStream {
240 inner: quinn::RecvStream,
241}
242
243impl RecvStream {
244 fn new(inner: quinn::RecvStream) -> Self {
245 Self { inner }
246 }
247
248 pub async fn read(&mut self, max: usize) -> Result<Option<Bytes>, Error> {
252 Ok(self
253 .inner
254 .read_chunk(max, true)
255 .await?
256 .map(|chunk| chunk.bytes))
257 }
258
259 pub async fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Result<Option<usize>, Error> {
264 let dst = buf.chunk_mut();
265 let dst = unsafe { &mut *(dst as *mut _ as *mut [u8]) };
266
267 let size = match self.inner.read(dst).await? {
268 Some(size) if size > 0 => size,
269 _ => return Ok(None),
270 };
271
272 unsafe { buf.advance_mut(size) };
273
274 Ok(Some(size))
275 }
276
277 pub fn stop(&mut self, code: u32) {
279 self.inner.stop(code).ok();
280 }
281
282 pub async fn closed(&mut self) -> Result<Option<u8>, Error> {
288 match self.inner.received_reset().await {
289 Ok(None) => Ok(None),
290 Ok(Some(code)) => Ok(Some(code as u8)),
291 Err(e) => Err(Error::Session(e)),
292 }
293 }
294}
295
296#[derive(Debug, thiserror::Error, Clone)]
301pub enum Error {
302 #[error("session error: {0}")]
303 Session(#[from] quinn::SessionError),
304
305 #[error("server error: {0}")]
306 Server(#[from] quinn::ServerError),
307
308 #[error("client error: {0}")]
309 Client(#[from] quinn::ClientError),
310
311 #[error("write error: {0}")]
312 Write(quinn::WriteError),
313
314 #[error("read error: {0}")]
315 Read(quinn::ReadError),
316}
317
318impl From<quinn::WriteError> for Error {
319 fn from(e: quinn::WriteError) -> Self {
320 match e {
321 quinn::WriteError::SessionError(e) => Error::Session(e),
322 e => Error::Write(e),
323 }
324 }
325}
326impl From<quinn::ReadError> for Error {
327 fn from(e: quinn::ReadError) -> Self {
328 match e {
329 quinn::ReadError::SessionError(e) => Error::Session(e),
330 e => Error::Read(e),
331 }
332 }
333}