1#![deny(missing_docs)]
16
17use std::{
18 convert::TryInto,
19 future::Future,
20 pin::Pin,
21 sync::Arc,
22 task::{self, Poll},
23};
24
25use bytes::{Buf, Bytes};
26use futures::{
27 Stream, StreamExt, ready,
28 stream::{self},
29};
30use h3::{
31 error::Code,
32 quic::{self, ConnectionErrorIncoming, StreamErrorIncoming, StreamId, WriteBuf},
33};
34pub use iroh::endpoint::{AcceptBi, AcceptUni, Endpoint, OpenBi, OpenUni, VarInt};
35use iroh::endpoint::{ConnectionError, ReadError, WriteError};
36use tokio_util::sync::ReusableBoxFuture;
37
38type BoxStreamSync<'a, T> = Pin<Box<dyn Stream<Item = T> + Sync + Send + 'a>>;
40
41pub struct Connection {
47 conn: iroh::endpoint::Connection,
48 incoming_bi: BoxStreamSync<'static, <AcceptBi<'static> as Future>::Output>,
49 opening_bi: Option<BoxStreamSync<'static, <OpenBi<'static> as Future>::Output>>,
50 incoming_uni: BoxStreamSync<'static, <AcceptUni<'static> as Future>::Output>,
51 opening_uni: Option<BoxStreamSync<'static, <OpenUni<'static> as Future>::Output>>,
52}
53
54impl Connection {
55 pub fn new(conn: iroh::endpoint::Connection) -> Self {
64 Self {
65 conn: conn.clone(),
66 incoming_bi: Box::pin(stream::unfold(conn.clone(), |conn| async {
67 Some((conn.accept_bi().await, conn))
68 })),
69 opening_bi: None,
70 incoming_uni: Box::pin(stream::unfold(conn.clone(), |conn| async {
71 Some((conn.accept_uni().await, conn))
72 })),
73 opening_uni: None,
74 }
75 }
76}
77
78impl<B> quic::Connection<B> for Connection
79where
80 B: Buf,
81{
82 type RecvStream = RecvStream;
83 type OpenStreams = OpenStreams;
84
85 fn poll_accept_bidi(
90 &mut self,
91 cx: &mut task::Context<'_>,
92 ) -> Poll<Result<Self::BidiStream, ConnectionErrorIncoming>> {
93 let (send, recv) = ready!(self.incoming_bi.poll_next_unpin(cx))
94 .expect("self.incoming_bi BoxStream never returns None")
95 .map_err(convert_connection_error)?;
96 Poll::Ready(Ok(Self::BidiStream {
97 send: Self::SendStream::new(send),
98 recv: Self::RecvStream::new(recv),
99 }))
100 }
101
102 fn poll_accept_recv(
106 &mut self,
107 cx: &mut task::Context<'_>,
108 ) -> Poll<Result<Self::RecvStream, ConnectionErrorIncoming>> {
109 let recv = ready!(self.incoming_uni.poll_next_unpin(cx))
110 .expect("self.incoming_uni BoxStream never returns None")
111 .map_err(convert_connection_error)?;
112 Poll::Ready(Ok(Self::RecvStream::new(recv)))
113 }
114
115 fn opener(&self) -> Self::OpenStreams {
117 OpenStreams {
118 conn: self.conn.clone(),
119 opening_bi: None,
120 opening_uni: None,
121 }
122 }
123}
124
125fn convert_connection_error(e: ConnectionError) -> h3::quic::ConnectionErrorIncoming {
127 match e {
128 ConnectionError::ApplicationClosed(application_close) => {
129 ConnectionErrorIncoming::ApplicationClose {
130 error_code: application_close.error_code.into(),
131 }
132 }
133 ConnectionError::TimedOut => ConnectionErrorIncoming::Timeout,
134 error @ ConnectionError::VersionMismatch
135 | error @ ConnectionError::Reset
136 | error @ ConnectionError::LocallyClosed
137 | error @ ConnectionError::CidsExhausted
138 | error @ ConnectionError::TransportError(_)
139 | error @ ConnectionError::ConnectionClosed(_) => {
140 ConnectionErrorIncoming::Undefined(Arc::new(error))
141 }
142 }
143}
144
145impl<B> quic::OpenStreams<B> for Connection
146where
147 B: Buf,
148{
149 type SendStream = SendStream<B>;
150 type BidiStream = BidiStream<B>;
151
152 fn poll_open_bidi(
156 &mut self,
157 cx: &mut task::Context<'_>,
158 ) -> Poll<Result<Self::BidiStream, StreamErrorIncoming>> {
159 let bi = self.opening_bi.get_or_insert_with(|| {
160 Box::pin(stream::unfold(self.conn.clone(), |conn| async {
161 Some((conn.open_bi().await, conn))
162 }))
163 });
164 let (send, recv) = ready!(bi.poll_next_unpin(cx))
165 .expect("BoxStream does not return None")
166 .map_err(|e| StreamErrorIncoming::ConnectionErrorIncoming {
167 connection_error: convert_connection_error(e),
168 })?;
169 Poll::Ready(Ok(Self::BidiStream {
170 send: Self::SendStream::new(send),
171 recv: RecvStream::new(recv),
172 }))
173 }
174
175 fn poll_open_send(
179 &mut self,
180 cx: &mut task::Context<'_>,
181 ) -> Poll<Result<Self::SendStream, StreamErrorIncoming>> {
182 let uni = self.opening_uni.get_or_insert_with(|| {
183 Box::pin(stream::unfold(self.conn.clone(), |conn| async {
184 Some((conn.open_uni().await, conn))
185 }))
186 });
187
188 let send = ready!(uni.poll_next_unpin(cx))
189 .expect("BoxStream does not return None")
190 .map_err(|e| StreamErrorIncoming::ConnectionErrorIncoming {
191 connection_error: convert_connection_error(e),
192 })?;
193 Poll::Ready(Ok(Self::SendStream::new(send)))
194 }
195
196 fn close(&mut self, code: Code, reason: &[u8]) {
198 self.conn.close(
199 VarInt::from_u64(code.value()).expect("error code VarInt"),
200 reason,
201 );
202 }
203}
204
205pub struct OpenStreams {
209 conn: iroh::endpoint::Connection,
210 opening_bi: Option<BoxStreamSync<'static, <OpenBi<'static> as Future>::Output>>,
211 opening_uni: Option<BoxStreamSync<'static, <OpenUni<'static> as Future>::Output>>,
212}
213
214impl<B> quic::OpenStreams<B> for OpenStreams
215where
216 B: Buf,
217{
218 type SendStream = SendStream<B>;
219 type BidiStream = BidiStream<B>;
220
221 fn poll_open_bidi(
225 &mut self,
226 cx: &mut task::Context<'_>,
227 ) -> Poll<Result<Self::BidiStream, StreamErrorIncoming>> {
228 let bi = self.opening_bi.get_or_insert_with(|| {
229 Box::pin(stream::unfold(self.conn.clone(), |conn| async {
230 Some((conn.open_bi().await, conn))
231 }))
232 });
233
234 let (send, recv) = ready!(bi.poll_next_unpin(cx))
235 .expect("BoxStream does not return None")
236 .map_err(|e| StreamErrorIncoming::ConnectionErrorIncoming {
237 connection_error: convert_connection_error(e),
238 })?;
239 Poll::Ready(Ok(Self::BidiStream {
240 send: Self::SendStream::new(send),
241 recv: RecvStream::new(recv),
242 }))
243 }
244
245 fn poll_open_send(
249 &mut self,
250 cx: &mut task::Context<'_>,
251 ) -> Poll<Result<Self::SendStream, StreamErrorIncoming>> {
252 let uni = self.opening_uni.get_or_insert_with(|| {
253 Box::pin(stream::unfold(self.conn.clone(), |conn| async {
254 Some((conn.open_uni().await, conn))
255 }))
256 });
257
258 let send = ready!(uni.poll_next_unpin(cx))
259 .expect("BoxStream does not return None")
260 .map_err(|e| StreamErrorIncoming::ConnectionErrorIncoming {
261 connection_error: convert_connection_error(e),
262 })?;
263 Poll::Ready(Ok(Self::SendStream::new(send)))
264 }
265
266 fn close(&mut self, code: Code, reason: &[u8]) {
268 self.conn.close(
269 VarInt::from_u64(code.value()).expect("error code VarInt"),
270 reason,
271 );
272 }
273}
274
275impl Clone for OpenStreams {
277 fn clone(&self) -> Self {
278 Self {
279 conn: self.conn.clone(),
280 opening_bi: None,
281 opening_uni: None,
282 }
283 }
284}
285
286pub struct BidiStream<B>
291where
292 B: Buf,
293{
294 send: SendStream<B>,
295 recv: RecvStream,
296}
297
298impl<B> quic::BidiStream<B> for BidiStream<B>
299where
300 B: Buf,
301{
302 type SendStream = SendStream<B>;
303 type RecvStream = RecvStream;
304
305 fn split(self) -> (Self::SendStream, Self::RecvStream) {
310 (self.send, self.recv)
311 }
312}
313
314impl<B: Buf> quic::RecvStream for BidiStream<B> {
315 type Buf = Bytes;
316
317 fn poll_data(
323 &mut self,
324 cx: &mut task::Context<'_>,
325 ) -> Poll<Result<Option<Self::Buf>, StreamErrorIncoming>> {
326 self.recv.poll_data(cx)
327 }
328
329 fn stop_sending(&mut self, error_code: u64) {
331 self.recv.stop_sending(error_code)
332 }
333
334 fn recv_id(&self) -> StreamId {
336 self.recv.recv_id()
337 }
338}
339
340impl<B> quic::SendStream<B> for BidiStream<B>
341where
342 B: Buf,
343{
344 fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), StreamErrorIncoming>> {
346 self.send.poll_ready(cx)
347 }
348
349 fn poll_finish(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), StreamErrorIncoming>> {
351 self.send.poll_finish(cx)
352 }
353
354 fn reset(&mut self, reset_code: u64) {
356 self.send.reset(reset_code)
357 }
358
359 fn send_data<D: Into<WriteBuf<B>>>(&mut self, data: D) -> Result<(), StreamErrorIncoming> {
361 self.send.send_data(data)
362 }
363
364 fn send_id(&self) -> StreamId {
366 self.send.send_id()
367 }
368}
369
370impl<B> quic::SendStreamUnframed<B> for BidiStream<B>
371where
372 B: Buf,
373{
374 fn poll_send<D: Buf>(
378 &mut self,
379 cx: &mut task::Context<'_>,
380 buf: &mut D,
381 ) -> Poll<Result<usize, StreamErrorIncoming>> {
382 self.send.poll_send(cx, buf)
383 }
384}
385
386pub struct RecvStream {
391 stream: Option<iroh::endpoint::RecvStream>,
392 read_chunk_fut: ReadChunkFuture,
393}
394
395type ReadChunkFuture = ReusableBoxFuture<
397 'static,
398 (
399 iroh::endpoint::RecvStream,
400 Result<Option<iroh::endpoint::Chunk>, iroh::endpoint::ReadError>,
401 ),
402>;
403
404impl RecvStream {
405 fn new(stream: iroh::endpoint::RecvStream) -> Self {
407 Self {
408 stream: Some(stream),
409 read_chunk_fut: ReusableBoxFuture::new(async { unreachable!() }),
410 }
411 }
412}
413
414impl quic::RecvStream for RecvStream {
415 type Buf = Bytes;
416
417 fn poll_data(
424 &mut self,
425 cx: &mut task::Context<'_>,
426 ) -> Poll<Result<Option<Self::Buf>, StreamErrorIncoming>> {
427 if let Some(mut stream) = self.stream.take() {
428 self.read_chunk_fut.set(async move {
429 let chunk = stream.read_chunk(usize::MAX).await;
430 (stream, chunk)
431 })
432 };
433
434 let (stream, chunk) = ready!(self.read_chunk_fut.poll(cx));
435 self.stream = Some(stream);
436 Poll::Ready(Ok(chunk
437 .map_err(convert_read_error_to_stream_error)?
438 .map(|c| c.bytes)))
439 }
440
441 fn stop_sending(&mut self, error_code: u64) {
443 self.stream
444 .as_mut()
445 .unwrap()
446 .stop(VarInt::from_u64(error_code).expect("invalid error_code"))
447 .ok();
448 }
449
450 fn recv_id(&self) -> StreamId {
452 let num: u64 = self.stream.as_ref().unwrap().id().into();
453 num.try_into().expect("invalid stream id")
454 }
455}
456
457fn convert_read_error_to_stream_error(error: ReadError) -> StreamErrorIncoming {
459 match error {
460 ReadError::Reset(var_int) => StreamErrorIncoming::StreamTerminated {
461 error_code: var_int.into_inner(),
462 },
463 ReadError::ConnectionLost(connection_error) => {
464 StreamErrorIncoming::ConnectionErrorIncoming {
465 connection_error: convert_connection_error(connection_error),
466 }
467 }
468 error @ ReadError::ClosedStream => StreamErrorIncoming::Unknown(Box::new(error)),
469 error @ ReadError::ZeroRttRejected => StreamErrorIncoming::Unknown(Box::new(error)),
470 }
471}
472
473fn convert_write_error_to_stream_error(error: WriteError) -> StreamErrorIncoming {
475 match error {
476 WriteError::Stopped(var_int) => StreamErrorIncoming::StreamTerminated {
477 error_code: var_int.into_inner(),
478 },
479 WriteError::ConnectionLost(connection_error) => {
480 StreamErrorIncoming::ConnectionErrorIncoming {
481 connection_error: convert_connection_error(connection_error),
482 }
483 }
484 error @ WriteError::ClosedStream | error @ WriteError::ZeroRttRejected => {
485 StreamErrorIncoming::Unknown(Box::new(error))
486 }
487 }
488}
489
490pub struct SendStream<B: Buf> {
495 stream: iroh::endpoint::SendStream,
496 writing: Option<WriteBuf<B>>,
497}
498
499impl<B> SendStream<B>
500where
501 B: Buf,
502{
503 fn new(stream: iroh::endpoint::SendStream) -> SendStream<B> {
505 Self {
506 stream,
507 writing: None,
508 }
509 }
510}
511
512impl<B> quic::SendStream<B> for SendStream<B>
513where
514 B: Buf,
515{
516 fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), StreamErrorIncoming>> {
520 if let Some(ref mut data) = self.writing {
521 while data.has_remaining() {
522 let stream = Pin::new(&mut self.stream);
523 let written = ready!(stream.poll_write(cx, data.chunk()))
524 .map_err(convert_write_error_to_stream_error)?;
525 data.advance(written);
526 }
527 }
528 self.writing = None;
529 Poll::Ready(Ok(()))
530 }
531
532 fn poll_finish(
534 &mut self,
535 _cx: &mut task::Context<'_>,
536 ) -> Poll<Result<(), StreamErrorIncoming>> {
537 Poll::Ready(
538 self.stream
539 .finish()
540 .map_err(|e| StreamErrorIncoming::Unknown(Box::new(e))),
541 )
542 }
543
544 fn reset(&mut self, reset_code: u64) {
546 let _ = self
547 .stream
548 .reset(VarInt::from_u64(reset_code).unwrap_or(VarInt::MAX));
549 }
550
551 fn send_data<D: Into<WriteBuf<B>>>(&mut self, data: D) -> Result<(), StreamErrorIncoming> {
555 if self.writing.is_some() {
556 return Err(StreamErrorIncoming::ConnectionErrorIncoming {
557 connection_error: ConnectionErrorIncoming::InternalError(
558 "internal error in the http stack".to_string(),
559 ),
560 });
561 }
562 self.writing = Some(data.into());
563 Ok(())
564 }
565
566 fn send_id(&self) -> StreamId {
568 let num: u64 = self.stream.id().into();
569 num.try_into().expect("invalid stream id")
570 }
571}
572
573impl<B> quic::SendStreamUnframed<B> for SendStream<B>
574where
575 B: Buf,
576{
577 fn poll_send<D: Buf>(
581 &mut self,
582 cx: &mut task::Context<'_>,
583 buf: &mut D,
584 ) -> Poll<Result<usize, StreamErrorIncoming>> {
585 if self.writing.is_some() {
586 panic!("poll_send called while send stream is not ready");
587 }
588
589 let s = Pin::new(&mut self.stream);
590
591 let res = ready!(s.poll_write(cx, buf.chunk()));
592 match res {
593 Ok(written) => {
594 buf.advance(written);
595 Poll::Ready(Ok(written))
596 }
597 Err(err) => Poll::Ready(Err(convert_write_error_to_stream_error(err))),
598 }
599 }
600}