pub use quinn::{self, crypto, rustls};
use datum::{Flow, Keep, NotUsed, Sink, Source, StreamCompletion, StreamError, StreamResult};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::{Arc, Mutex, mpsc as std_mpsc};
use tokio::net::ToSocketAddrs;
use tokio::runtime::Handle;
use tokio::sync::{mpsc, watch};
use tokio::task::JoinHandle;
/// Chunk size, in bytes, used by the `*_default` constructors of [`TokioQuic`].
pub const DEFAULT_CHUNK_SIZE: usize = 8192;
/// Stream source emitting the bytes received on a QUIC stream as `Vec<u8>` chunks.
pub type QuicByteSource = Source<Vec<u8>, NotUsed>;
/// Stream sink accepting `Vec<u8>` chunks to be written to a QUIC stream.
pub type QuicByteSink = Sink<Vec<u8>, StreamCompletion<NotUsed>>;
/// Message sent from a background task back to the blocking stream side.
enum DemandResponse<T> {
/// The next element to emit.
Item(T),
/// The producer finished normally; no further elements follow.
Complete,
/// The producer failed with the given stream error.
Error(StreamError),
}
/// State held by a materialized byte source while its reader task is running.
struct ReadResource {
/// Receives chunks, completion, or errors produced by the reader task.
receiver: mpsc::Receiver<DemandResponse<Vec<u8>>>,
/// Set to `true` to ask the reader task to stop.
cancel: watch::Sender<bool>,
/// Handle of the spawned reader task; aborted on close and on drop.
task: JoinHandle<()>,
}
impl Drop for ReadResource {
    /// Signals cancellation to the reader task and aborts it.
    fn drop(&mut self) {
        let Self { cancel, task, .. } = self;
        // A send failure only means the task's receiver is already gone.
        let _ = cancel.send(true);
        task.abort();
    }
}
/// State held by a materialized server source while its accept task is running.
struct BindResource {
/// Carries one reply channel per demand to the accept task.
demands: mpsc::Sender<std_mpsc::Sender<DemandResponse<QuicIncomingConnection>>>,
/// Set to `true` to ask the accept task to stop.
cancel: watch::Sender<bool>,
/// Handle of the spawned accept task; aborted on close and on drop.
task: JoinHandle<()>,
}
impl Drop for BindResource {
    /// Signals cancellation to the accept task and aborts it.
    fn drop(&mut self) {
        let Self { cancel, task, .. } = self;
        // A send failure only means the task's receiver is already gone.
        let _ = cancel.send(true);
        task.abort();
    }
}
/// State held by a materialized `accept_bi` source while its task is running.
struct AcceptBiResource {
/// Carries one reply channel per demand to the stream-accepting task.
demands: mpsc::Sender<std_mpsc::Sender<DemandResponse<QuicBidirectionalStream>>>,
/// Set to `true` to ask the stream-accepting task to stop.
cancel: watch::Sender<bool>,
/// Handle of the spawned task; aborted on close and on drop.
task: JoinHandle<()>,
}
impl Drop for AcceptBiResource {
    /// Signals cancellation to the stream-accepting task and aborts it.
    fn drop(&mut self) {
        let Self { cancel, task, .. } = self;
        // A send failure only means the task's receiver is already gone.
        let _ = cancel.send(true);
        task.abort();
    }
}
/// Converts any displayable error into [`StreamError::Failed`] carrying its `Display` text.
fn quic_error(error: impl std::fmt::Display) -> StreamError {
    let message = error.to_string();
    StreamError::Failed(message)
}
/// Converts an I/O error into [`StreamError::Failed`] carrying its `Display` text.
fn io_error(error: std::io::Error) -> StreamError {
    let message = error.to_string();
    StreamError::Failed(message)
}
/// Error reported when a background task's channel closes without delivering a reply.
fn abrupt_termination() -> StreamError {
StreamError::AbruptTermination
}
/// Error code passed to `quinn::Connection::close` by [`QuicConnection::close`]; always `0`.
fn close_code() -> quinn::VarInt {
quinn::VarInt::from_u32(0)
}
/// Materialized value of a bound QUIC server source.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct QuicBinding {
/// Local address reported by the server endpoint after binding.
pub local_addr: SocketAddr,
}
impl QuicBinding {
/// Returns the local address the server endpoint is bound to.
#[must_use]
pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}
}
/// Metadata identifying a single QUIC stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct QuicStream {
/// Stream identifier, taken from the stream's send half.
pub id: quinn::StreamId,
}
impl QuicStream {
/// Returns the QUIC stream identifier.
#[must_use]
pub fn id(&self) -> quinn::StreamId {
self.id
}
}
/// An established QUIC connection together with the context needed to build
/// stream sources and flows on top of it.
#[derive(Debug, Clone)]
pub struct QuicConnection {
/// Endpoint the connection belongs to.
endpoint: quinn::Endpoint,
/// Underlying `quinn` connection.
connection: quinn::Connection,
/// Tokio runtime handle used to spawn tasks and block on stream I/O.
handle: Handle,
/// Local address recorded when the connection was established.
local_addr: SocketAddr,
/// Peer address recorded when the connection was established.
remote_addr: SocketAddr,
/// Chunk size used by the `*_default` methods.
chunk_size: usize,
}
impl QuicConnection {
    /// Local socket address recorded when the connection was established.
    #[must_use]
    pub fn local_addr(&self) -> SocketAddr {
        self.local_addr
    }

    /// Peer socket address recorded when the connection was established.
    #[must_use]
    pub fn remote_addr(&self) -> SocketAddr {
        self.remote_addr
    }

    /// Chunk size used by the `*_default` methods of this connection.
    #[must_use]
    pub fn chunk_size(&self) -> usize {
        self.chunk_size
    }

    /// Borrows the underlying `quinn` connection.
    #[must_use]
    pub fn quinn_connection(&self) -> &quinn::Connection {
        &self.connection
    }

    /// Borrows the `quinn` endpoint this connection belongs to.
    #[must_use]
    pub fn quinn_endpoint(&self) -> &quinn::Endpoint {
        &self.endpoint
    }

    /// Builds a flow backed by a newly opened bidirectional QUIC stream.
    ///
    /// Chunks pushed into the flow are written to the send half; bytes read
    /// from the recv half are emitted in `chunk_size`-byte chunks, with a
    /// shorter final chunk at end of stream. The stream is opened inside the
    /// future handed to `Flow::future_flow`, not when this method is called.
    ///
    /// # Panics
    /// Panics if `chunk_size` is zero.
    #[must_use]
    pub fn open_bi(
        &self,
        chunk_size: usize,
    ) -> Flow<Vec<u8>, Vec<u8>, StreamCompletion<QuicStream>> {
        assert!(chunk_size > 0, "chunk size must be greater than zero");
        let connection = self.connection.clone();
        let handle = self.handle.clone();
        Flow::future_flow(move || {
            let connection = connection.clone();
            let handle = handle.clone();
            async move {
                let (send, recv) = connection.open_bi().await.map_err(quic_error)?;
                Ok(quic_bi_stream_from_halves(send, recv, handle, chunk_size, false).into_flow())
            }
        })
    }

    /// Same as [`Self::open_bi`] using this connection's configured chunk size.
    #[must_use]
    pub fn open_bi_default(&self) -> Flow<Vec<u8>, Vec<u8>, StreamCompletion<QuicStream>> {
        self.open_bi(self.chunk_size)
    }

    /// Builds a source that opens one bidirectional stream and emits it as a
    /// single [`QuicBidirectionalStream`] whose byte source buffers reads into
    /// `chunk_size`-byte chunks.
    ///
    /// # Panics
    /// Panics if `chunk_size` is zero.
    #[must_use]
    pub fn open_bi_stream(
        &self,
        chunk_size: usize,
    ) -> Source<QuicBidirectionalStream, StreamCompletion<QuicStream>> {
        self.open_bi_stream_with(chunk_size, false)
    }

    /// Same as [`Self::open_bi_stream`] using this connection's configured chunk size.
    #[must_use]
    pub fn open_bi_stream_default(
        &self,
    ) -> Source<QuicBidirectionalStream, StreamCompletion<QuicStream>> {
        self.open_bi_stream(self.chunk_size)
    }

    /// Like [`Self::open_bi_stream`], but the emitted stream's byte source
    /// forwards whatever bytes each read returned (at most `chunk_size` per
    /// chunk) instead of waiting to fill a whole chunk.
    ///
    /// # Panics
    /// Panics if `chunk_size` is zero.
    #[must_use]
    pub fn open_bi_stream_available(
        &self,
        chunk_size: usize,
    ) -> Source<QuicBidirectionalStream, StreamCompletion<QuicStream>> {
        self.open_bi_stream_with(chunk_size, true)
    }

    /// Builds a source of bidirectional streams accepted on this connection.
    /// Each emitted stream's byte source buffers reads into `chunk_size`-byte
    /// chunks. The materialized value is a clone of this connection.
    ///
    /// # Panics
    /// Panics if `chunk_size` is zero.
    #[must_use]
    pub fn accept_bi(&self, chunk_size: usize) -> Source<QuicBidirectionalStream, QuicConnection> {
        self.accept_bi_with(chunk_size, false)
    }

    /// Same as [`Self::accept_bi`] using this connection's configured chunk size.
    #[must_use]
    pub fn accept_bi_default(&self) -> Source<QuicBidirectionalStream, QuicConnection> {
        self.accept_bi(self.chunk_size)
    }

    /// Like [`Self::accept_bi`], but each emitted stream's byte source forwards
    /// whatever bytes each read returned instead of waiting to fill a chunk.
    ///
    /// # Panics
    /// Panics if `chunk_size` is zero.
    #[must_use]
    pub fn accept_bi_available(
        &self,
        chunk_size: usize,
    ) -> Source<QuicBidirectionalStream, QuicConnection> {
        self.accept_bi_with(chunk_size, true)
    }

    /// Closes the underlying connection with error code `0` and the given reason bytes.
    pub fn close(&self, reason: &[u8]) {
        self.connection.close(close_code(), reason);
    }

    /// Shared implementation of `open_bi_stream` / `open_bi_stream_available`;
    /// `emit_available` selects the read-chunking mode of the resulting stream.
    fn open_bi_stream_with(
        &self,
        chunk_size: usize,
        emit_available: bool,
    ) -> Source<QuicBidirectionalStream, StreamCompletion<QuicStream>> {
        assert!(chunk_size > 0, "chunk size must be greater than zero");
        let connection = self.connection.clone();
        let handle = self.handle.clone();
        Source::lazy_future_source(move || {
            let connection = connection.clone();
            let handle = handle.clone();
            async move {
                let (send, recv) = connection.open_bi().await.map_err(quic_error)?;
                let stream =
                    quic_bi_stream_from_halves(send, recv, handle, chunk_size, emit_available);
                let metadata = stream.stream();
                // The stream can be handed out only once, so it sits in a
                // take-once slot shared with the resource factory.
                let stream = Arc::new(Mutex::new(Some(stream)));
                Ok(Source::unfold_resource(
                    {
                        let stream = Arc::clone(&stream);
                        move || {
                            stream
                                .lock()
                                .expect("single-use QUIC bidi stream poisoned")
                                .take()
                                .map(Some)
                                .ok_or_else(|| {
                                    StreamError::Failed(
                                        "QUIC bidi stream already materialized".into(),
                                    )
                                })
                        }
                    },
                    // First pull yields the stream; the next yields `None`.
                    |stream| Ok(stream.take()),
                    |_stream| Ok(()),
                )
                .map_materialized_value(move |_| metadata))
            }
        })
    }

    /// Shared implementation of `accept_bi` / `accept_bi_available`;
    /// `emit_available` selects the read-chunking mode of accepted streams.
    fn accept_bi_with(
        &self,
        chunk_size: usize,
        emit_available: bool,
    ) -> Source<QuicBidirectionalStream, QuicConnection> {
        assert!(chunk_size > 0, "chunk size must be greater than zero");
        let connection = self.clone();
        Source::unfold_resource(
            {
                let connection = connection.clone();
                move || {
                    let handle = connection.handle.clone();
                    let (demand_sender, demand_receiver) = mpsc::channel(1);
                    let (cancel_sender, cancel_receiver) = watch::channel(false);
                    let task = handle.spawn(run_accept_bi_task(
                        connection.connection.clone(),
                        chunk_size,
                        emit_available,
                        handle.clone(),
                        demand_receiver,
                        cancel_receiver,
                    ));
                    Ok(AcceptBiResource {
                        demands: demand_sender,
                        cancel: cancel_sender,
                        task,
                    })
                }
            },
            receive_demand_response,
            close_accept_bi_resource,
        )
        .map_materialized_value(move |_| connection.clone())
    }
}
/// A connection accepted by a bound QUIC server; a thin wrapper around [`QuicConnection`].
#[derive(Debug, Clone)]
pub struct QuicIncomingConnection {
/// The accepted connection all methods delegate to.
connection: QuicConnection,
}
// Every method below forwards to the wrapped `QuicConnection`.
impl QuicIncomingConnection {
/// Local address of the accepted connection.
#[must_use]
pub fn local_addr(&self) -> SocketAddr {
self.connection.local_addr()
}
/// Peer address of the accepted connection.
#[must_use]
pub fn remote_addr(&self) -> SocketAddr {
self.connection.remote_addr()
}
/// Returns a clone of the wrapped connection.
#[must_use]
pub fn connection(&self) -> QuicConnection {
self.connection.clone()
}
/// Consumes the wrapper and returns the wrapped connection.
#[must_use]
pub fn into_connection(self) -> QuicConnection {
self.connection
}
/// Delegates to [`QuicConnection::open_bi`].
#[must_use]
pub fn open_bi(
&self,
chunk_size: usize,
) -> Flow<Vec<u8>, Vec<u8>, StreamCompletion<QuicStream>> {
self.connection.open_bi(chunk_size)
}
/// Delegates to [`QuicConnection::open_bi_default`].
#[must_use]
pub fn open_bi_default(&self) -> Flow<Vec<u8>, Vec<u8>, StreamCompletion<QuicStream>> {
self.connection.open_bi_default()
}
/// Delegates to [`QuicConnection::open_bi_stream`].
#[must_use]
pub fn open_bi_stream(
&self,
chunk_size: usize,
) -> Source<QuicBidirectionalStream, StreamCompletion<QuicStream>> {
self.connection.open_bi_stream(chunk_size)
}
/// Delegates to [`QuicConnection::open_bi_stream_default`].
#[must_use]
pub fn open_bi_stream_default(
&self,
) -> Source<QuicBidirectionalStream, StreamCompletion<QuicStream>> {
self.connection.open_bi_stream_default()
}
/// Delegates to [`QuicConnection::open_bi_stream_available`].
#[must_use]
pub fn open_bi_stream_available(
&self,
chunk_size: usize,
) -> Source<QuicBidirectionalStream, StreamCompletion<QuicStream>> {
self.connection.open_bi_stream_available(chunk_size)
}
/// Delegates to [`QuicConnection::accept_bi`].
#[must_use]
pub fn accept_bi(&self, chunk_size: usize) -> Source<QuicBidirectionalStream, QuicConnection> {
self.connection.accept_bi(chunk_size)
}
/// Delegates to [`QuicConnection::accept_bi_default`].
#[must_use]
pub fn accept_bi_default(&self) -> Source<QuicBidirectionalStream, QuicConnection> {
self.connection.accept_bi_default()
}
/// Delegates to [`QuicConnection::accept_bi_available`].
#[must_use]
pub fn accept_bi_available(
&self,
chunk_size: usize,
) -> Source<QuicBidirectionalStream, QuicConnection> {
self.connection.accept_bi_available(chunk_size)
}
}
/// A bidirectional QUIC stream exposed as a byte source (receive half) and a
/// byte sink (send half).
pub struct QuicBidirectionalStream {
/// Metadata identifying the stream.
stream: QuicStream,
/// Emits the bytes read from the receive half.
source: QuicByteSource,
/// Writes incoming chunks to the send half.
sink: QuicByteSink,
}
impl QuicBidirectionalStream {
    /// Returns the metadata identifying this stream.
    #[must_use]
    pub fn stream(&self) -> QuicStream {
        self.stream
    }

    /// Splits the stream into its receiving source and sending sink.
    #[must_use]
    pub fn into_parts(self) -> (QuicByteSource, QuicByteSink) {
        let Self { source, sink, .. } = self;
        (source, sink)
    }

    /// Combines the sink and source into one flow whose materialized value is
    /// the stream metadata.
    #[must_use]
    pub fn into_flow(self) -> Flow<Vec<u8>, Vec<u8>, QuicStream> {
        let Self {
            stream,
            source,
            sink,
        } = self;
        Flow::from_sink_and_source(sink, source).map_materialized_value(move |_| stream)
    }
}
/// Entry point for creating QUIC server and client sources on a Tokio runtime.
pub struct TokioQuic;
/// Shorter alias for [`TokioQuic`].
pub type Quic = TokioQuic;
impl TokioQuic {
#[must_use]
pub fn bind<A>(
addr: A,
server_config: quinn::ServerConfig,
chunk_size: usize,
) -> Source<QuicIncomingConnection, StreamCompletion<QuicBinding>>
where
A: ToSocketAddrs + Clone + Send + Sync + 'static,
{
assert!(chunk_size > 0, "chunk size must be greater than zero");
Source::lazy_future_source(move || {
let addr = addr.clone();
let server_config = server_config.clone();
async move {
let handle = Handle::current();
let addr = resolve_addr(addr).await?;
let endpoint = quinn::Endpoint::server(server_config, addr).map_err(io_error)?;
let local_addr = endpoint.local_addr().map_err(io_error)?;
Ok(quic_bind_source(endpoint, local_addr, handle, chunk_size))
}
})
}
#[must_use]
pub fn bind_default<A>(
addr: A,
server_config: quinn::ServerConfig,
) -> Source<QuicIncomingConnection, StreamCompletion<QuicBinding>>
where
A: ToSocketAddrs + Clone + Send + Sync + 'static,
{
Self::bind(addr, server_config, DEFAULT_CHUNK_SIZE)
}
#[must_use]
pub fn connect<A>(
addr: A,
server_name: impl Into<String>,
client_config: quinn::ClientConfig,
chunk_size: usize,
) -> Source<QuicConnection, StreamCompletion<QuicConnection>>
where
A: ToSocketAddrs + Clone + Send + Sync + 'static,
{
assert!(chunk_size > 0, "chunk size must be greater than zero");
let server_name = server_name.into();
Source::lazy_future_source(move || {
let addr = addr.clone();
let server_name = server_name.clone();
let client_config = client_config.clone();
async move {
let remote_addr = resolve_addr(addr).await?;
let local_addr = client_bind_addr(remote_addr);
let mut endpoint = quinn::Endpoint::client(local_addr).map_err(io_error)?;
endpoint.set_default_client_config(client_config);
let connecting = endpoint
.connect(remote_addr, &server_name)
.map_err(quic_error)?;
let connection = connecting.await.map_err(quic_error)?;
let endpoint_local_addr = endpoint.local_addr().map_err(io_error)?;
let connection = QuicConnection {
local_addr: connection_local_addr(
&connection,
endpoint_local_addr,
remote_addr.ip(),
),
remote_addr: connection.remote_address(),
endpoint,
connection,
handle: Handle::current(),
chunk_size,
};
let materialized = connection.clone();
Ok(
Source::single(connection)
.map_materialized_value(move |_| materialized.clone()),
)
}
})
}
#[must_use]
pub fn connect_default<A>(
addr: A,
server_name: impl Into<String>,
client_config: quinn::ClientConfig,
) -> Source<QuicConnection, StreamCompletion<QuicConnection>>
where
A: ToSocketAddrs + Clone + Send + Sync + 'static,
{
Self::connect(addr, server_name, client_config, DEFAULT_CHUNK_SIZE)
}
}
/// Resolves `addr` and returns the first socket address produced.
///
/// Fails with [`StreamError::Failed`] if the lookup errors or yields no addresses.
async fn resolve_addr<A>(addr: A) -> StreamResult<SocketAddr>
where
    A: ToSocketAddrs,
{
    let first = tokio::net::lookup_host(addr)
        .await
        .map_err(io_error)?
        .next();
    match first {
        Some(resolved) => Ok(resolved),
        None => Err(StreamError::Failed(
            "address resolved to no socket addresses".into(),
        )),
    }
}
/// Picks the wildcard address (port 0) of the same IP family as `remote_addr`
/// for binding a client endpoint.
fn client_bind_addr(remote_addr: SocketAddr) -> SocketAddr {
    let wildcard = match remote_addr {
        SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
        SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::UNSPECIFIED),
    };
    SocketAddr::new(wildcard, 0)
}
/// Determines the local socket address to report for `connection`.
///
/// Prefers the local IP reported by the connection itself. Otherwise, when the
/// endpoint is bound to an unspecified address, `fallback_ip` is substituted.
/// If neither applies, `endpoint_addr` is returned unchanged. The port always
/// comes from `endpoint_addr`.
fn connection_local_addr(
    connection: &quinn::Connection,
    endpoint_addr: SocketAddr,
    fallback_ip: IpAddr,
) -> SocketAddr {
    let port = endpoint_addr.port();
    if let Some(ip) = connection.local_ip() {
        return SocketAddr::new(ip, port);
    }
    if endpoint_addr.ip().is_unspecified() {
        SocketAddr::new(fallback_ip, port)
    } else {
        endpoint_addr
    }
}
/// Wraps the two halves of a QUIC stream into a [`QuicBidirectionalStream`].
///
/// The stream id is taken from the send half. `emit_available` is forwarded to
/// the read source and selects its chunking mode.
fn quic_bi_stream_from_halves(
    send: quinn::SendStream,
    recv: quinn::RecvStream,
    handle: Handle,
    chunk_size: usize,
    emit_available: bool,
) -> QuicBidirectionalStream {
    let id = send.id();
    let source = single_use_quic_read_source(recv, handle.clone(), chunk_size, emit_available);
    let sink = single_use_quic_write_sink(send, handle);
    QuicBidirectionalStream {
        stream: QuicStream { id },
        source,
        sink,
    }
}
fn single_use_quic_read_source(
recv: quinn::RecvStream,
handle: Handle,
chunk_size: usize,
emit_available: bool,
) -> QuicByteSource {
let recv = Arc::new(Mutex::new(Some(recv)));
Source::unfold_resource(
{
let recv = Arc::clone(&recv);
move || {
let recv = recv
.lock()
.expect("single-use QUIC recv stream poisoned")
.take()
.ok_or_else(|| {
StreamError::Failed("QUIC recv stream already materialized".into())
})?;
let (sender, receiver) = mpsc::channel(1);
let (cancel_sender, cancel_receiver) = watch::channel(false);
let task = handle.spawn(run_read_task(
recv,
chunk_size,
sender,
emit_available,
cancel_receiver,
));
Ok(ReadResource {
receiver,
cancel: cancel_sender,
task,
})
}
},
|resource| match resource.receiver.blocking_recv() {
Some(DemandResponse::Item(chunk)) => Ok(Some(chunk)),
Some(DemandResponse::Complete) => Ok(None),
Some(DemandResponse::Error(error)) => Err(error),
None => Err(abrupt_termination()),
},
close_read_resource,
)
}
fn close_read_resource(resource: ReadResource) -> StreamResult<()> {
let _ = resource.cancel.send(true);
resource.task.abort();
Ok(())
}
/// Background task that reads from `recv` and forwards the data over `sender`.
///
/// Reads into a `chunk_size`-byte buffer and hands each read to
/// `send_read_chunks`. At end of stream any buffered tail is flushed, followed
/// by `DemandResponse::Complete`; a read error is forwarded as
/// `DemandResponse::Error`. The task returns as soon as `cancel` fires or a
/// send does not succeed.
async fn run_read_task(
mut recv: quinn::RecvStream,
chunk_size: usize,
sender: mpsc::Sender<DemandResponse<Vec<u8>>>,
emit_available: bool,
mut cancel: watch::Receiver<bool>,
) {
let mut buffer = vec![0_u8; chunk_size];
// Bytes left over from earlier reads that did not fill a whole chunk.
let mut pending_tail = Vec::with_capacity(chunk_size);
loop {
// Race the read against cancellation; any resolution of `changed()` stops the task.
let read = tokio::select! {
read = recv.read(&mut buffer) => read,
changed = cancel.changed() => {
let _ = changed;
return;
}
};
match read {
Ok(Some(read)) => {
if !send_read_chunks(
&sender,
chunk_size,
&mut pending_tail,
&buffer[..read],
emit_available,
&mut cancel,
)
.await
{
return;
}
}
// `None` from `read` is treated as end of stream.
Ok(None) => {
// Flush the partial chunk before signalling completion.
if !pending_tail.is_empty()
&& !send_read_item(
&sender,
DemandResponse::Item(std::mem::take(&mut pending_tail)),
&mut cancel,
)
.await
{
return;
}
let _ = send_read_item(&sender, DemandResponse::Complete, &mut cancel).await;
return;
}
Err(error) => {
let _ = send_read_item(
&sender,
DemandResponse::Error(quic_error(error)),
&mut cancel,
)
.await;
return;
}
}
}
}
/// Splits freshly read bytes into chunks and sends them over `sender`.
///
/// `pending_tail` carries bytes from earlier reads that did not fill a whole
/// chunk. It is topped up first and sent once it reaches `chunk_size`. Then
/// every full `chunk_size` slice of `read_buffer` is sent, and the remainder
/// is kept in `pending_tail`. With `emit_available` set, that remainder is
/// sent immediately instead of being held back.
///
/// Returns `false` if a send did not succeed or cancellation fired, `true`
/// otherwise.
async fn send_read_chunks(
sender: &mpsc::Sender<DemandResponse<Vec<u8>>>,
chunk_size: usize,
pending_tail: &mut Vec<u8>,
read_buffer: &[u8],
emit_available: bool,
cancel: &mut watch::Receiver<bool>,
) -> bool {
let mut offset = 0;
if !pending_tail.is_empty() {
// Top up the carried-over tail towards a full chunk.
let needed = chunk_size - pending_tail.len();
let take = needed.min(read_buffer.len());
pending_tail.extend_from_slice(&read_buffer[..take]);
offset += take;
if pending_tail.len() == chunk_size
&& !send_read_item(
sender,
DemandResponse::Item(std::mem::take(pending_tail)),
cancel,
)
.await
{
return false;
}
}
// Emit every complete chunk contained in the rest of the read.
while offset + chunk_size <= read_buffer.len() {
let next = offset + chunk_size;
if !send_read_item(
sender,
DemandResponse::Item(read_buffer[offset..next].to_vec()),
cancel,
)
.await
{
return false;
}
offset = next;
}
// Keep whatever is left for the next read.
if offset < read_buffer.len() {
pending_tail.extend_from_slice(&read_buffer[offset..]);
}
// In "available" mode the partial chunk is forwarded right away.
if emit_available
&& !pending_tail.is_empty()
&& !send_read_item(
sender,
DemandResponse::Item(std::mem::take(pending_tail)),
cancel,
)
.await
{
return false;
}
true
}
/// Sends `item` over `sender`, giving up if `cancel` resolves first.
///
/// Returns `true` only when the send completed successfully. Returns `false`
/// if the channel is closed or `cancel.changed()` resolved before the send.
async fn send_read_item<T>(
sender: &mpsc::Sender<DemandResponse<T>>,
item: DemandResponse<T>,
cancel: &mut watch::Receiver<bool>,
) -> bool
where
T: Send + 'static,
{
tokio::select! {
result = sender.send(item) => result.is_ok(),
changed = cancel.changed() => {
let _ = changed;
false
}
}
}
fn single_use_quic_write_sink(send: quinn::SendStream, handle: Handle) -> QuicByteSink {
let send = Arc::new(Mutex::new(Some(send)));
Flow::<Vec<u8>, Vec<u8>>::identity()
.map_with_resource(
{
let send = Arc::clone(&send);
move || {
send.lock()
.expect("single-use QUIC send stream poisoned")
.take()
.ok_or_else(|| {
StreamError::Failed("QUIC send stream already materialized".into())
})
}
},
{
let handle = handle.clone();
move |send, chunk| {
handle.block_on(async { send.write_all(&chunk).await.map_err(quic_error) })?;
Ok(())
}
},
move |mut send| {
handle.block_on(async { send.write_all(&[]).await.map_err(quic_error) })?;
send.finish().map_err(quic_error)?;
Ok(None)
},
)
.to_mat(Sink::ignore(), Keep::right)
}
fn quic_bind_source(
endpoint: quinn::Endpoint,
local_addr: SocketAddr,
handle: Handle,
chunk_size: usize,
) -> Source<QuicIncomingConnection, QuicBinding> {
let endpoint = Arc::new(Mutex::new(Some(endpoint)));
Source::unfold_resource(
{
let endpoint = Arc::clone(&endpoint);
let handle = handle.clone();
move || {
let endpoint = endpoint
.lock()
.expect("single-use QUIC endpoint poisoned")
.take()
.ok_or_else(|| {
StreamError::Failed("QUIC endpoint already materialized".into())
})?;
let (demand_sender, demand_receiver) = mpsc::channel(1);
let (cancel_sender, cancel_receiver) = watch::channel(false);
let task = handle.spawn(run_quic_bind_task(
endpoint,
local_addr,
chunk_size,
handle.clone(),
demand_receiver,
cancel_receiver,
));
Ok(BindResource {
demands: demand_sender,
cancel: cancel_sender,
task,
})
}
},
receive_demand_response,
close_bind_resource,
)
.map_materialized_value(move |_| QuicBinding { local_addr })
}
fn receive_demand_response<T>(resource: &mut impl DemandResource<T>) -> StreamResult<Option<T>>
where
T: Send + 'static,
{
let (reply_sender, reply_receiver) = std_mpsc::channel();
resource
.demands()
.blocking_send(reply_sender)
.map_err(|_| abrupt_termination())?;
match reply_receiver.recv() {
Ok(DemandResponse::Item(item)) => Ok(Some(item)),
Ok(DemandResponse::Complete) => Ok(None),
Ok(DemandResponse::Error(error)) => Err(error),
Err(_) => Err(abrupt_termination()),
}
}
/// A resource whose elements are requested from a background task by sending
/// it a reply channel.
trait DemandResource<T>
where
T: Send + 'static,
{
/// Channel over which demands (reply senders) are delivered to the task.
fn demands(&self) -> &mpsc::Sender<std_mpsc::Sender<DemandResponse<T>>>;
}
impl DemandResource<QuicIncomingConnection> for BindResource {
/// Returns the demand channel of the accept task.
fn demands(&self) -> &mpsc::Sender<std_mpsc::Sender<DemandResponse<QuicIncomingConnection>>> {
&self.demands
}
}
impl DemandResource<QuicBidirectionalStream> for AcceptBiResource {
/// Returns the demand channel of the stream-accepting task.
fn demands(&self) -> &mpsc::Sender<std_mpsc::Sender<DemandResponse<QuicBidirectionalStream>>> {
&self.demands
}
}
fn close_bind_resource(resource: BindResource) -> StreamResult<()> {
let _ = resource.cancel.send(true);
resource.task.abort();
Ok(())
}
fn close_accept_bi_resource(resource: AcceptBiResource) -> StreamResult<()> {
let _ = resource.cancel.send(true);
resource.task.abort();
Ok(())
}
/// Background task that accepts connections on `endpoint`, one per demand.
///
/// For each reply channel received on `demands`, the task accepts the next
/// incoming connection, awaits its handshake, and replies with a
/// `QuicIncomingConnection`. It replies `Complete` when `endpoint.accept()`
/// yields `None`, and `Error` when the handshake fails. The task returns when
/// `cancel` fires, the demand channel closes, or a reply cannot be delivered.
///
/// NOTE(review): a single failed handshake ends the task, and with it the
/// whole accept stream. Confirm this is the intended server behavior.
async fn run_quic_bind_task(
endpoint: quinn::Endpoint,
local_addr: SocketAddr,
chunk_size: usize,
handle: Handle,
mut demands: mpsc::Receiver<std_mpsc::Sender<DemandResponse<QuicIncomingConnection>>>,
mut cancel: watch::Receiver<bool>,
) {
loop {
// Wait for the next demand; nothing is accepted without one.
let reply = tokio::select! {
demand = demands.recv() => match demand {
Some(reply) => reply,
None => return,
},
changed = cancel.changed() => {
let _ = changed;
return;
}
};
let incoming = tokio::select! {
incoming = endpoint.accept() => incoming,
changed = cancel.changed() => {
let _ = changed;
return;
}
};
// `None` from `accept` is reported downstream as normal completion.
let Some(incoming) = incoming else {
let _ = reply.send(DemandResponse::Complete);
return;
};
// Await the handshake while still honouring cancellation.
let connected = tokio::select! {
connected = incoming => connected,
changed = cancel.changed() => {
let _ = changed;
return;
}
};
match connected {
Ok(connection) => {
let incoming = QuicIncomingConnection {
connection: QuicConnection {
endpoint: endpoint.clone(),
local_addr: connection_local_addr(&connection, local_addr, local_addr.ip()),
remote_addr: connection.remote_address(),
connection,
handle: handle.clone(),
chunk_size,
},
};
// The requester went away; stop accepting.
if reply.send(DemandResponse::Item(incoming)).is_err() {
return;
}
}
Err(error) => {
let _ = reply.send(DemandResponse::Error(quic_error(error)));
return;
}
}
}
}
/// Background task that accepts bidirectional streams on `connection`, one
/// per demand.
///
/// For each reply channel received on `demands`, the task awaits
/// `connection.accept_bi()` and replies with the wrapped stream, or with
/// `Error` if accepting fails (after which the task ends). It also returns
/// when `cancel` fires, the demand channel closes, or a reply cannot be
/// delivered.
async fn run_accept_bi_task(
connection: quinn::Connection,
chunk_size: usize,
emit_available: bool,
handle: Handle,
mut demands: mpsc::Receiver<std_mpsc::Sender<DemandResponse<QuicBidirectionalStream>>>,
mut cancel: watch::Receiver<bool>,
) {
loop {
// Wait for the next demand; nothing is accepted without one.
let reply = tokio::select! {
demand = demands.recv() => match demand {
Some(reply) => reply,
None => return,
},
changed = cancel.changed() => {
let _ = changed;
return;
}
};
let accepted = tokio::select! {
accepted = connection.accept_bi() => accepted,
changed = cancel.changed() => {
let _ = changed;
return;
}
};
match accepted {
Ok((send, recv)) => {
let stream = quic_bi_stream_from_halves(
send,
recv,
handle.clone(),
chunk_size,
emit_available,
);
// The requester went away; stop accepting.
if reply.send(DemandResponse::Item(stream)).is_err() {
return;
}
}
Err(error) => {
let _ = reply.send(DemandResponse::Error(quic_error(error)));
return;
}
}
}
}