Skip to main content

datum_net/
quic.rs

1//! QUIC endpoints, connections, and bidirectional byte streams.
2//!
3//! [`TokioQuic`] builds on `quinn` and exposes QUIC's reliable, ordered,
4//! flow-controlled bidirectional streams as Datum byte flows. Callers provide
5//! Quinn client/server configs, typically built from rustls configs through
6//! [`crypto::rustls::QuicClientConfig`] and
7//! [`crypto::rustls::QuicServerConfig`].
8
9pub use quinn::{self, crypto, rustls};
10
11use datum::{Flow, Keep, NotUsed, Sink, Source, StreamCompletion, StreamError, StreamResult};
12use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
13use std::sync::{Arc, Mutex, mpsc as std_mpsc};
14use tokio::net::ToSocketAddrs;
15use tokio::runtime::Handle;
16use tokio::sync::{mpsc, watch};
17use tokio::task::JoinHandle;
18
19/// Default maximum bytes emitted per QUIC byte-source chunk.
20pub const DEFAULT_CHUNK_SIZE: usize = 8192;
21
22/// QUIC byte source used by accepted and opened bidirectional streams.
23///
24/// The source emits `Vec<u8>` chunks and backpressures the Quinn receive stream
25/// with a capacity-1 channel, preserving QUIC's reliable flow-control semantics.
26pub type QuicByteSource = Source<Vec<u8>, NotUsed>;
27
28/// QUIC byte sink used by accepted and opened bidirectional streams.
29///
30/// The sink writes one upstream chunk at a time and sends a QUIC stream FIN from
31/// its resource close hook when upstream completes.
32pub type QuicByteSink = Sink<Vec<u8>, StreamCompletion<NotUsed>>;
33
34enum DemandResponse<T> {
35    Item(T),
36    Complete,
37    Error(StreamError),
38}
39
40struct ReadResource {
41    receiver: mpsc::Receiver<DemandResponse<Vec<u8>>>,
42    cancel: watch::Sender<bool>,
43    task: JoinHandle<()>,
44}
45
46impl Drop for ReadResource {
47    fn drop(&mut self) {
48        let _ = self.cancel.send(true);
49        self.task.abort();
50    }
51}
52
53struct BindResource {
54    demands: mpsc::Sender<std_mpsc::Sender<DemandResponse<QuicIncomingConnection>>>,
55    cancel: watch::Sender<bool>,
56    task: JoinHandle<()>,
57}
58
59impl Drop for BindResource {
60    fn drop(&mut self) {
61        let _ = self.cancel.send(true);
62        self.task.abort();
63    }
64}
65
66struct AcceptBiResource {
67    demands: mpsc::Sender<std_mpsc::Sender<DemandResponse<QuicBidirectionalStream>>>,
68    cancel: watch::Sender<bool>,
69    task: JoinHandle<()>,
70}
71
72impl Drop for AcceptBiResource {
73    fn drop(&mut self) {
74        let _ = self.cancel.send(true);
75        self.task.abort();
76    }
77}
78
79fn quic_error(error: impl std::fmt::Display) -> StreamError {
80    StreamError::Failed(error.to_string())
81}
82
83fn io_error(error: std::io::Error) -> StreamError {
84    StreamError::Failed(error.to_string())
85}
86
87fn abrupt_termination() -> StreamError {
88    StreamError::AbruptTermination
89}
90
91fn close_code() -> quinn::VarInt {
92    quinn::VarInt::from_u32(0)
93}
94
95/// A materialized QUIC listener binding.
96#[derive(Debug, Clone, Copy, PartialEq, Eq)]
97pub struct QuicBinding {
98    pub local_addr: SocketAddr,
99}
100
101impl QuicBinding {
102    /// Returns the local UDP address the QUIC endpoint is bound to.
103    #[must_use]
104    pub fn local_addr(&self) -> SocketAddr {
105        self.local_addr
106    }
107}
108
109/// Metadata for a materialized QUIC stream.
110#[derive(Debug, Clone, Copy, PartialEq, Eq)]
111pub struct QuicStream {
112    pub id: quinn::StreamId,
113}
114
115impl QuicStream {
116    /// Returns Quinn's stream identifier.
117    #[must_use]
118    pub fn id(&self) -> quinn::StreamId {
119        self.id
120    }
121}
122
123/// A materialized QUIC connection.
124#[derive(Debug, Clone)]
125pub struct QuicConnection {
126    endpoint: quinn::Endpoint,
127    connection: quinn::Connection,
128    handle: Handle,
129    local_addr: SocketAddr,
130    remote_addr: SocketAddr,
131    chunk_size: usize,
132}
133
134impl QuicConnection {
135    /// Returns the local UDP endpoint address for this connection.
136    #[must_use]
137    pub fn local_addr(&self) -> SocketAddr {
138        self.local_addr
139    }
140
141    /// Returns the peer UDP endpoint address for this connection.
142    #[must_use]
143    pub fn remote_addr(&self) -> SocketAddr {
144        self.remote_addr
145    }
146
147    /// Returns the default chunk size used by stream helpers on this connection.
148    #[must_use]
149    pub fn chunk_size(&self) -> usize {
150        self.chunk_size
151    }
152
153    /// Returns the underlying Quinn connection handle.
154    #[must_use]
155    pub fn quinn_connection(&self) -> &quinn::Connection {
156        &self.connection
157    }
158
159    /// Returns the underlying Quinn endpoint handle that owns the UDP socket.
160    #[must_use]
161    pub fn quinn_endpoint(&self) -> &quinn::Endpoint {
162        &self.endpoint
163    }
164
165    /// Opens a bidirectional QUIC stream as a Datum byte flow.
166    ///
167    /// Opening and the stream-id allocation happen when this flow is
168    /// materialized. Quinn only exposes the stream to the peer after the
169    /// initiating side writes data, so a peer-side `accept_bi` will not complete
170    /// until the first write or FIN is sent.
171    #[must_use]
172    pub fn open_bi(
173        &self,
174        chunk_size: usize,
175    ) -> Flow<Vec<u8>, Vec<u8>, StreamCompletion<QuicStream>> {
176        assert!(chunk_size > 0, "chunk size must be greater than zero");
177        let connection = self.connection.clone();
178        let handle = self.handle.clone();
179        Flow::future_flow(move || {
180            let connection = connection.clone();
181            let handle = handle.clone();
182            async move {
183                let (send, recv) = connection.open_bi().await.map_err(quic_error)?;
184                Ok(quic_bi_stream_from_halves(send, recv, handle, chunk_size, false).into_flow())
185            }
186        })
187    }
188
189    /// Opens a bidirectional stream using the connection's default chunk size.
190    #[must_use]
191    pub fn open_bi_default(&self) -> Flow<Vec<u8>, Vec<u8>, StreamCompletion<QuicStream>> {
192        self.open_bi(self.chunk_size)
193    }
194
195    /// Opens a bidirectional QUIC stream and emits the split stream object.
196    ///
197    /// This is the object-shaped counterpart to [`QuicConnection::open_bi`],
198    /// used by protocol carriers that need to drive the byte source and sink
199    /// independently.
200    #[must_use]
201    pub fn open_bi_stream(
202        &self,
203        chunk_size: usize,
204    ) -> Source<QuicBidirectionalStream, StreamCompletion<QuicStream>> {
205        assert!(chunk_size > 0, "chunk size must be greater than zero");
206        let connection = self.connection.clone();
207        let handle = self.handle.clone();
208        Source::lazy_future_source(move || {
209            let connection = connection.clone();
210            let handle = handle.clone();
211            async move {
212                let (send, recv) = connection.open_bi().await.map_err(quic_error)?;
213                let stream = quic_bi_stream_from_halves(send, recv, handle, chunk_size, false);
214                let metadata = stream.stream();
215                let stream = Arc::new(Mutex::new(Some(stream)));
216                Ok(Source::unfold_resource(
217                    {
218                        let stream = Arc::clone(&stream);
219                        move || {
220                            stream
221                                .lock()
222                                .expect("single-use QUIC bidi stream poisoned")
223                                .take()
224                                .map(Some)
225                                .ok_or_else(|| {
226                                    StreamError::Failed(
227                                        "QUIC bidi stream already materialized".into(),
228                                    )
229                                })
230                        }
231                    },
232                    |stream| Ok(stream.take()),
233                    |_stream| Ok(()),
234                )
235                .map_materialized_value(move |_| metadata))
236            }
237        })
238    }
239
240    /// Opens a split bidirectional stream using the connection's default chunk size.
241    #[must_use]
242    pub fn open_bi_stream_default(
243        &self,
244    ) -> Source<QuicBidirectionalStream, StreamCompletion<QuicStream>> {
245        self.open_bi_stream(self.chunk_size)
246    }
247
248    /// Opens a split bidirectional stream with emit-available read mode.
249    ///
250    /// Like [`open_bi_stream`](QuicConnection::open_bi_stream) but the byte source emits chunks as soon as
251    /// any bytes arrive rather than waiting to fill `chunk_size`. Use for
252    /// interactive protocols (e.g. StreamRefs) where small frames must flow
253    /// without accumulating in the read buffer.
254    #[must_use]
255    pub fn open_bi_stream_available(
256        &self,
257        chunk_size: usize,
258    ) -> Source<QuicBidirectionalStream, StreamCompletion<QuicStream>> {
259        assert!(chunk_size > 0, "chunk size must be greater than zero");
260        let connection = self.connection.clone();
261        let handle = self.handle.clone();
262        Source::lazy_future_source(move || {
263            let connection = connection.clone();
264            let handle = handle.clone();
265            async move {
266                let (send, recv) = connection.open_bi().await.map_err(quic_error)?;
267                let stream = quic_bi_stream_from_halves(send, recv, handle, chunk_size, true);
268                let metadata = stream.stream();
269                let stream = Arc::new(Mutex::new(Some(stream)));
270                Ok(Source::unfold_resource(
271                    {
272                        let stream = Arc::clone(&stream);
273                        move || {
274                            stream
275                                .lock()
276                                .expect("single-use QUIC bidi stream poisoned")
277                                .take()
278                                .map(Some)
279                                .ok_or_else(|| {
280                                    StreamError::Failed(
281                                        "QUIC bidi stream already materialized".into(),
282                                    )
283                                })
284                        }
285                    },
286                    |stream| Ok(stream.take()),
287                    |_stream| Ok(()),
288                )
289                .map_materialized_value(move |_| metadata))
290            }
291        })
292    }
293
294    /// Accepts incoming bidirectional QUIC streams.
295    ///
296    /// Each downstream pull accepts one stream. Accepted streams are emitted as
297    /// [`QuicBidirectionalStream`] values that can be split or converted into a
298    /// Datum byte flow.
299    #[must_use]
300    pub fn accept_bi(&self, chunk_size: usize) -> Source<QuicBidirectionalStream, QuicConnection> {
301        assert!(chunk_size > 0, "chunk size must be greater than zero");
302        let connection = self.clone();
303        Source::unfold_resource(
304            {
305                let connection = connection.clone();
306                move || {
307                    let handle = connection.handle.clone();
308                    let (demand_sender, demand_receiver) = mpsc::channel(1);
309                    let (cancel_sender, cancel_receiver) = watch::channel(false);
310                    let task = handle.spawn(run_accept_bi_task(
311                        connection.connection.clone(),
312                        chunk_size,
313                        false,
314                        handle.clone(),
315                        demand_receiver,
316                        cancel_receiver,
317                    ));
318                    Ok(AcceptBiResource {
319                        demands: demand_sender,
320                        cancel: cancel_sender,
321                        task,
322                    })
323                }
324            },
325            receive_demand_response,
326            close_accept_bi_resource,
327        )
328        .map_materialized_value(move |_| connection.clone())
329    }
330
331    /// Accepts incoming bidirectional streams using the connection's default
332    /// chunk size.
333    #[must_use]
334    pub fn accept_bi_default(&self) -> Source<QuicBidirectionalStream, QuicConnection> {
335        self.accept_bi(self.chunk_size)
336    }
337
338    /// Accepts incoming bidirectional streams with emit-available read mode.
339    ///
340    /// Like [`accept_bi`](QuicConnection::accept_bi) but the byte source emits chunks as soon as any
341    /// bytes arrive rather than waiting to fill `chunk_size`.
342    #[must_use]
343    pub fn accept_bi_available(
344        &self,
345        chunk_size: usize,
346    ) -> Source<QuicBidirectionalStream, QuicConnection> {
347        assert!(chunk_size > 0, "chunk size must be greater than zero");
348        let connection = self.clone();
349        Source::unfold_resource(
350            {
351                let connection = connection.clone();
352                move || {
353                    let handle = connection.handle.clone();
354                    let (demand_sender, demand_receiver) = mpsc::channel(1);
355                    let (cancel_sender, cancel_receiver) = watch::channel(false);
356                    let task = handle.spawn(run_accept_bi_task(
357                        connection.connection.clone(),
358                        chunk_size,
359                        true,
360                        handle.clone(),
361                        demand_receiver,
362                        cancel_receiver,
363                    ));
364                    Ok(AcceptBiResource {
365                        demands: demand_sender,
366                        cancel: cancel_sender,
367                        task,
368                    })
369                }
370            },
371            receive_demand_response,
372            close_accept_bi_resource,
373        )
374        .map_materialized_value(move |_| connection.clone())
375    }
376
377    /// Closes the QUIC connection with an application close code of `0`.
378    pub fn close(&self, reason: &[u8]) {
379        self.connection.close(close_code(), reason);
380    }
381}
382
383/// A QUIC connection accepted by [`TokioQuic::bind`].
384#[derive(Debug, Clone)]
385pub struct QuicIncomingConnection {
386    connection: QuicConnection,
387}
388
389impl QuicIncomingConnection {
390    /// Returns the local UDP endpoint address for this connection.
391    #[must_use]
392    pub fn local_addr(&self) -> SocketAddr {
393        self.connection.local_addr()
394    }
395
396    /// Returns the peer UDP endpoint address for this connection.
397    #[must_use]
398    pub fn remote_addr(&self) -> SocketAddr {
399        self.connection.remote_addr()
400    }
401
402    /// Returns a clone of the materialized QUIC connection handle.
403    #[must_use]
404    pub fn connection(&self) -> QuicConnection {
405        self.connection.clone()
406    }
407
408    /// Consumes this value and returns the materialized QUIC connection.
409    #[must_use]
410    pub fn into_connection(self) -> QuicConnection {
411        self.connection
412    }
413
414    /// Opens a bidirectional stream from the accepted connection.
415    #[must_use]
416    pub fn open_bi(
417        &self,
418        chunk_size: usize,
419    ) -> Flow<Vec<u8>, Vec<u8>, StreamCompletion<QuicStream>> {
420        self.connection.open_bi(chunk_size)
421    }
422
423    /// Opens a bidirectional stream using the connection's default chunk size.
424    #[must_use]
425    pub fn open_bi_default(&self) -> Flow<Vec<u8>, Vec<u8>, StreamCompletion<QuicStream>> {
426        self.connection.open_bi_default()
427    }
428
429    /// Opens a split bidirectional stream from the accepted connection.
430    #[must_use]
431    pub fn open_bi_stream(
432        &self,
433        chunk_size: usize,
434    ) -> Source<QuicBidirectionalStream, StreamCompletion<QuicStream>> {
435        self.connection.open_bi_stream(chunk_size)
436    }
437
438    /// Opens a split bidirectional stream using the default chunk size.
439    #[must_use]
440    pub fn open_bi_stream_default(
441        &self,
442    ) -> Source<QuicBidirectionalStream, StreamCompletion<QuicStream>> {
443        self.connection.open_bi_stream_default()
444    }
445
446    /// Opens a split bidirectional stream with emit-available read mode.
447    #[must_use]
448    pub fn open_bi_stream_available(
449        &self,
450        chunk_size: usize,
451    ) -> Source<QuicBidirectionalStream, StreamCompletion<QuicStream>> {
452        self.connection.open_bi_stream_available(chunk_size)
453    }
454
455    /// Accepts incoming bidirectional streams on this connection.
456    #[must_use]
457    pub fn accept_bi(&self, chunk_size: usize) -> Source<QuicBidirectionalStream, QuicConnection> {
458        self.connection.accept_bi(chunk_size)
459    }
460
461    /// Accepts incoming bidirectional streams using the default chunk size.
462    #[must_use]
463    pub fn accept_bi_default(&self) -> Source<QuicBidirectionalStream, QuicConnection> {
464        self.connection.accept_bi_default()
465    }
466
467    /// Accepts incoming bidirectional streams with emit-available read mode.
468    #[must_use]
469    pub fn accept_bi_available(
470        &self,
471        chunk_size: usize,
472    ) -> Source<QuicBidirectionalStream, QuicConnection> {
473        self.connection.accept_bi_available(chunk_size)
474    }
475}
476
477/// An accepted or opened QUIC bidirectional stream.
478pub struct QuicBidirectionalStream {
479    stream: QuicStream,
480    source: QuicByteSource,
481    sink: QuicByteSink,
482}
483
484impl QuicBidirectionalStream {
485    /// Returns stream metadata.
486    #[must_use]
487    pub fn stream(&self) -> QuicStream {
488        self.stream
489    }
490
491    /// Splits the stream into receive and send byte halves.
492    #[must_use]
493    pub fn into_parts(self) -> (QuicByteSource, QuicByteSink) {
494        (self.source, self.sink)
495    }
496
497    /// Converts this QUIC stream into a Datum byte flow.
498    #[must_use]
499    pub fn into_flow(self) -> Flow<Vec<u8>, Vec<u8>, QuicStream> {
500        Flow::from_sink_and_source(self.sink, self.source)
501            .map_materialized_value(move |_| self.stream)
502    }
503}
504
505/// QUIC endpoint entry points backed by Quinn.
506pub struct TokioQuic;
507
508/// Alias for [`TokioQuic`].
509pub type Quic = TokioQuic;
510
511impl TokioQuic {
512    /// Binds a QUIC server endpoint and emits accepted connections.
513    ///
514    /// The UDP socket and Quinn endpoint bind when the source is materialized.
515    /// Each downstream pull accepts one connection attempt and drives the QUIC
516    /// handshake. Handshake failures surface as [`StreamError`] values.
517    #[must_use]
518    pub fn bind<A>(
519        addr: A,
520        server_config: quinn::ServerConfig,
521        chunk_size: usize,
522    ) -> Source<QuicIncomingConnection, StreamCompletion<QuicBinding>>
523    where
524        A: ToSocketAddrs + Clone + Send + Sync + 'static,
525    {
526        assert!(chunk_size > 0, "chunk size must be greater than zero");
527        Source::lazy_future_source(move || {
528            let addr = addr.clone();
529            let server_config = server_config.clone();
530            async move {
531                let handle = Handle::current();
532                let addr = resolve_addr(addr).await?;
533                let endpoint = quinn::Endpoint::server(server_config, addr).map_err(io_error)?;
534                let local_addr = endpoint.local_addr().map_err(io_error)?;
535                Ok(quic_bind_source(endpoint, local_addr, handle, chunk_size))
536            }
537        })
538    }
539
540    /// Binds a QUIC server endpoint using the default 8 KiB stream chunk size.
541    #[must_use]
542    pub fn bind_default<A>(
543        addr: A,
544        server_config: quinn::ServerConfig,
545    ) -> Source<QuicIncomingConnection, StreamCompletion<QuicBinding>>
546    where
547        A: ToSocketAddrs + Clone + Send + Sync + 'static,
548    {
549        Self::bind(addr, server_config, DEFAULT_CHUNK_SIZE)
550    }
551
552    /// Opens a QUIC client endpoint and emits one materialized connection.
553    ///
554    /// The local endpoint binds to an OS-assigned UDP port matching the remote
555    /// address family. The Quinn client config controls rustls trust policy,
556    /// ALPN, transport settings, and certificate verification.
557    #[must_use]
558    pub fn connect<A>(
559        addr: A,
560        server_name: impl Into<String>,
561        client_config: quinn::ClientConfig,
562        chunk_size: usize,
563    ) -> Source<QuicConnection, StreamCompletion<QuicConnection>>
564    where
565        A: ToSocketAddrs + Clone + Send + Sync + 'static,
566    {
567        assert!(chunk_size > 0, "chunk size must be greater than zero");
568        let server_name = server_name.into();
569        Source::lazy_future_source(move || {
570            let addr = addr.clone();
571            let server_name = server_name.clone();
572            let client_config = client_config.clone();
573            async move {
574                let remote_addr = resolve_addr(addr).await?;
575                let local_addr = client_bind_addr(remote_addr);
576                let mut endpoint = quinn::Endpoint::client(local_addr).map_err(io_error)?;
577                endpoint.set_default_client_config(client_config);
578                let connecting = endpoint
579                    .connect(remote_addr, &server_name)
580                    .map_err(quic_error)?;
581                let connection = connecting.await.map_err(quic_error)?;
582                let endpoint_local_addr = endpoint.local_addr().map_err(io_error)?;
583                let connection = QuicConnection {
584                    local_addr: connection_local_addr(
585                        &connection,
586                        endpoint_local_addr,
587                        remote_addr.ip(),
588                    ),
589                    remote_addr: connection.remote_address(),
590                    endpoint,
591                    connection,
592                    handle: Handle::current(),
593                    chunk_size,
594                };
595                let materialized = connection.clone();
596                Ok(
597                    Source::single(connection)
598                        .map_materialized_value(move |_| materialized.clone()),
599                )
600            }
601        })
602    }
603
604    /// Opens a QUIC client endpoint using the default 8 KiB stream chunk size.
605    #[must_use]
606    pub fn connect_default<A>(
607        addr: A,
608        server_name: impl Into<String>,
609        client_config: quinn::ClientConfig,
610    ) -> Source<QuicConnection, StreamCompletion<QuicConnection>>
611    where
612        A: ToSocketAddrs + Clone + Send + Sync + 'static,
613    {
614        Self::connect(addr, server_name, client_config, DEFAULT_CHUNK_SIZE)
615    }
616}
617
618async fn resolve_addr<A>(addr: A) -> StreamResult<SocketAddr>
619where
620    A: ToSocketAddrs,
621{
622    let mut addrs = tokio::net::lookup_host(addr).await.map_err(io_error)?;
623    addrs
624        .next()
625        .ok_or_else(|| StreamError::Failed("address resolved to no socket addresses".into()))
626}
627
628fn client_bind_addr(remote_addr: SocketAddr) -> SocketAddr {
629    if remote_addr.is_ipv6() {
630        SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)
631    } else {
632        SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0)
633    }
634}
635
636fn connection_local_addr(
637    connection: &quinn::Connection,
638    endpoint_addr: SocketAddr,
639    fallback_ip: IpAddr,
640) -> SocketAddr {
641    connection
642        .local_ip()
643        .map(|ip| SocketAddr::new(ip, endpoint_addr.port()))
644        .or_else(|| {
645            endpoint_addr
646                .ip()
647                .is_unspecified()
648                .then(|| SocketAddr::new(fallback_ip, endpoint_addr.port()))
649        })
650        .unwrap_or(endpoint_addr)
651}
652
653fn quic_bi_stream_from_halves(
654    send: quinn::SendStream,
655    recv: quinn::RecvStream,
656    handle: Handle,
657    chunk_size: usize,
658    emit_available: bool,
659) -> QuicBidirectionalStream {
660    let stream = QuicStream { id: send.id() };
661    QuicBidirectionalStream {
662        stream,
663        source: single_use_quic_read_source(recv, handle.clone(), chunk_size, emit_available),
664        sink: single_use_quic_write_sink(send, handle),
665    }
666}
667
668fn single_use_quic_read_source(
669    recv: quinn::RecvStream,
670    handle: Handle,
671    chunk_size: usize,
672    emit_available: bool,
673) -> QuicByteSource {
674    let recv = Arc::new(Mutex::new(Some(recv)));
675    Source::unfold_resource(
676        {
677            let recv = Arc::clone(&recv);
678            move || {
679                let recv = recv
680                    .lock()
681                    .expect("single-use QUIC recv stream poisoned")
682                    .take()
683                    .ok_or_else(|| {
684                        StreamError::Failed("QUIC recv stream already materialized".into())
685                    })?;
686                let (sender, receiver) = mpsc::channel(1);
687                let (cancel_sender, cancel_receiver) = watch::channel(false);
688                let task = handle.spawn(run_read_task(
689                    recv,
690                    chunk_size,
691                    sender,
692                    emit_available,
693                    cancel_receiver,
694                ));
695                Ok(ReadResource {
696                    receiver,
697                    cancel: cancel_sender,
698                    task,
699                })
700            }
701        },
702        |resource| match resource.receiver.blocking_recv() {
703            Some(DemandResponse::Item(chunk)) => Ok(Some(chunk)),
704            Some(DemandResponse::Complete) => Ok(None),
705            Some(DemandResponse::Error(error)) => Err(error),
706            None => Err(abrupt_termination()),
707        },
708        close_read_resource,
709    )
710}
711
712fn close_read_resource(resource: ReadResource) -> StreamResult<()> {
713    let _ = resource.cancel.send(true);
714    resource.task.abort();
715    Ok(())
716}
717
718async fn run_read_task(
719    mut recv: quinn::RecvStream,
720    chunk_size: usize,
721    sender: mpsc::Sender<DemandResponse<Vec<u8>>>,
722    emit_available: bool,
723    mut cancel: watch::Receiver<bool>,
724) {
725    let mut buffer = vec![0_u8; chunk_size];
726    let mut pending_tail = Vec::with_capacity(chunk_size);
727
728    loop {
729        let read = tokio::select! {
730            read = recv.read(&mut buffer) => read,
731            changed = cancel.changed() => {
732                let _ = changed;
733                return;
734            }
735        };
736
737        match read {
738            Ok(Some(read)) => {
739                if !send_read_chunks(
740                    &sender,
741                    chunk_size,
742                    &mut pending_tail,
743                    &buffer[..read],
744                    emit_available,
745                    &mut cancel,
746                )
747                .await
748                {
749                    return;
750                }
751            }
752            Ok(None) => {
753                if !pending_tail.is_empty()
754                    && !send_read_item(
755                        &sender,
756                        DemandResponse::Item(std::mem::take(&mut pending_tail)),
757                        &mut cancel,
758                    )
759                    .await
760                {
761                    return;
762                }
763                let _ = send_read_item(&sender, DemandResponse::Complete, &mut cancel).await;
764                return;
765            }
766            Err(error) => {
767                let _ = send_read_item(
768                    &sender,
769                    DemandResponse::Error(quic_error(error)),
770                    &mut cancel,
771                )
772                .await;
773                return;
774            }
775        }
776    }
777}
778
779async fn send_read_chunks(
780    sender: &mpsc::Sender<DemandResponse<Vec<u8>>>,
781    chunk_size: usize,
782    pending_tail: &mut Vec<u8>,
783    read_buffer: &[u8],
784    emit_available: bool,
785    cancel: &mut watch::Receiver<bool>,
786) -> bool {
787    let mut offset = 0;
788    if !pending_tail.is_empty() {
789        let needed = chunk_size - pending_tail.len();
790        let take = needed.min(read_buffer.len());
791        pending_tail.extend_from_slice(&read_buffer[..take]);
792        offset += take;
793        if pending_tail.len() == chunk_size
794            && !send_read_item(
795                sender,
796                DemandResponse::Item(std::mem::take(pending_tail)),
797                cancel,
798            )
799            .await
800        {
801            return false;
802        }
803    }
804
805    while offset + chunk_size <= read_buffer.len() {
806        let next = offset + chunk_size;
807        if !send_read_item(
808            sender,
809            DemandResponse::Item(read_buffer[offset..next].to_vec()),
810            cancel,
811        )
812        .await
813        {
814            return false;
815        }
816        offset = next;
817    }
818
819    if offset < read_buffer.len() {
820        pending_tail.extend_from_slice(&read_buffer[offset..]);
821    }
822    if emit_available
823        && !pending_tail.is_empty()
824        && !send_read_item(
825            sender,
826            DemandResponse::Item(std::mem::take(pending_tail)),
827            cancel,
828        )
829        .await
830    {
831        return false;
832    }
833    true
834}
835
836async fn send_read_item<T>(
837    sender: &mpsc::Sender<DemandResponse<T>>,
838    item: DemandResponse<T>,
839    cancel: &mut watch::Receiver<bool>,
840) -> bool
841where
842    T: Send + 'static,
843{
844    tokio::select! {
845        result = sender.send(item) => result.is_ok(),
846        changed = cancel.changed() => {
847            let _ = changed;
848            false
849        }
850    }
851}
852
853fn single_use_quic_write_sink(send: quinn::SendStream, handle: Handle) -> QuicByteSink {
854    let send = Arc::new(Mutex::new(Some(send)));
855    Flow::<Vec<u8>, Vec<u8>>::identity()
856        .map_with_resource(
857            {
858                let send = Arc::clone(&send);
859                move || {
860                    send.lock()
861                        .expect("single-use QUIC send stream poisoned")
862                        .take()
863                        .ok_or_else(|| {
864                            StreamError::Failed("QUIC send stream already materialized".into())
865                        })
866                }
867            },
868            {
869                let handle = handle.clone();
870                move |send, chunk| {
871                    handle.block_on(async { send.write_all(&chunk).await.map_err(quic_error) })?;
872                    Ok(())
873                }
874            },
875            move |mut send| {
876                handle.block_on(async { send.write_all(&[]).await.map_err(quic_error) })?;
877                send.finish().map_err(quic_error)?;
878                Ok(None)
879            },
880        )
881        .to_mat(Sink::ignore(), Keep::right)
882}
883
884fn quic_bind_source(
885    endpoint: quinn::Endpoint,
886    local_addr: SocketAddr,
887    handle: Handle,
888    chunk_size: usize,
889) -> Source<QuicIncomingConnection, QuicBinding> {
890    let endpoint = Arc::new(Mutex::new(Some(endpoint)));
891    Source::unfold_resource(
892        {
893            let endpoint = Arc::clone(&endpoint);
894            let handle = handle.clone();
895            move || {
896                let endpoint = endpoint
897                    .lock()
898                    .expect("single-use QUIC endpoint poisoned")
899                    .take()
900                    .ok_or_else(|| {
901                        StreamError::Failed("QUIC endpoint already materialized".into())
902                    })?;
903                let (demand_sender, demand_receiver) = mpsc::channel(1);
904                let (cancel_sender, cancel_receiver) = watch::channel(false);
905                let task = handle.spawn(run_quic_bind_task(
906                    endpoint,
907                    local_addr,
908                    chunk_size,
909                    handle.clone(),
910                    demand_receiver,
911                    cancel_receiver,
912                ));
913                Ok(BindResource {
914                    demands: demand_sender,
915                    cancel: cancel_sender,
916                    task,
917                })
918            }
919        },
920        receive_demand_response,
921        close_bind_resource,
922    )
923    .map_materialized_value(move |_| QuicBinding { local_addr })
924}
925
926fn receive_demand_response<T>(resource: &mut impl DemandResource<T>) -> StreamResult<Option<T>>
927where
928    T: Send + 'static,
929{
930    let (reply_sender, reply_receiver) = std_mpsc::channel();
931    resource
932        .demands()
933        .blocking_send(reply_sender)
934        .map_err(|_| abrupt_termination())?;
935    match reply_receiver.recv() {
936        Ok(DemandResponse::Item(item)) => Ok(Some(item)),
937        Ok(DemandResponse::Complete) => Ok(None),
938        Ok(DemandResponse::Error(error)) => Err(error),
939        Err(_) => Err(abrupt_termination()),
940    }
941}
942
943trait DemandResource<T>
944where
945    T: Send + 'static,
946{
947    fn demands(&self) -> &mpsc::Sender<std_mpsc::Sender<DemandResponse<T>>>;
948}
949
950impl DemandResource<QuicIncomingConnection> for BindResource {
951    fn demands(&self) -> &mpsc::Sender<std_mpsc::Sender<DemandResponse<QuicIncomingConnection>>> {
952        &self.demands
953    }
954}
955
956impl DemandResource<QuicBidirectionalStream> for AcceptBiResource {
957    fn demands(&self) -> &mpsc::Sender<std_mpsc::Sender<DemandResponse<QuicBidirectionalStream>>> {
958        &self.demands
959    }
960}
961
962fn close_bind_resource(resource: BindResource) -> StreamResult<()> {
963    let _ = resource.cancel.send(true);
964    resource.task.abort();
965    Ok(())
966}
967
968fn close_accept_bi_resource(resource: AcceptBiResource) -> StreamResult<()> {
969    let _ = resource.cancel.send(true);
970    resource.task.abort();
971    Ok(())
972}
973
974async fn run_quic_bind_task(
975    endpoint: quinn::Endpoint,
976    local_addr: SocketAddr,
977    chunk_size: usize,
978    handle: Handle,
979    mut demands: mpsc::Receiver<std_mpsc::Sender<DemandResponse<QuicIncomingConnection>>>,
980    mut cancel: watch::Receiver<bool>,
981) {
982    loop {
983        let reply = tokio::select! {
984            demand = demands.recv() => match demand {
985                Some(reply) => reply,
986                None => return,
987            },
988            changed = cancel.changed() => {
989                let _ = changed;
990                return;
991            }
992        };
993
994        let incoming = tokio::select! {
995            incoming = endpoint.accept() => incoming,
996            changed = cancel.changed() => {
997                let _ = changed;
998                return;
999            }
1000        };
1001
1002        let Some(incoming) = incoming else {
1003            let _ = reply.send(DemandResponse::Complete);
1004            return;
1005        };
1006
1007        let connected = tokio::select! {
1008            connected = incoming => connected,
1009            changed = cancel.changed() => {
1010                let _ = changed;
1011                return;
1012            }
1013        };
1014
1015        match connected {
1016            Ok(connection) => {
1017                let incoming = QuicIncomingConnection {
1018                    connection: QuicConnection {
1019                        endpoint: endpoint.clone(),
1020                        local_addr: connection_local_addr(&connection, local_addr, local_addr.ip()),
1021                        remote_addr: connection.remote_address(),
1022                        connection,
1023                        handle: handle.clone(),
1024                        chunk_size,
1025                    },
1026                };
1027                if reply.send(DemandResponse::Item(incoming)).is_err() {
1028                    return;
1029                }
1030            }
1031            Err(error) => {
1032                let _ = reply.send(DemandResponse::Error(quic_error(error)));
1033                return;
1034            }
1035        }
1036    }
1037}
1038
1039async fn run_accept_bi_task(
1040    connection: quinn::Connection,
1041    chunk_size: usize,
1042    emit_available: bool,
1043    handle: Handle,
1044    mut demands: mpsc::Receiver<std_mpsc::Sender<DemandResponse<QuicBidirectionalStream>>>,
1045    mut cancel: watch::Receiver<bool>,
1046) {
1047    loop {
1048        let reply = tokio::select! {
1049            demand = demands.recv() => match demand {
1050                Some(reply) => reply,
1051                None => return,
1052            },
1053            changed = cancel.changed() => {
1054                let _ = changed;
1055                return;
1056            }
1057        };
1058
1059        let accepted = tokio::select! {
1060            accepted = connection.accept_bi() => accepted,
1061            changed = cancel.changed() => {
1062                let _ = changed;
1063                return;
1064            }
1065        };
1066
1067        match accepted {
1068            Ok((send, recv)) => {
1069                let stream = quic_bi_stream_from_halves(
1070                    send,
1071                    recv,
1072                    handle.clone(),
1073                    chunk_size,
1074                    emit_available,
1075                );
1076                if reply.send(DemandResponse::Item(stream)).is_err() {
1077                    return;
1078                }
1079            }
1080            Err(error) => {
1081                let _ = reply.send(DemandResponse::Error(quic_error(error)));
1082                return;
1083            }
1084        }
1085    }
1086}