Skip to main content

iroh_h3/
lib.rs

1//! # iroh-h3: HTTP/3 support over iroh P2P connections
2//!
3//! `iroh-h3` provides low-level integration for running HTTP/3 over
4//! [`iroh`](https://docs.rs/iroh) peer-to-peer QUIC connections.
5//! It implements the traits required by the `h3` crate on top of `iroh`
6//! connections and streams. This crate is intended for internal use in building
7//! HTTP/3 over P2P layers.
8//!
9//! # License
10//!
11//! This crate is MIT licensed. Portions of the code are derived from
12//! [`hyperium/h3`](https://github.com/hyperium/h3) and are reproduced under
13//! the original MIT license terms.
14
15#![deny(missing_docs)]
16
17use std::{
18    convert::TryInto,
19    future::Future,
20    pin::Pin,
21    sync::Arc,
22    task::{self, Poll},
23};
24
25use bytes::{Buf, Bytes};
26use futures::{
27    Stream, StreamExt, ready,
28    stream::{self},
29};
30use h3::{
31    error::Code,
32    quic::{self, ConnectionErrorIncoming, StreamErrorIncoming, StreamId, WriteBuf},
33};
34pub use iroh::endpoint::{AcceptBi, AcceptUni, Endpoint, OpenBi, OpenUni, VarInt};
35use iroh::endpoint::{ConnectionError, ReadError, WriteError};
36use tokio_util::sync::ReusableBoxFuture;
37
38/// BoxStream type alias with `Sync` and `Send` requirements.
39type BoxStreamSync<'a, T> = Pin<Box<dyn Stream<Item = T> + Sync + Send + 'a>>;
40
41/// A wrapper around an [`iroh::endpoint::Connection`] that implements the
42/// [`h3::quic::Connection`] trait for use in HTTP/3 over QUIC.
43///
44/// This struct manages incoming and outgoing unidirectional and bidirectional
45/// streams and handles conversions between `iroh` and `h3` errors.
46pub struct Connection {
47    conn: iroh::endpoint::Connection,
48    incoming_bi: BoxStreamSync<'static, <AcceptBi<'static> as Future>::Output>,
49    opening_bi: Option<BoxStreamSync<'static, <OpenBi<'static> as Future>::Output>>,
50    incoming_uni: BoxStreamSync<'static, <AcceptUni<'static> as Future>::Output>,
51    opening_uni: Option<BoxStreamSync<'static, <OpenUni<'static> as Future>::Output>>,
52}
53
54impl Connection {
55    /// Creates a new [`Connection`] from an existing [`iroh::endpoint::Connection`].
56    ///
57    /// This sets up async streams for accepting incoming unidirectional and
58    /// bidirectional QUIC streams.
59    ///
60    /// # Arguments
61    ///
62    /// * `conn` - The underlying `iroh` connection to wrap.
63    pub fn new(conn: iroh::endpoint::Connection) -> Self {
64        Self {
65            conn: conn.clone(),
66            incoming_bi: Box::pin(stream::unfold(conn.clone(), |conn| async {
67                Some((conn.accept_bi().await, conn))
68            })),
69            opening_bi: None,
70            incoming_uni: Box::pin(stream::unfold(conn.clone(), |conn| async {
71                Some((conn.accept_uni().await, conn))
72            })),
73            opening_uni: None,
74        }
75    }
76}
77
78impl<B> quic::Connection<B> for Connection
79where
80    B: Buf,
81{
82    type RecvStream = RecvStream;
83    type OpenStreams = OpenStreams;
84
85    /// Polls for an incoming bidirectional stream (accepts a stream).
86    ///
87    /// Returns a pair of [`SendStream`] and [`RecvStream`] wrapped in
88    /// [`BidiStream`].
89    fn poll_accept_bidi(
90        &mut self,
91        cx: &mut task::Context<'_>,
92    ) -> Poll<Result<Self::BidiStream, ConnectionErrorIncoming>> {
93        let (send, recv) = ready!(self.incoming_bi.poll_next_unpin(cx))
94            .expect("self.incoming_bi BoxStream never returns None")
95            .map_err(convert_connection_error)?;
96        Poll::Ready(Ok(Self::BidiStream {
97            send: Self::SendStream::new(send),
98            recv: Self::RecvStream::new(recv),
99        }))
100    }
101
102    /// Polls for an incoming unidirectional receive stream.
103    ///
104    /// Returns a [`RecvStream`] once available.
105    fn poll_accept_recv(
106        &mut self,
107        cx: &mut task::Context<'_>,
108    ) -> Poll<Result<Self::RecvStream, ConnectionErrorIncoming>> {
109        let recv = ready!(self.incoming_uni.poll_next_unpin(cx))
110            .expect("self.incoming_uni BoxStream never returns None")
111            .map_err(convert_connection_error)?;
112        Poll::Ready(Ok(Self::RecvStream::new(recv)))
113    }
114
115    /// Returns a new [`OpenStreams`] handle for opening outgoing streams.
116    fn opener(&self) -> Self::OpenStreams {
117        OpenStreams {
118            conn: self.conn.clone(),
119            opening_bi: None,
120            opening_uni: None,
121        }
122    }
123}
124
125/// Converts an [`iroh::endpoint::ConnectionError`] to an [`h3::quic::ConnectionErrorIncoming`].
126fn convert_connection_error(e: ConnectionError) -> h3::quic::ConnectionErrorIncoming {
127    match e {
128        ConnectionError::ApplicationClosed(application_close) => {
129            ConnectionErrorIncoming::ApplicationClose {
130                error_code: application_close.error_code.into(),
131            }
132        }
133        ConnectionError::TimedOut => ConnectionErrorIncoming::Timeout,
134        error @ ConnectionError::VersionMismatch
135        | error @ ConnectionError::Reset
136        | error @ ConnectionError::LocallyClosed
137        | error @ ConnectionError::CidsExhausted
138        | error @ ConnectionError::TransportError(_)
139        | error @ ConnectionError::ConnectionClosed(_) => {
140            ConnectionErrorIncoming::Undefined(Arc::new(error))
141        }
142    }
143}
144
145impl<B> quic::OpenStreams<B> for Connection
146where
147    B: Buf,
148{
149    type SendStream = SendStream<B>;
150    type BidiStream = BidiStream<B>;
151
152    /// Attempts to open a new bidirectional stream for sending and receiving.
153    ///
154    /// Returns a [`BidiStream`] once ready, or a [`StreamErrorIncoming`] on failure.
155    fn poll_open_bidi(
156        &mut self,
157        cx: &mut task::Context<'_>,
158    ) -> Poll<Result<Self::BidiStream, StreamErrorIncoming>> {
159        let bi = self.opening_bi.get_or_insert_with(|| {
160            Box::pin(stream::unfold(self.conn.clone(), |conn| async {
161                Some((conn.open_bi().await, conn))
162            }))
163        });
164        let (send, recv) = ready!(bi.poll_next_unpin(cx))
165            .expect("BoxStream does not return None")
166            .map_err(|e| StreamErrorIncoming::ConnectionErrorIncoming {
167                connection_error: convert_connection_error(e),
168            })?;
169        Poll::Ready(Ok(Self::BidiStream {
170            send: Self::SendStream::new(send),
171            recv: RecvStream::new(recv),
172        }))
173    }
174
175    /// Attempts to open a new unidirectional send stream.
176    ///
177    /// Returns a [`SendStream`] once ready.
178    fn poll_open_send(
179        &mut self,
180        cx: &mut task::Context<'_>,
181    ) -> Poll<Result<Self::SendStream, StreamErrorIncoming>> {
182        let uni = self.opening_uni.get_or_insert_with(|| {
183            Box::pin(stream::unfold(self.conn.clone(), |conn| async {
184                Some((conn.open_uni().await, conn))
185            }))
186        });
187
188        let send = ready!(uni.poll_next_unpin(cx))
189            .expect("BoxStream does not return None")
190            .map_err(|e| StreamErrorIncoming::ConnectionErrorIncoming {
191                connection_error: convert_connection_error(e),
192            })?;
193        Poll::Ready(Ok(Self::SendStream::new(send)))
194    }
195
196    /// Closes the QUIC connection with the provided application error code and reason.
197    fn close(&mut self, code: Code, reason: &[u8]) {
198        self.conn.close(
199            VarInt::from_u64(code.value()).expect("error code VarInt"),
200            reason,
201        );
202    }
203}
204
205/// A handle for opening outgoing QUIC streams.
206///
207/// Implements [`h3::quic::OpenStreams`] for use with HTTP/3.
208pub struct OpenStreams {
209    conn: iroh::endpoint::Connection,
210    opening_bi: Option<BoxStreamSync<'static, <OpenBi<'static> as Future>::Output>>,
211    opening_uni: Option<BoxStreamSync<'static, <OpenUni<'static> as Future>::Output>>,
212}
213
214impl<B> quic::OpenStreams<B> for OpenStreams
215where
216    B: Buf,
217{
218    type SendStream = SendStream<B>;
219    type BidiStream = BidiStream<B>;
220
221    /// Polls for opening a new bidirectional stream.
222    ///
223    /// Returns a [`BidiStream`] on success.
224    fn poll_open_bidi(
225        &mut self,
226        cx: &mut task::Context<'_>,
227    ) -> Poll<Result<Self::BidiStream, StreamErrorIncoming>> {
228        let bi = self.opening_bi.get_or_insert_with(|| {
229            Box::pin(stream::unfold(self.conn.clone(), |conn| async {
230                Some((conn.open_bi().await, conn))
231            }))
232        });
233
234        let (send, recv) = ready!(bi.poll_next_unpin(cx))
235            .expect("BoxStream does not return None")
236            .map_err(|e| StreamErrorIncoming::ConnectionErrorIncoming {
237                connection_error: convert_connection_error(e),
238            })?;
239        Poll::Ready(Ok(Self::BidiStream {
240            send: Self::SendStream::new(send),
241            recv: RecvStream::new(recv),
242        }))
243    }
244
245    /// Polls for opening a new unidirectional send stream.
246    ///
247    /// Returns a [`SendStream`] on success.
248    fn poll_open_send(
249        &mut self,
250        cx: &mut task::Context<'_>,
251    ) -> Poll<Result<Self::SendStream, StreamErrorIncoming>> {
252        let uni = self.opening_uni.get_or_insert_with(|| {
253            Box::pin(stream::unfold(self.conn.clone(), |conn| async {
254                Some((conn.open_uni().await, conn))
255            }))
256        });
257
258        let send = ready!(uni.poll_next_unpin(cx))
259            .expect("BoxStream does not return None")
260            .map_err(|e| StreamErrorIncoming::ConnectionErrorIncoming {
261                connection_error: convert_connection_error(e),
262            })?;
263        Poll::Ready(Ok(Self::SendStream::new(send)))
264    }
265
266    /// Closes the underlying connection with the given error code and reason.
267    fn close(&mut self, code: Code, reason: &[u8]) {
268        self.conn.close(
269            VarInt::from_u64(code.value()).expect("error code VarInt"),
270            reason,
271        );
272    }
273}
274
275/// Implements [`Clone`] for [`OpenStreams`].
276impl Clone for OpenStreams {
277    fn clone(&self) -> Self {
278        Self {
279            conn: self.conn.clone(),
280            opening_bi: None,
281            opening_uni: None,
282        }
283    }
284}
285
286/// A bidirectional QUIC stream that contains both send and receive halves.
287///
288/// This struct implements both [`h3::quic::BidiStream`], [`h3::quic::RecvStream`],
289/// and [`h3::quic::SendStream`] traits, allowing it to be split or used directly.
290pub struct BidiStream<B>
291where
292    B: Buf,
293{
294    send: SendStream<B>,
295    recv: RecvStream,
296}
297
298impl<B> quic::BidiStream<B> for BidiStream<B>
299where
300    B: Buf,
301{
302    type SendStream = SendStream<B>;
303    type RecvStream = RecvStream;
304
305    /// Splits the bidirectional stream into its send and receive halves.
306    ///
307    /// # Returns
308    /// A tuple of `(SendStream, RecvStream)`.
309    fn split(self) -> (Self::SendStream, Self::RecvStream) {
310        (self.send, self.recv)
311    }
312}
313
314impl<B: Buf> quic::RecvStream for BidiStream<B> {
315    type Buf = Bytes;
316
317    /// Polls for incoming data on the receive side of the stream.
318    ///
319    /// Returns `Poll::Ready(Ok(Some(Bytes)))` when data is available,
320    /// `Poll::Ready(Ok(None))` when the stream is finished,
321    /// or `Poll::Ready(Err(StreamErrorIncoming))` on error.
322    fn poll_data(
323        &mut self,
324        cx: &mut task::Context<'_>,
325    ) -> Poll<Result<Option<Self::Buf>, StreamErrorIncoming>> {
326        self.recv.poll_data(cx)
327    }
328
329    /// Informs the peer that the receiver is no longer interested in this stream.
330    fn stop_sending(&mut self, error_code: u64) {
331        self.recv.stop_sending(error_code)
332    }
333
334    /// Returns the QUIC stream ID for this receiving stream.
335    fn recv_id(&self) -> StreamId {
336        self.recv.recv_id()
337    }
338}
339
340impl<B> quic::SendStream<B> for BidiStream<B>
341where
342    B: Buf,
343{
344    /// Polls for readiness to send data on the stream.
345    fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), StreamErrorIncoming>> {
346        self.send.poll_ready(cx)
347    }
348
349    /// Polls for completion of the stream’s send side (finishing transmission).
350    fn poll_finish(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), StreamErrorIncoming>> {
351        self.send.poll_finish(cx)
352    }
353
354    /// Resets the send side of the stream with an error code.
355    fn reset(&mut self, reset_code: u64) {
356        self.send.reset(reset_code)
357    }
358
359    /// Queues a buffer of data to be sent on the stream.
360    fn send_data<D: Into<WriteBuf<B>>>(&mut self, data: D) -> Result<(), StreamErrorIncoming> {
361        self.send.send_data(data)
362    }
363
364    /// Returns the QUIC stream ID for this sending stream.
365    fn send_id(&self) -> StreamId {
366        self.send.send_id()
367    }
368}
369
370impl<B> quic::SendStreamUnframed<B> for BidiStream<B>
371where
372    B: Buf,
373{
374    /// Polls to send raw unframed data from the provided buffer.
375    ///
376    /// This variant writes directly from the buffer without framing.
377    fn poll_send<D: Buf>(
378        &mut self,
379        cx: &mut task::Context<'_>,
380        buf: &mut D,
381    ) -> Poll<Result<usize, StreamErrorIncoming>> {
382        self.send.poll_send(cx, buf)
383    }
384}
385
386/// A receiving QUIC stream that reads ordered chunks of data.
387///
388/// Internally wraps an [`iroh::endpoint::RecvStream`] and manages
389/// reusable futures for efficient reading.
390pub struct RecvStream {
391    stream: Option<iroh::endpoint::RecvStream>,
392    read_chunk_fut: ReadChunkFuture,
393}
394
395/// Type alias for a reusable boxed future that reads the next chunk.
396type ReadChunkFuture = ReusableBoxFuture<
397    'static,
398    (
399        iroh::endpoint::RecvStream,
400        Result<Option<iroh::endpoint::Chunk>, iroh::endpoint::ReadError>,
401    ),
402>;
403
404impl RecvStream {
405    /// Creates a new [`RecvStream`] from an [`iroh::endpoint::RecvStream`].
406    fn new(stream: iroh::endpoint::RecvStream) -> Self {
407        Self {
408            stream: Some(stream),
409            read_chunk_fut: ReusableBoxFuture::new(async { unreachable!() }),
410        }
411    }
412}
413
414impl quic::RecvStream for RecvStream {
415    type Buf = Bytes;
416
417    /// Polls for the next chunk of received data.
418    ///
419    /// Returns:
420    /// * `Poll::Ready(Ok(Some(Bytes)))` — when data is available.
421    /// * `Poll::Ready(Ok(None))` — when the stream has finished.
422    /// * `Poll::Ready(Err(StreamErrorIncoming))` — when an error occurs.
423    fn poll_data(
424        &mut self,
425        cx: &mut task::Context<'_>,
426    ) -> Poll<Result<Option<Self::Buf>, StreamErrorIncoming>> {
427        if let Some(mut stream) = self.stream.take() {
428            self.read_chunk_fut.set(async move {
429                let chunk = stream.read_chunk(usize::MAX).await;
430                (stream, chunk)
431            })
432        };
433
434        let (stream, chunk) = ready!(self.read_chunk_fut.poll(cx));
435        self.stream = Some(stream);
436        Poll::Ready(Ok(chunk
437            .map_err(convert_read_error_to_stream_error)?
438            .map(|c| c.bytes)))
439    }
440
441    /// Cancels further reception on this stream with the given error code.
442    fn stop_sending(&mut self, error_code: u64) {
443        self.stream
444            .as_mut()
445            .unwrap()
446            .stop(VarInt::from_u64(error_code).expect("invalid error_code"))
447            .ok();
448    }
449
450    /// Returns the QUIC stream ID associated with this receive stream.
451    fn recv_id(&self) -> StreamId {
452        let num: u64 = self.stream.as_ref().unwrap().id().into();
453        num.try_into().expect("invalid stream id")
454    }
455}
456
457/// Converts an [`iroh::endpoint::ReadError`] into an [`h3::quic::StreamErrorIncoming`].
458fn convert_read_error_to_stream_error(error: ReadError) -> StreamErrorIncoming {
459    match error {
460        ReadError::Reset(var_int) => StreamErrorIncoming::StreamTerminated {
461            error_code: var_int.into_inner(),
462        },
463        ReadError::ConnectionLost(connection_error) => {
464            StreamErrorIncoming::ConnectionErrorIncoming {
465                connection_error: convert_connection_error(connection_error),
466            }
467        }
468        error @ ReadError::ClosedStream => StreamErrorIncoming::Unknown(Box::new(error)),
469        error @ ReadError::ZeroRttRejected => StreamErrorIncoming::Unknown(Box::new(error)),
470    }
471}
472
473/// Converts an [`iroh::endpoint::WriteError`] into an [`h3::quic::StreamErrorIncoming`].
474fn convert_write_error_to_stream_error(error: WriteError) -> StreamErrorIncoming {
475    match error {
476        WriteError::Stopped(var_int) => StreamErrorIncoming::StreamTerminated {
477            error_code: var_int.into_inner(),
478        },
479        WriteError::ConnectionLost(connection_error) => {
480            StreamErrorIncoming::ConnectionErrorIncoming {
481                connection_error: convert_connection_error(connection_error),
482            }
483        }
484        error @ WriteError::ClosedStream | error @ WriteError::ZeroRttRejected => {
485            StreamErrorIncoming::Unknown(Box::new(error))
486        }
487    }
488}
489
490/// A sending QUIC stream that transmits buffered data.
491///
492/// This struct wraps an [`iroh::endpoint::SendStream`] and implements the
493/// [`h3::quic::SendStream`] and [`h3::quic::SendStreamUnframed`] traits.
494pub struct SendStream<B: Buf> {
495    stream: iroh::endpoint::SendStream,
496    writing: Option<WriteBuf<B>>,
497}
498
499impl<B> SendStream<B>
500where
501    B: Buf,
502{
503    /// Creates a new [`SendStream`] from an [`iroh::endpoint::SendStream`].
504    fn new(stream: iroh::endpoint::SendStream) -> SendStream<B> {
505        Self {
506            stream,
507            writing: None,
508        }
509    }
510}
511
512impl<B> quic::SendStream<B> for SendStream<B>
513where
514    B: Buf,
515{
516    /// Polls to check if the stream is ready to send more data.
517    ///
518    /// If data is pending in `self.writing`, it is written until complete.
519    fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), StreamErrorIncoming>> {
520        if let Some(ref mut data) = self.writing {
521            while data.has_remaining() {
522                let stream = Pin::new(&mut self.stream);
523                let written = ready!(stream.poll_write(cx, data.chunk()))
524                    .map_err(convert_write_error_to_stream_error)?;
525                data.advance(written);
526            }
527        }
528        self.writing = None;
529        Poll::Ready(Ok(()))
530    }
531
532    /// Finishes sending data on this stream and closes it gracefully.
533    fn poll_finish(
534        &mut self,
535        _cx: &mut task::Context<'_>,
536    ) -> Poll<Result<(), StreamErrorIncoming>> {
537        Poll::Ready(
538            self.stream
539                .finish()
540                .map_err(|e| StreamErrorIncoming::Unknown(Box::new(e))),
541        )
542    }
543
544    /// Resets the stream with the provided error code, immediately terminating it.
545    fn reset(&mut self, reset_code: u64) {
546        let _ = self
547            .stream
548            .reset(VarInt::from_u64(reset_code).unwrap_or(VarInt::MAX));
549    }
550
551    /// Queues data to be sent in the next `poll_ready` call.
552    ///
553    /// Returns an error if called while another write is still in progress.
554    fn send_data<D: Into<WriteBuf<B>>>(&mut self, data: D) -> Result<(), StreamErrorIncoming> {
555        if self.writing.is_some() {
556            return Err(StreamErrorIncoming::ConnectionErrorIncoming {
557                connection_error: ConnectionErrorIncoming::InternalError(
558                    "internal error in the http stack".to_string(),
559                ),
560            });
561        }
562        self.writing = Some(data.into());
563        Ok(())
564    }
565
566    /// Returns the QUIC stream ID for this sending stream.
567    fn send_id(&self) -> StreamId {
568        let num: u64 = self.stream.id().into();
569        num.try_into().expect("invalid stream id")
570    }
571}
572
573impl<B> quic::SendStreamUnframed<B> for SendStream<B>
574where
575    B: Buf,
576{
577    /// Polls to send unframed raw data directly from the provided buffer.
578    ///
579    /// Returns the number of bytes written on success.
580    fn poll_send<D: Buf>(
581        &mut self,
582        cx: &mut task::Context<'_>,
583        buf: &mut D,
584    ) -> Poll<Result<usize, StreamErrorIncoming>> {
585        if self.writing.is_some() {
586            panic!("poll_send called while send stream is not ready");
587        }
588
589        let s = Pin::new(&mut self.stream);
590
591        let res = ready!(s.poll_write(cx, buf.chunk()));
592        match res {
593            Ok(written) => {
594                buf.advance(written);
595                Poll::Ready(Ok(written))
596            }
597            Err(err) => Poll::Ready(Err(convert_write_error_to_stream_error(err))),
598        }
599    }
600}