1pub use quinn::{self, crypto, rustls};
10
11use datum::{Flow, Keep, NotUsed, Sink, Source, StreamCompletion, StreamError, StreamResult};
12use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
13use std::sync::{Arc, Mutex, mpsc as std_mpsc};
14use tokio::net::ToSocketAddrs;
15use tokio::runtime::Handle;
16use tokio::sync::{mpsc, watch};
17use tokio::task::JoinHandle;
18
/// Chunk size, in bytes, used by the `*_default` constructors in this module.
pub const DEFAULT_CHUNK_SIZE: usize = 8192;

/// Source of byte chunks; used for the receive half of a QUIC stream.
pub type QuicByteSource = Source<Vec<u8>, NotUsed>;

/// Sink of byte chunks; used for the send half of a QUIC stream.
pub type QuicByteSink = Sink<Vec<u8>, StreamCompletion<NotUsed>>;
33
/// Message sent from a background task back to the stream stage that is
/// waiting for the next element.
enum DemandResponse<T> {
    /// The next element.
    Item(T),
    /// No more elements will follow.
    Complete,
    /// The task failed; the error is surfaced to the stream.
    Error(StreamError),
}
39
/// Stream-side handle to the background task that reads from a QUIC receive
/// stream.
struct ReadResource {
    /// Receives the chunks (or completion/error) produced by the read task.
    receiver: mpsc::Receiver<DemandResponse<Vec<u8>>>,
    /// Cancellation signal observed by the read task.
    cancel: watch::Sender<bool>,
    /// Handle of the spawned read task.
    task: JoinHandle<()>,
}

impl Drop for ReadResource {
    /// Signals cancellation and aborts the read task so it does not outlive
    /// the resource.
    fn drop(&mut self) {
        // The send result is ignored: failure only means nobody is listening.
        let _ = self.cancel.send(true);
        self.task.abort();
    }
}
52
/// Stream-side handle to the background task that accepts incoming QUIC
/// connections on a bound endpoint.
struct BindResource {
    /// Channel used to request the next connection; each request carries the
    /// reply sender on which the task answers.
    demands: mpsc::Sender<std_mpsc::Sender<DemandResponse<QuicIncomingConnection>>>,
    /// Cancellation signal observed by the accept task.
    cancel: watch::Sender<bool>,
    /// Handle of the spawned accept task.
    task: JoinHandle<()>,
}

impl Drop for BindResource {
    /// Signals cancellation and aborts the accept task so it does not outlive
    /// the resource.
    fn drop(&mut self) {
        // The send result is ignored: failure only means nobody is listening.
        let _ = self.cancel.send(true);
        self.task.abort();
    }
}
65
/// Stream-side handle to the background task that accepts peer-initiated
/// bidirectional streams on a connection.
struct AcceptBiResource {
    /// Channel used to request the next stream; each request carries the
    /// reply sender on which the task answers.
    demands: mpsc::Sender<std_mpsc::Sender<DemandResponse<QuicBidirectionalStream>>>,
    /// Cancellation signal observed by the accept task.
    cancel: watch::Sender<bool>,
    /// Handle of the spawned accept task.
    task: JoinHandle<()>,
}

impl Drop for AcceptBiResource {
    /// Signals cancellation and aborts the accept task so it does not outlive
    /// the resource.
    fn drop(&mut self) {
        // The send result is ignored: failure only means nobody is listening.
        let _ = self.cancel.send(true);
        self.task.abort();
    }
}
78
79fn quic_error(error: impl std::fmt::Display) -> StreamError {
80 StreamError::Failed(error.to_string())
81}
82
83fn io_error(error: std::io::Error) -> StreamError {
84 StreamError::Failed(error.to_string())
85}
86
/// Error reported when a background task's channel closes without delivering
/// a response.
fn abrupt_termination() -> StreamError {
    StreamError::AbruptTermination
}
90
91fn close_code() -> quinn::VarInt {
92 quinn::VarInt::from_u32(0)
93}
94
/// Materialized value of a bound QUIC server source.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct QuicBinding {
    /// Local address reported by the endpoint after binding.
    pub local_addr: SocketAddr,
}

impl QuicBinding {
    /// Returns the local address the endpoint is bound to.
    #[must_use]
    pub fn local_addr(&self) -> SocketAddr {
        self.local_addr
    }
}
108
/// Metadata identifying a single QUIC stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct QuicStream {
    /// Stream identifier, taken from the send half of the stream.
    pub id: quinn::StreamId,
}

impl QuicStream {
    /// Returns the stream identifier.
    #[must_use]
    pub fn id(&self) -> quinn::StreamId {
        self.id
    }
}
122
/// An established QUIC connection together with the endpoint that owns it and
/// the settings used to build streams on top of it.
#[derive(Debug, Clone)]
pub struct QuicConnection {
    /// Endpoint the connection was created on.
    endpoint: quinn::Endpoint,
    /// Underlying quinn connection.
    connection: quinn::Connection,
    /// Tokio runtime handle used to spawn background tasks and drive writes.
    handle: Handle,
    /// Local address recorded when the connection was established.
    local_addr: SocketAddr,
    /// Remote address recorded when the connection was established.
    remote_addr: SocketAddr,
    /// Chunk size used by the `*_default` methods.
    chunk_size: usize,
}
133
134impl QuicConnection {
135 #[must_use]
137 pub fn local_addr(&self) -> SocketAddr {
138 self.local_addr
139 }
140
141 #[must_use]
143 pub fn remote_addr(&self) -> SocketAddr {
144 self.remote_addr
145 }
146
147 #[must_use]
149 pub fn chunk_size(&self) -> usize {
150 self.chunk_size
151 }
152
153 #[must_use]
155 pub fn quinn_connection(&self) -> &quinn::Connection {
156 &self.connection
157 }
158
159 #[must_use]
161 pub fn quinn_endpoint(&self) -> &quinn::Endpoint {
162 &self.endpoint
163 }
164
165 #[must_use]
172 pub fn open_bi(
173 &self,
174 chunk_size: usize,
175 ) -> Flow<Vec<u8>, Vec<u8>, StreamCompletion<QuicStream>> {
176 assert!(chunk_size > 0, "chunk size must be greater than zero");
177 let connection = self.connection.clone();
178 let handle = self.handle.clone();
179 Flow::future_flow(move || {
180 let connection = connection.clone();
181 let handle = handle.clone();
182 async move {
183 let (send, recv) = connection.open_bi().await.map_err(quic_error)?;
184 Ok(quic_bi_stream_from_halves(send, recv, handle, chunk_size, false).into_flow())
185 }
186 })
187 }
188
189 #[must_use]
191 pub fn open_bi_default(&self) -> Flow<Vec<u8>, Vec<u8>, StreamCompletion<QuicStream>> {
192 self.open_bi(self.chunk_size)
193 }
194
195 #[must_use]
201 pub fn open_bi_stream(
202 &self,
203 chunk_size: usize,
204 ) -> Source<QuicBidirectionalStream, StreamCompletion<QuicStream>> {
205 assert!(chunk_size > 0, "chunk size must be greater than zero");
206 let connection = self.connection.clone();
207 let handle = self.handle.clone();
208 Source::lazy_future_source(move || {
209 let connection = connection.clone();
210 let handle = handle.clone();
211 async move {
212 let (send, recv) = connection.open_bi().await.map_err(quic_error)?;
213 let stream = quic_bi_stream_from_halves(send, recv, handle, chunk_size, false);
214 let metadata = stream.stream();
215 let stream = Arc::new(Mutex::new(Some(stream)));
216 Ok(Source::unfold_resource(
217 {
218 let stream = Arc::clone(&stream);
219 move || {
220 stream
221 .lock()
222 .expect("single-use QUIC bidi stream poisoned")
223 .take()
224 .map(Some)
225 .ok_or_else(|| {
226 StreamError::Failed(
227 "QUIC bidi stream already materialized".into(),
228 )
229 })
230 }
231 },
232 |stream| Ok(stream.take()),
233 |_stream| Ok(()),
234 )
235 .map_materialized_value(move |_| metadata))
236 }
237 })
238 }
239
240 #[must_use]
242 pub fn open_bi_stream_default(
243 &self,
244 ) -> Source<QuicBidirectionalStream, StreamCompletion<QuicStream>> {
245 self.open_bi_stream(self.chunk_size)
246 }
247
248 #[must_use]
255 pub fn open_bi_stream_available(
256 &self,
257 chunk_size: usize,
258 ) -> Source<QuicBidirectionalStream, StreamCompletion<QuicStream>> {
259 assert!(chunk_size > 0, "chunk size must be greater than zero");
260 let connection = self.connection.clone();
261 let handle = self.handle.clone();
262 Source::lazy_future_source(move || {
263 let connection = connection.clone();
264 let handle = handle.clone();
265 async move {
266 let (send, recv) = connection.open_bi().await.map_err(quic_error)?;
267 let stream = quic_bi_stream_from_halves(send, recv, handle, chunk_size, true);
268 let metadata = stream.stream();
269 let stream = Arc::new(Mutex::new(Some(stream)));
270 Ok(Source::unfold_resource(
271 {
272 let stream = Arc::clone(&stream);
273 move || {
274 stream
275 .lock()
276 .expect("single-use QUIC bidi stream poisoned")
277 .take()
278 .map(Some)
279 .ok_or_else(|| {
280 StreamError::Failed(
281 "QUIC bidi stream already materialized".into(),
282 )
283 })
284 }
285 },
286 |stream| Ok(stream.take()),
287 |_stream| Ok(()),
288 )
289 .map_materialized_value(move |_| metadata))
290 }
291 })
292 }
293
294 #[must_use]
300 pub fn accept_bi(&self, chunk_size: usize) -> Source<QuicBidirectionalStream, QuicConnection> {
301 assert!(chunk_size > 0, "chunk size must be greater than zero");
302 let connection = self.clone();
303 Source::unfold_resource(
304 {
305 let connection = connection.clone();
306 move || {
307 let handle = connection.handle.clone();
308 let (demand_sender, demand_receiver) = mpsc::channel(1);
309 let (cancel_sender, cancel_receiver) = watch::channel(false);
310 let task = handle.spawn(run_accept_bi_task(
311 connection.connection.clone(),
312 chunk_size,
313 false,
314 handle.clone(),
315 demand_receiver,
316 cancel_receiver,
317 ));
318 Ok(AcceptBiResource {
319 demands: demand_sender,
320 cancel: cancel_sender,
321 task,
322 })
323 }
324 },
325 receive_demand_response,
326 close_accept_bi_resource,
327 )
328 .map_materialized_value(move |_| connection.clone())
329 }
330
331 #[must_use]
334 pub fn accept_bi_default(&self) -> Source<QuicBidirectionalStream, QuicConnection> {
335 self.accept_bi(self.chunk_size)
336 }
337
338 #[must_use]
343 pub fn accept_bi_available(
344 &self,
345 chunk_size: usize,
346 ) -> Source<QuicBidirectionalStream, QuicConnection> {
347 assert!(chunk_size > 0, "chunk size must be greater than zero");
348 let connection = self.clone();
349 Source::unfold_resource(
350 {
351 let connection = connection.clone();
352 move || {
353 let handle = connection.handle.clone();
354 let (demand_sender, demand_receiver) = mpsc::channel(1);
355 let (cancel_sender, cancel_receiver) = watch::channel(false);
356 let task = handle.spawn(run_accept_bi_task(
357 connection.connection.clone(),
358 chunk_size,
359 true,
360 handle.clone(),
361 demand_receiver,
362 cancel_receiver,
363 ));
364 Ok(AcceptBiResource {
365 demands: demand_sender,
366 cancel: cancel_sender,
367 task,
368 })
369 }
370 },
371 receive_demand_response,
372 close_accept_bi_resource,
373 )
374 .map_materialized_value(move |_| connection.clone())
375 }
376
377 pub fn close(&self, reason: &[u8]) {
379 self.connection.close(close_code(), reason);
380 }
381}
382
/// A connection accepted by a bound QUIC endpoint; a thin wrapper around
/// [`QuicConnection`].
#[derive(Debug, Clone)]
pub struct QuicIncomingConnection {
    /// The accepted connection.
    connection: QuicConnection,
}
388
impl QuicIncomingConnection {
    /// Returns the local address of the accepted connection.
    #[must_use]
    pub fn local_addr(&self) -> SocketAddr {
        self.connection.local_addr()
    }

    /// Returns the remote address of the accepted connection.
    #[must_use]
    pub fn remote_addr(&self) -> SocketAddr {
        self.connection.remote_addr()
    }

    /// Returns a clone of the wrapped [`QuicConnection`].
    #[must_use]
    pub fn connection(&self) -> QuicConnection {
        self.connection.clone()
    }

    /// Consumes the wrapper and returns the wrapped [`QuicConnection`].
    #[must_use]
    pub fn into_connection(self) -> QuicConnection {
        self.connection
    }

    /// Delegates to [`QuicConnection::open_bi`].
    #[must_use]
    pub fn open_bi(
        &self,
        chunk_size: usize,
    ) -> Flow<Vec<u8>, Vec<u8>, StreamCompletion<QuicStream>> {
        self.connection.open_bi(chunk_size)
    }

    /// Delegates to [`QuicConnection::open_bi_default`].
    #[must_use]
    pub fn open_bi_default(&self) -> Flow<Vec<u8>, Vec<u8>, StreamCompletion<QuicStream>> {
        self.connection.open_bi_default()
    }

    /// Delegates to [`QuicConnection::open_bi_stream`].
    #[must_use]
    pub fn open_bi_stream(
        &self,
        chunk_size: usize,
    ) -> Source<QuicBidirectionalStream, StreamCompletion<QuicStream>> {
        self.connection.open_bi_stream(chunk_size)
    }

    /// Delegates to [`QuicConnection::open_bi_stream_default`].
    #[must_use]
    pub fn open_bi_stream_default(
        &self,
    ) -> Source<QuicBidirectionalStream, StreamCompletion<QuicStream>> {
        self.connection.open_bi_stream_default()
    }

    /// Delegates to [`QuicConnection::open_bi_stream_available`].
    #[must_use]
    pub fn open_bi_stream_available(
        &self,
        chunk_size: usize,
    ) -> Source<QuicBidirectionalStream, StreamCompletion<QuicStream>> {
        self.connection.open_bi_stream_available(chunk_size)
    }

    /// Delegates to [`QuicConnection::accept_bi`].
    #[must_use]
    pub fn accept_bi(&self, chunk_size: usize) -> Source<QuicBidirectionalStream, QuicConnection> {
        self.connection.accept_bi(chunk_size)
    }

    /// Delegates to [`QuicConnection::accept_bi_default`].
    #[must_use]
    pub fn accept_bi_default(&self) -> Source<QuicBidirectionalStream, QuicConnection> {
        self.connection.accept_bi_default()
    }

    /// Delegates to [`QuicConnection::accept_bi_available`].
    #[must_use]
    pub fn accept_bi_available(
        &self,
        chunk_size: usize,
    ) -> Source<QuicBidirectionalStream, QuicConnection> {
        self.connection.accept_bi_available(chunk_size)
    }
}
476
/// A bidirectional QUIC stream exposed as a byte source (receive half) and a
/// byte sink (send half).
pub struct QuicBidirectionalStream {
    /// Metadata identifying the stream.
    stream: QuicStream,
    /// Chunks read from the receive half.
    source: QuicByteSource,
    /// Chunks written to the send half.
    sink: QuicByteSink,
}
483
484impl QuicBidirectionalStream {
485 #[must_use]
487 pub fn stream(&self) -> QuicStream {
488 self.stream
489 }
490
491 #[must_use]
493 pub fn into_parts(self) -> (QuicByteSource, QuicByteSink) {
494 (self.source, self.sink)
495 }
496
497 #[must_use]
499 pub fn into_flow(self) -> Flow<Vec<u8>, Vec<u8>, QuicStream> {
500 Flow::from_sink_and_source(self.sink, self.source)
501 .map_materialized_value(move |_| self.stream)
502 }
503}
504
/// Entry point for binding QUIC servers and connecting QUIC clients; both
/// capture the current Tokio runtime handle when they run.
pub struct TokioQuic;

/// Alias for [`TokioQuic`].
pub type Quic = TokioQuic;
510
511impl TokioQuic {
512 #[must_use]
518 pub fn bind<A>(
519 addr: A,
520 server_config: quinn::ServerConfig,
521 chunk_size: usize,
522 ) -> Source<QuicIncomingConnection, StreamCompletion<QuicBinding>>
523 where
524 A: ToSocketAddrs + Clone + Send + Sync + 'static,
525 {
526 assert!(chunk_size > 0, "chunk size must be greater than zero");
527 Source::lazy_future_source(move || {
528 let addr = addr.clone();
529 let server_config = server_config.clone();
530 async move {
531 let handle = Handle::current();
532 let addr = resolve_addr(addr).await?;
533 let endpoint = quinn::Endpoint::server(server_config, addr).map_err(io_error)?;
534 let local_addr = endpoint.local_addr().map_err(io_error)?;
535 Ok(quic_bind_source(endpoint, local_addr, handle, chunk_size))
536 }
537 })
538 }
539
540 #[must_use]
542 pub fn bind_default<A>(
543 addr: A,
544 server_config: quinn::ServerConfig,
545 ) -> Source<QuicIncomingConnection, StreamCompletion<QuicBinding>>
546 where
547 A: ToSocketAddrs + Clone + Send + Sync + 'static,
548 {
549 Self::bind(addr, server_config, DEFAULT_CHUNK_SIZE)
550 }
551
552 #[must_use]
558 pub fn connect<A>(
559 addr: A,
560 server_name: impl Into<String>,
561 client_config: quinn::ClientConfig,
562 chunk_size: usize,
563 ) -> Source<QuicConnection, StreamCompletion<QuicConnection>>
564 where
565 A: ToSocketAddrs + Clone + Send + Sync + 'static,
566 {
567 assert!(chunk_size > 0, "chunk size must be greater than zero");
568 let server_name = server_name.into();
569 Source::lazy_future_source(move || {
570 let addr = addr.clone();
571 let server_name = server_name.clone();
572 let client_config = client_config.clone();
573 async move {
574 let remote_addr = resolve_addr(addr).await?;
575 let local_addr = client_bind_addr(remote_addr);
576 let mut endpoint = quinn::Endpoint::client(local_addr).map_err(io_error)?;
577 endpoint.set_default_client_config(client_config);
578 let connecting = endpoint
579 .connect(remote_addr, &server_name)
580 .map_err(quic_error)?;
581 let connection = connecting.await.map_err(quic_error)?;
582 let endpoint_local_addr = endpoint.local_addr().map_err(io_error)?;
583 let connection = QuicConnection {
584 local_addr: connection_local_addr(
585 &connection,
586 endpoint_local_addr,
587 remote_addr.ip(),
588 ),
589 remote_addr: connection.remote_address(),
590 endpoint,
591 connection,
592 handle: Handle::current(),
593 chunk_size,
594 };
595 let materialized = connection.clone();
596 Ok(
597 Source::single(connection)
598 .map_materialized_value(move |_| materialized.clone()),
599 )
600 }
601 })
602 }
603
604 #[must_use]
606 pub fn connect_default<A>(
607 addr: A,
608 server_name: impl Into<String>,
609 client_config: quinn::ClientConfig,
610 ) -> Source<QuicConnection, StreamCompletion<QuicConnection>>
611 where
612 A: ToSocketAddrs + Clone + Send + Sync + 'static,
613 {
614 Self::connect(addr, server_name, client_config, DEFAULT_CHUNK_SIZE)
615 }
616}
617
618async fn resolve_addr<A>(addr: A) -> StreamResult<SocketAddr>
619where
620 A: ToSocketAddrs,
621{
622 let mut addrs = tokio::net::lookup_host(addr).await.map_err(io_error)?;
623 addrs
624 .next()
625 .ok_or_else(|| StreamError::Failed("address resolved to no socket addresses".into()))
626}
627
/// Picks the local address a client endpoint should bind to: the unspecified
/// address of the same IP family as `remote_addr`, with port 0.
fn client_bind_addr(remote_addr: SocketAddr) -> SocketAddr {
    let wildcard = match remote_addr {
        SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
        SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
    };
    SocketAddr::new(wildcard, 0)
}
635
636fn connection_local_addr(
637 connection: &quinn::Connection,
638 endpoint_addr: SocketAddr,
639 fallback_ip: IpAddr,
640) -> SocketAddr {
641 connection
642 .local_ip()
643 .map(|ip| SocketAddr::new(ip, endpoint_addr.port()))
644 .or_else(|| {
645 endpoint_addr
646 .ip()
647 .is_unspecified()
648 .then(|| SocketAddr::new(fallback_ip, endpoint_addr.port()))
649 })
650 .unwrap_or(endpoint_addr)
651}
652
653fn quic_bi_stream_from_halves(
654 send: quinn::SendStream,
655 recv: quinn::RecvStream,
656 handle: Handle,
657 chunk_size: usize,
658 emit_available: bool,
659) -> QuicBidirectionalStream {
660 let stream = QuicStream { id: send.id() };
661 QuicBidirectionalStream {
662 stream,
663 source: single_use_quic_read_source(recv, handle.clone(), chunk_size, emit_available),
664 sink: single_use_quic_write_sink(send, handle),
665 }
666}
667
668fn single_use_quic_read_source(
669 recv: quinn::RecvStream,
670 handle: Handle,
671 chunk_size: usize,
672 emit_available: bool,
673) -> QuicByteSource {
674 let recv = Arc::new(Mutex::new(Some(recv)));
675 Source::unfold_resource(
676 {
677 let recv = Arc::clone(&recv);
678 move || {
679 let recv = recv
680 .lock()
681 .expect("single-use QUIC recv stream poisoned")
682 .take()
683 .ok_or_else(|| {
684 StreamError::Failed("QUIC recv stream already materialized".into())
685 })?;
686 let (sender, receiver) = mpsc::channel(1);
687 let (cancel_sender, cancel_receiver) = watch::channel(false);
688 let task = handle.spawn(run_read_task(
689 recv,
690 chunk_size,
691 sender,
692 emit_available,
693 cancel_receiver,
694 ));
695 Ok(ReadResource {
696 receiver,
697 cancel: cancel_sender,
698 task,
699 })
700 }
701 },
702 |resource| match resource.receiver.blocking_recv() {
703 Some(DemandResponse::Item(chunk)) => Ok(Some(chunk)),
704 Some(DemandResponse::Complete) => Ok(None),
705 Some(DemandResponse::Error(error)) => Err(error),
706 None => Err(abrupt_termination()),
707 },
708 close_read_resource,
709 )
710}
711
/// Releases a [`ReadResource`]: signals cancellation to the read task and
/// aborts it. Always returns `Ok(())`.
fn close_read_resource(resource: ReadResource) -> StreamResult<()> {
    // Best effort: the send fails only if the task already dropped its receiver.
    let _ = resource.cancel.send(true);
    resource.task.abort();
    // `resource` is dropped here; its `Drop` impl repeats the same two steps.
    Ok(())
}
717
/// Background task that reads from `recv` and forwards the data to `sender`
/// as chunks.
///
/// Each read uses a buffer of `chunk_size` bytes. Data is forwarded through
/// `send_read_chunks`, which either buffers it into full chunks or, when
/// `emit_available` is set, forwards it after every read. The task ends when
/// the stream finishes (after flushing any buffered tail and sending
/// `Complete`), when a read fails (after sending `Error`), when `cancel`
/// fires, or when a send does not go through.
async fn run_read_task(
    mut recv: quinn::RecvStream,
    chunk_size: usize,
    sender: mpsc::Sender<DemandResponse<Vec<u8>>>,
    emit_available: bool,
    mut cancel: watch::Receiver<bool>,
) {
    let mut buffer = vec![0_u8; chunk_size];
    // Bytes read but not yet emitted because they do not fill a chunk.
    let mut pending_tail = Vec::with_capacity(chunk_size);

    loop {
        let read = tokio::select! {
            read = recv.read(&mut buffer) => read,
            // Whatever `changed()` resolves to, the task stops.
            changed = cancel.changed() => {
                let _ = changed;
                return;
            }
        };

        match read {
            Ok(Some(read)) => {
                if !send_read_chunks(
                    &sender,
                    chunk_size,
                    &mut pending_tail,
                    &buffer[..read],
                    emit_available,
                    &mut cancel,
                )
                .await
                {
                    return;
                }
            }
            // End of stream: flush the buffered tail, then signal completion.
            Ok(None) => {
                if !pending_tail.is_empty()
                    && !send_read_item(
                        &sender,
                        DemandResponse::Item(std::mem::take(&mut pending_tail)),
                        &mut cancel,
                    )
                    .await
                {
                    return;
                }
                let _ = send_read_item(&sender, DemandResponse::Complete, &mut cancel).await;
                return;
            }
            // Read failure: report it and stop; the buffered tail is discarded.
            Err(error) => {
                let _ = send_read_item(
                    &sender,
                    DemandResponse::Error(quic_error(error)),
                    &mut cancel,
                )
                .await;
                return;
            }
        }
    }
}
778
779async fn send_read_chunks(
780 sender: &mpsc::Sender<DemandResponse<Vec<u8>>>,
781 chunk_size: usize,
782 pending_tail: &mut Vec<u8>,
783 read_buffer: &[u8],
784 emit_available: bool,
785 cancel: &mut watch::Receiver<bool>,
786) -> bool {
787 let mut offset = 0;
788 if !pending_tail.is_empty() {
789 let needed = chunk_size - pending_tail.len();
790 let take = needed.min(read_buffer.len());
791 pending_tail.extend_from_slice(&read_buffer[..take]);
792 offset += take;
793 if pending_tail.len() == chunk_size
794 && !send_read_item(
795 sender,
796 DemandResponse::Item(std::mem::take(pending_tail)),
797 cancel,
798 )
799 .await
800 {
801 return false;
802 }
803 }
804
805 while offset + chunk_size <= read_buffer.len() {
806 let next = offset + chunk_size;
807 if !send_read_item(
808 sender,
809 DemandResponse::Item(read_buffer[offset..next].to_vec()),
810 cancel,
811 )
812 .await
813 {
814 return false;
815 }
816 offset = next;
817 }
818
819 if offset < read_buffer.len() {
820 pending_tail.extend_from_slice(&read_buffer[offset..]);
821 }
822 if emit_available
823 && !pending_tail.is_empty()
824 && !send_read_item(
825 sender,
826 DemandResponse::Item(std::mem::take(pending_tail)),
827 cancel,
828 )
829 .await
830 {
831 return false;
832 }
833 true
834}
835
836async fn send_read_item<T>(
837 sender: &mpsc::Sender<DemandResponse<T>>,
838 item: DemandResponse<T>,
839 cancel: &mut watch::Receiver<bool>,
840) -> bool
841where
842 T: Send + 'static,
843{
844 tokio::select! {
845 result = sender.send(item) => result.is_ok(),
846 changed = cancel.changed() => {
847 let _ = changed;
848 false
849 }
850 }
851}
852
853fn single_use_quic_write_sink(send: quinn::SendStream, handle: Handle) -> QuicByteSink {
854 let send = Arc::new(Mutex::new(Some(send)));
855 Flow::<Vec<u8>, Vec<u8>>::identity()
856 .map_with_resource(
857 {
858 let send = Arc::clone(&send);
859 move || {
860 send.lock()
861 .expect("single-use QUIC send stream poisoned")
862 .take()
863 .ok_or_else(|| {
864 StreamError::Failed("QUIC send stream already materialized".into())
865 })
866 }
867 },
868 {
869 let handle = handle.clone();
870 move |send, chunk| {
871 handle.block_on(async { send.write_all(&chunk).await.map_err(quic_error) })?;
872 Ok(())
873 }
874 },
875 move |mut send| {
876 handle.block_on(async { send.write_all(&[]).await.map_err(quic_error) })?;
877 send.finish().map_err(quic_error)?;
878 Ok(None)
879 },
880 )
881 .to_mat(Sink::ignore(), Keep::right)
882}
883
884fn quic_bind_source(
885 endpoint: quinn::Endpoint,
886 local_addr: SocketAddr,
887 handle: Handle,
888 chunk_size: usize,
889) -> Source<QuicIncomingConnection, QuicBinding> {
890 let endpoint = Arc::new(Mutex::new(Some(endpoint)));
891 Source::unfold_resource(
892 {
893 let endpoint = Arc::clone(&endpoint);
894 let handle = handle.clone();
895 move || {
896 let endpoint = endpoint
897 .lock()
898 .expect("single-use QUIC endpoint poisoned")
899 .take()
900 .ok_or_else(|| {
901 StreamError::Failed("QUIC endpoint already materialized".into())
902 })?;
903 let (demand_sender, demand_receiver) = mpsc::channel(1);
904 let (cancel_sender, cancel_receiver) = watch::channel(false);
905 let task = handle.spawn(run_quic_bind_task(
906 endpoint,
907 local_addr,
908 chunk_size,
909 handle.clone(),
910 demand_receiver,
911 cancel_receiver,
912 ));
913 Ok(BindResource {
914 demands: demand_sender,
915 cancel: cancel_sender,
916 task,
917 })
918 }
919 },
920 receive_demand_response,
921 close_bind_resource,
922 )
923 .map_materialized_value(move |_| QuicBinding { local_addr })
924}
925
926fn receive_demand_response<T>(resource: &mut impl DemandResource<T>) -> StreamResult<Option<T>>
927where
928 T: Send + 'static,
929{
930 let (reply_sender, reply_receiver) = std_mpsc::channel();
931 resource
932 .demands()
933 .blocking_send(reply_sender)
934 .map_err(|_| abrupt_termination())?;
935 match reply_receiver.recv() {
936 Ok(DemandResponse::Item(item)) => Ok(Some(item)),
937 Ok(DemandResponse::Complete) => Ok(None),
938 Ok(DemandResponse::Error(error)) => Err(error),
939 Err(_) => Err(abrupt_termination()),
940 }
941}
942
/// A resource whose elements are produced on demand by a background task.
trait DemandResource<T>
where
    T: Send + 'static,
{
    /// Returns the channel on which demands (reply senders) are submitted.
    fn demands(&self) -> &mpsc::Sender<std_mpsc::Sender<DemandResponse<T>>>;
}

impl DemandResource<QuicIncomingConnection> for BindResource {
    fn demands(&self) -> &mpsc::Sender<std_mpsc::Sender<DemandResponse<QuicIncomingConnection>>> {
        &self.demands
    }
}

impl DemandResource<QuicBidirectionalStream> for AcceptBiResource {
    fn demands(&self) -> &mpsc::Sender<std_mpsc::Sender<DemandResponse<QuicBidirectionalStream>>> {
        &self.demands
    }
}
961
/// Releases a [`BindResource`]: signals cancellation to the accept task and
/// aborts it. Always returns `Ok(())`.
fn close_bind_resource(resource: BindResource) -> StreamResult<()> {
    // Best effort: the send fails only if the task already dropped its receiver.
    let _ = resource.cancel.send(true);
    resource.task.abort();
    // `resource` is dropped here; its `Drop` impl repeats the same two steps.
    Ok(())
}
967
/// Releases an [`AcceptBiResource`]: signals cancellation to the accept task
/// and aborts it. Always returns `Ok(())`.
fn close_accept_bi_resource(resource: AcceptBiResource) -> StreamResult<()> {
    // Best effort: the send fails only if the task already dropped its receiver.
    let _ = resource.cancel.send(true);
    resource.task.abort();
    // `resource` is dropped here; its `Drop` impl repeats the same two steps.
    Ok(())
}
973
/// Background task that answers demands for incoming connections on
/// `endpoint`.
///
/// For every demand it accepts one incoming connection, awaits it, wraps it
/// in a [`QuicIncomingConnection`] and replies on the demand's channel. Every
/// wait is raced against `cancel`. The task ends when the demand channel
/// closes, `cancel` fires, the endpoint stops accepting (replying
/// `Complete`), a connection attempt fails (replying `Error`), or the reply
/// cannot be delivered.
async fn run_quic_bind_task(
    endpoint: quinn::Endpoint,
    local_addr: SocketAddr,
    chunk_size: usize,
    handle: Handle,
    mut demands: mpsc::Receiver<std_mpsc::Sender<DemandResponse<QuicIncomingConnection>>>,
    mut cancel: watch::Receiver<bool>,
) {
    loop {
        // Wait for the stream to ask for the next connection.
        let reply = tokio::select! {
            demand = demands.recv() => match demand {
                Some(reply) => reply,
                None => return,
            },
            changed = cancel.changed() => {
                let _ = changed;
                return;
            }
        };

        let incoming = tokio::select! {
            incoming = endpoint.accept() => incoming,
            changed = cancel.changed() => {
                let _ = changed;
                return;
            }
        };

        // `None` from `accept()` means no further connections will arrive.
        let Some(incoming) = incoming else {
            let _ = reply.send(DemandResponse::Complete);
            return;
        };

        let connected = tokio::select! {
            connected = incoming => connected,
            changed = cancel.changed() => {
                let _ = changed;
                return;
            }
        };

        match connected {
            Ok(connection) => {
                let incoming = QuicIncomingConnection {
                    connection: QuicConnection {
                        endpoint: endpoint.clone(),
                        local_addr: connection_local_addr(&connection, local_addr, local_addr.ip()),
                        remote_addr: connection.remote_address(),
                        connection,
                        handle: handle.clone(),
                        chunk_size,
                    },
                };
                if reply.send(DemandResponse::Item(incoming)).is_err() {
                    return;
                }
            }
            // NOTE(review): one failed incoming connection ends the task, which
            // presumably terminates the whole accept source — confirm intended.
            Err(error) => {
                let _ = reply.send(DemandResponse::Error(quic_error(error)));
                return;
            }
        }
    }
}
1038
/// Background task that answers demands for peer-initiated bidirectional
/// streams on `connection`.
///
/// For every demand it accepts one stream, wraps its halves with
/// `quic_bi_stream_from_halves` (passing `chunk_size` and `emit_available`
/// through) and replies on the demand's channel. Every wait is raced against
/// `cancel`. The task ends when the demand channel closes, `cancel` fires,
/// accepting fails (replying `Error`), or the reply cannot be delivered.
async fn run_accept_bi_task(
    connection: quinn::Connection,
    chunk_size: usize,
    emit_available: bool,
    handle: Handle,
    mut demands: mpsc::Receiver<std_mpsc::Sender<DemandResponse<QuicBidirectionalStream>>>,
    mut cancel: watch::Receiver<bool>,
) {
    loop {
        // Wait for the stream to ask for the next bidirectional stream.
        let reply = tokio::select! {
            demand = demands.recv() => match demand {
                Some(reply) => reply,
                None => return,
            },
            changed = cancel.changed() => {
                let _ = changed;
                return;
            }
        };

        let accepted = tokio::select! {
            accepted = connection.accept_bi() => accepted,
            changed = cancel.changed() => {
                let _ = changed;
                return;
            }
        };

        match accepted {
            Ok((send, recv)) => {
                let stream = quic_bi_stream_from_halves(
                    send,
                    recv,
                    handle.clone(),
                    chunk_size,
                    emit_available,
                );
                if reply.send(DemandResponse::Item(stream)).is_err() {
                    return;
                }
            }
            // Accept failure: report it and stop the task.
            Err(error) => {
                let _ = reply.send(DemandResponse::Error(quic_error(error)));
                return;
            }
        }
    }
}