use hashbrown::HashMap;
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use bytes::Bytes;
use quinn::{Connection, Endpoint, ReadExactError, RecvStream, SendStream};
use tokio::runtime::Runtime;
use super::quic_channel::{default_server_config, insecure_client_config};
use crate::error::{RepError, Result};
use crate::net::channel::Channel;
/// Four-byte magic that opens the five-byte handshake written on every
/// multiplexed stream; the listener rejects a stream whose first four bytes
/// differ from it.
const MULTIPLEXED_MAGIC: &[u8; 4] = b"NXMX";
/// Stream-type tag (fifth handshake byte) of the heartbeat sub-channel.
const STREAM_HEARTBEAT: u8 = 0;
/// Stream-type tag (fifth handshake byte) of the log sub-channel.
const STREAM_LOG: u8 = 1;
/// Stream-type tag (fifth handshake byte) of the ack sub-channel.
const STREAM_ACK: u8 = 2;
/// Stream-type tag (fifth handshake byte) of the restore sub-channel.
const STREAM_RESTORE: u8 = 3;
/// Number of bidirectional streams per connection (one per stream-type tag);
/// the listener accepts exactly this many streams before building a channel.
const NUM_STREAMS: usize = 4;
/// Returns the server configuration used by the multiplexed QUIC listener.
///
/// The result is `default_server_config()` with its transport configuration
/// replaced by one that passes `None` as the MTU-discovery config and sets a
/// 64 KiB datagram receive buffer.
///
/// # Errors
/// Propagates any error returned by `default_server_config()`.
pub fn mux_server_config() -> Result<quinn::ServerConfig> {
    // Assemble the transport settings first; they do not depend on the
    // server configuration itself.
    let transport = {
        let mut t = quinn::TransportConfig::default();
        t.mtu_discovery_config(None);
        t.datagram_receive_buffer_size(Some(64 * 1024));
        Arc::new(t)
    };
    let mut server_cfg = default_server_config()?;
    server_cfg.transport_config(transport);
    Ok(server_cfg)
}
/// Returns the client configuration used by the multiplexed QUIC channel.
///
/// The result is `insecure_client_config()` with its transport configuration
/// replaced by one that passes `None` as the MTU-discovery config and sets a
/// 64 KiB datagram receive buffer (mirroring `mux_server_config`).
///
/// NOTE(review): the name suggests the base config skips certificate
/// verification; confirm in `quic_channel` before using outside tests.
pub fn mux_insecure_client_config() -> quinn::ClientConfig {
    let mut tuned = quinn::TransportConfig::default();
    tuned
        .mtu_discovery_config(None)
        .datagram_receive_buffer_size(Some(64 * 1024));

    let mut client_cfg = insecure_client_config();
    client_cfg.transport_config(Arc::new(tuned));
    client_cfg
}
/// One logical sub-channel of a multiplexed QUIC connection, backed by the two
/// halves of a single bidirectional QUIC stream.
pub(super) struct QuicSubChannel {
/// Sending half of the stream, behind an async mutex so a whole frame is
/// written while the lock is held.
pub(super) send: Arc<tokio::sync::Mutex<SendStream>>,
/// Receiving half of the stream, behind an async mutex so a whole frame is
/// read while the lock is held.
recv: Arc<tokio::sync::Mutex<RecvStream>>,
/// Tokio runtime on which the blocking `Channel` methods drive the stream.
runtime: Arc<Runtime>,
/// `true` until the sub-channel is closed locally; checked by `send` and
/// `receive` before touching the stream.
pub(super) open: AtomicBool,
}
impl QuicSubChannel {
fn new(send: SendStream, recv: RecvStream, runtime: Arc<Runtime>) -> Self {
Self {
send: Arc::new(tokio::sync::Mutex::new(send)),
recv: Arc::new(tokio::sync::Mutex::new(recv)),
runtime,
open: AtomicBool::new(true),
}
}
}
/// Length-prefixed message framing over one bidirectional QUIC stream.
///
/// A frame on the wire is a 4-byte little-endian payload length followed by
/// the payload bytes. Every method is synchronous: it drives the async stream
/// by calling `block_on` on the shared Tokio runtime, so these methods are
/// meant to be called from ordinary (non-async) threads.
impl Channel for QuicSubChannel {
    /// Sends `data` as a single frame.
    ///
    /// # Errors
    /// * `ChannelClosed` if this sub-channel has already been closed locally.
    /// * `ProtocolError` if `data.len()` does not fit the 32-bit length prefix.
    /// * `NetworkError` if writing to the QUIC stream fails.
    fn send(&self, data: &[u8]) -> Result<()> {
        if !self.is_open() {
            return Err(RepError::ChannelClosed(
                "QuicSubChannel is closed".into(),
            ));
        }
        // The prefix is only 32 bits wide. A plain `as u32` cast would
        // silently truncate a larger length and corrupt the framing for the
        // peer, so refuse such payloads instead.
        let len_prefix = u32::try_from(data.len())
            .map_err(|_| {
                RepError::ProtocolError(format!(
                    "frame payload too large for u32 length prefix: {} bytes",
                    data.len()
                ))
            })?
            .to_le_bytes();
        self.runtime.block_on(async {
            // Hold the lock across both writes so prefix and payload of
            // concurrent senders cannot interleave.
            let mut stream = self.send.lock().await;
            stream
                .write_all(&len_prefix)
                .await
                .map_err(|e| RepError::NetworkError(e.to_string()))?;
            // `block_on` does not require a 'static future, so the caller's
            // slice is written directly without copying it first.
            stream
                .write_all(data)
                .await
                .map_err(|e| RepError::NetworkError(e.to_string()))
        })
    }

    /// Receives the next frame.
    ///
    /// Returns `Ok(None)` if no complete length prefix arrives within
    /// `timeout`. The timeout covers only the 4-byte prefix; once the prefix
    /// has been read, the payload read waits without a deadline.
    ///
    /// # Errors
    /// * `ChannelClosed` if the sub-channel was closed locally, or the peer
    ///   finished the stream before a full prefix or payload was read.
    /// * `ProtocolError` if the announced payload length exceeds
    ///   `MAX_FRAME_PAYLOAD`.
    /// * `NetworkError` on any other read failure.
    fn receive(&self, timeout: Duration) -> Result<Option<Vec<u8>>> {
        if !self.is_open() {
            return Err(RepError::ChannelClosed(
                "QuicSubChannel is closed".into(),
            ));
        }
        self.runtime.block_on(async {
            let mut stream = self.recv.lock().await;
            let mut len_buf = [0u8; 4];
            // NOTE(review): if the timeout fires after part of the prefix has
            // been consumed, those bytes appear to be lost (quinn documents
            // `read_exact` as not cancel-safe — confirm), which would
            // desynchronise framing on this stream.
            match tokio::time::timeout(timeout, stream.read_exact(&mut len_buf))
                .await
            {
                Err(_elapsed) => return Ok(None),
                Ok(Ok(_)) => {}
                Ok(Err(ReadExactError::FinishedEarly(_))) => {
                    return Err(RepError::ChannelClosed(
                        "QUIC stream closed by peer".into(),
                    ));
                }
                Ok(Err(ReadExactError::ReadError(e))) => {
                    return Err(RepError::NetworkError(e.to_string()));
                }
            }
            let payload_len = u32::from_le_bytes(len_buf) as usize;
            // Validate the announced length before allocating so a bogus
            // prefix cannot trigger an oversized allocation.
            if payload_len > crate::net::channel::MAX_FRAME_PAYLOAD {
                return Err(RepError::ProtocolError(format!(
                    "frame payload too large: {} > {}",
                    payload_len,
                    crate::net::channel::MAX_FRAME_PAYLOAD
                )));
            }
            let mut payload = vec![0u8; payload_len];
            stream.read_exact(&mut payload).await.map_err(|e| match e {
                ReadExactError::FinishedEarly(_) => RepError::ChannelClosed(
                    "QUIC stream closed mid-payload".into(),
                ),
                ReadExactError::ReadError(re) => {
                    RepError::NetworkError(re.to_string())
                }
            })?;
            Ok(Some(payload))
        })
    }

    /// Closes the sub-channel; calling it again is a no-op.
    ///
    /// Marks the channel closed, finishes the send half (ignoring any error
    /// from `finish`) and then waits 50 ms before returning.
    fn close(&self) -> Result<()> {
        // `swap` returns the previous value: `false` means someone already
        // closed this sub-channel.
        if !self.open.swap(false, Ordering::SeqCst) {
            return Ok(());
        }
        self.runtime.block_on(async {
            let mut stream = self.send.lock().await;
            let _ = stream.finish();
            // Presumably gives the runtime time to flush the stream's final
            // frames before the caller tears things down — TODO confirm.
            tokio::time::sleep(Duration::from_millis(50)).await;
        });
        Ok(())
    }

    /// Reports whether the sub-channel has not yet been closed locally.
    fn is_open(&self) -> bool {
        self.open.load(Ordering::SeqCst)
    }
}
/// A replication link that exposes four independent framed sub-channels plus
/// an 8-byte VLSN datagram side path.
pub trait ReplicationChannel: Send + Sync {
/// Sub-channel carrying heartbeat messages.
fn heartbeat_channel(&self) -> &dyn Channel;
/// Sub-channel carrying log messages.
fn log_channel(&self) -> &dyn Channel;
/// Sub-channel carrying acknowledgement messages.
fn ack_channel(&self) -> &dyn Channel;
/// Sub-channel carrying restore messages.
fn restore_channel(&self) -> &dyn Channel;
/// Sends `vlsn` to the peer as a datagram.
fn send_vlsn_datagram(&self, vlsn: i64) -> Result<()>;
/// Waits up to `timeout` for a VLSN datagram; the QUIC implementation in
/// this file returns `Ok(None)` when the timeout elapses first.
fn recv_vlsn_datagram(&self, timeout: Duration) -> Result<Option<i64>>;
}
/// The reusable parts of a client channel, produced by
/// `QuicMultiplexedChannel::into_reconnect_token` and consumed by
/// `QuicMultiplexedChannel::connect_with_token` to open a new connection
/// without creating a new endpoint or runtime.
pub struct ReconnectToken {
/// Client endpoint taken from the previous channel.
pub endpoint: Endpoint,
/// Tokio runtime shared with the previous channel.
pub runtime: Arc<Runtime>,
}
/// A QUIC connection carrying four framed sub-channels (heartbeat, log, ack,
/// restore), each on its own bidirectional stream, plus VLSN datagrams.
pub struct QuicMultiplexedChannel {
/// Client endpoint that owns the connection: `Some` for channels created by
/// the `connect*` constructors, `None` for channels returned by the
/// listener's `accept`.
endpoint: Option<Endpoint>,
/// The underlying QUIC connection; also used directly for datagrams.
connection: Connection,
/// Sub-channel opened with the `STREAM_HEARTBEAT` tag.
heartbeat: QuicSubChannel,
/// Sub-channel opened with the `STREAM_LOG` tag.
log: QuicSubChannel,
/// Sub-channel opened with the `STREAM_ACK` tag.
ack: QuicSubChannel,
/// Sub-channel opened with the `STREAM_RESTORE` tag.
restore: QuicSubChannel,
/// Tokio runtime shared with all four sub-channels.
runtime: Arc<Runtime>,
/// `true` until `close_all` has run.
open: AtomicBool,
}
impl QuicMultiplexedChannel {
    /// Builds the two-worker multi-threaded Tokio runtime that drives a
    /// client connection and its sub-channels.
    fn build_runtime() -> Result<Arc<Runtime>> {
        tokio::runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .build()
            .map(Arc::new)
            .map_err(|e| RepError::NetworkError(format!("tokio: {e}")))
    }

    /// Returns the wildcard local address (port 0) of the same IP family as
    /// `remote`.
    ///
    /// A client endpoint bound to an IPv4 address cannot connect to an IPv6
    /// remote, so the bind address has to follow the target's family.
    fn client_bind_addr(remote: &SocketAddr) -> SocketAddr {
        if remote.is_ipv6() {
            SocketAddr::from((std::net::Ipv6Addr::UNSPECIFIED, 0))
        } else {
            SocketAddr::from((std::net::Ipv4Addr::UNSPECIFIED, 0))
        }
    }

    /// Opens one bidirectional stream on `conn` and writes the five-byte
    /// handshake (magic followed by `stream_type`) that tells the listener
    /// which sub-channel the stream carries.
    async fn open_tagged_stream(
        conn: &Connection,
        stream_type: u8,
    ) -> Result<(SendStream, RecvStream)> {
        let (mut send, recv) = conn
            .open_bi()
            .await
            .map_err(|e| RepError::NetworkError(e.to_string()))?;
        let mut handshake = [0u8; 5];
        handshake[..4].copy_from_slice(MULTIPLEXED_MAGIC);
        handshake[4] = stream_type;
        send.write_all(&handshake)
            .await
            .map_err(|e| RepError::NetworkError(e.to_string()))?;
        Ok((send, recv))
    }

    /// Connects to `addr` using `mux_insecure_client_config()` on a fresh
    /// runtime and a fresh client endpoint.
    ///
    /// # Errors
    /// `NetworkError` if the runtime, endpoint, connection or any of the four
    /// streams cannot be set up.
    pub fn connect(addr: SocketAddr, server_name: &str) -> Result<Self> {
        let runtime = Self::build_runtime()?;
        Self::connect_inner(
            addr,
            server_name,
            mux_insecure_client_config(),
            runtime,
            None,
        )
    }

    /// Like [`connect`](Self::connect) but with a caller-supplied client
    /// configuration.
    pub fn connect_with_config(
        addr: SocketAddr,
        server_name: &str,
        client_cfg: quinn::ClientConfig,
    ) -> Result<Self> {
        let runtime = Self::build_runtime()?;
        Self::connect_inner(addr, server_name, client_cfg, runtime, None)
    }

    /// Resolves `host:port` and tries each resolved address in turn, IPv6
    /// addresses first, returning the first channel that connects.
    ///
    /// # Errors
    /// `NetworkError` if resolution fails or yields no address; otherwise the
    /// error of the last failed connection attempt.
    pub fn connect_host(
        host: &str,
        port: u16,
        server_name: &str,
    ) -> Result<Self> {
        let mut addrs: Vec<SocketAddr> = (host, port)
            .to_socket_addrs()
            .map_err(|e| {
                RepError::NetworkError(format!(
                    "DNS resolution failed for {host}:{port}: {e}"
                ))
            })?
            .collect();
        if addrs.is_empty() {
            return Err(RepError::NetworkError(format!(
                "no addresses resolved for {host}:{port}"
            )));
        }
        // Stable sort: IPv6 candidates first, resolver order kept within
        // each family.
        addrs.sort_by_key(|a| if a.is_ipv6() { 0u8 } else { 1u8 });
        let mut last_err = None;
        for addr in addrs {
            match Self::connect(addr, server_name) {
                Ok(ch) => return Ok(ch),
                Err(e) => last_err = Some(e),
            }
        }
        Err(last_err.unwrap_or_else(|| {
            RepError::NetworkError(format!(
                "could not connect to {host}:{port}"
            ))
        }))
    }

    /// Connects to `addr` reusing the endpoint and runtime carried by
    /// `token`, with `mux_insecure_client_config()` as client configuration.
    pub fn connect_with_token(
        token: ReconnectToken,
        addr: SocketAddr,
        server_name: &str,
    ) -> Result<Self> {
        Self::connect_inner(
            addr,
            server_name,
            mux_insecure_client_config(),
            token.runtime,
            Some(token.endpoint),
        )
    }

    /// Connects to `addr` through an existing `endpoint` on a fresh runtime,
    /// with `mux_insecure_client_config()` as client configuration.
    pub fn connect_with_endpoint(
        endpoint: Endpoint,
        addr: SocketAddr,
        server_name: &str,
    ) -> Result<Self> {
        let runtime = Self::build_runtime()?;
        Self::connect_inner(
            addr,
            server_name,
            mux_insecure_client_config(),
            runtime,
            Some(endpoint),
        )
    }

    /// Shared implementation of all `connect*` constructors.
    ///
    /// Uses `existing_endpoint` when given, otherwise creates a client
    /// endpoint bound to the wildcard address of `addr`'s IP family. It then
    /// installs `client_cfg` as the endpoint's default client configuration,
    /// establishes the connection and opens the heartbeat, log, ack and
    /// restore streams in that order.
    fn connect_inner(
        addr: SocketAddr,
        server_name: &str,
        client_cfg: quinn::ClientConfig,
        runtime: Arc<Runtime>,
        existing_endpoint: Option<Endpoint>,
    ) -> Result<Self> {
        let server_name = server_name.to_string();
        let (endpoint, conn, streams) = runtime.block_on(async move {
            // The endpoint is created inside `block_on` so that it is set up
            // in the context of this channel's runtime.
            let mut endpoint = match existing_endpoint {
                Some(ep) => ep,
                None => Endpoint::client(Self::client_bind_addr(&addr))
                    .map_err(|e| RepError::NetworkError(e.to_string()))?,
            };
            endpoint.set_default_client_config(client_cfg);
            let conn = endpoint
                .connect(addr, &server_name)
                .map_err(|e| RepError::NetworkError(e.to_string()))?
                .await
                .map_err(|e| RepError::NetworkError(e.to_string()))?;
            // Array elements are evaluated left to right, so the streams are
            // opened in this fixed order.
            let streams = [
                Self::open_tagged_stream(&conn, STREAM_HEARTBEAT).await?,
                Self::open_tagged_stream(&conn, STREAM_LOG).await?,
                Self::open_tagged_stream(&conn, STREAM_ACK).await?,
                Self::open_tagged_stream(&conn, STREAM_RESTORE).await?,
            ];
            Ok::<_, RepError>((endpoint, conn, streams))
        })?;
        let [(hb_s, hb_r), (log_s, log_r), (ack_s, ack_r), (rst_s, rst_r)] =
            streams;
        Ok(Self {
            endpoint: Some(endpoint),
            connection: conn,
            heartbeat: QuicSubChannel::new(hb_s, hb_r, Arc::clone(&runtime)),
            log: QuicSubChannel::new(log_s, log_r, Arc::clone(&runtime)),
            ack: QuicSubChannel::new(ack_s, ack_r, Arc::clone(&runtime)),
            restore: QuicSubChannel::new(rst_s, rst_r, Arc::clone(&runtime)),
            runtime,
            open: AtomicBool::new(true),
        })
    }

    /// Consumes the channel and hands back its endpoint and runtime for a
    /// later `connect_with_token`.
    ///
    /// Returns `None` when the channel owns no endpoint (channels returned by
    /// the listener's `accept`). In either case `self` is dropped on return,
    /// and the `Drop` impl closes the channel if it is still open.
    pub fn into_reconnect_token(mut self) -> Option<ReconnectToken> {
        // `take` is required: fields cannot be moved out of a type that
        // implements `Drop`.
        let endpoint = self.endpoint.take()?;
        let runtime = Arc::clone(&self.runtime);
        Some(ReconnectToken { endpoint, runtime })
    }

    /// Reports whether `close_all` has not yet run on this channel.
    pub fn is_open(&self) -> bool {
        self.open.load(Ordering::SeqCst)
    }

    /// Closes all four sub-channels; calling it again is a no-op.
    ///
    /// Marks the channel and every sub-channel closed, finishes each send
    /// stream (ignoring errors from `finish`) and then waits 50 ms.
    pub fn close_all(&self) -> Result<()> {
        // `swap` returns the previous value: `false` means already closed.
        if !self.open.swap(false, Ordering::SeqCst) {
            return Ok(());
        }
        // Flip the flags first so concurrent `send`/`receive` calls start
        // failing fast before the streams are finished.
        self.heartbeat.open.store(false, Ordering::SeqCst);
        self.log.open.store(false, Ordering::SeqCst);
        self.ack.open.store(false, Ordering::SeqCst);
        self.restore.open.store(false, Ordering::SeqCst);
        self.runtime.block_on(async {
            let _ = self.heartbeat.send.lock().await.finish();
            let _ = self.log.send.lock().await.finish();
            let _ = self.ack.send.lock().await.finish();
            let _ = self.restore.send.lock().await.finish();
            // Presumably gives the runtime time to flush the final stream
            // frames before the connection is torn down — TODO confirm.
            tokio::time::sleep(Duration::from_millis(50)).await;
        });
        Ok(())
    }
}
/// `ReplicationChannel` backed by the four QUIC sub-channels and by QUIC
/// datagrams on the same connection.
impl ReplicationChannel for QuicMultiplexedChannel {
    /// Borrows the heartbeat sub-channel.
    fn heartbeat_channel(&self) -> &dyn Channel {
        &self.heartbeat
    }

    /// Borrows the log sub-channel.
    fn log_channel(&self) -> &dyn Channel {
        &self.log
    }

    /// Borrows the ack sub-channel.
    fn ack_channel(&self) -> &dyn Channel {
        &self.ack
    }

    /// Borrows the restore sub-channel.
    fn restore_channel(&self) -> &dyn Channel {
        &self.restore
    }

    /// Sends `vlsn` as an 8-byte little-endian datagram.
    ///
    /// # Errors
    /// `NetworkError` if the connection refuses the datagram.
    fn send_vlsn_datagram(&self, vlsn: i64) -> Result<()> {
        let encoded = vlsn.to_le_bytes().to_vec();
        match self.connection.send_datagram(Bytes::from(encoded)) {
            Ok(()) => Ok(()),
            Err(e) => {
                Err(RepError::NetworkError(format!("datagram send: {e}")))
            }
        }
    }

    /// Waits up to `timeout` for a datagram and decodes it as a
    /// little-endian `i64`.
    ///
    /// Returns `Ok(None)` when the timeout elapses first.
    ///
    /// # Errors
    /// `NetworkError` if reading fails or the datagram is not exactly 8 bytes.
    fn recv_vlsn_datagram(&self, timeout: Duration) -> Result<Option<i64>> {
        self.runtime.block_on(async {
            let outcome =
                tokio::time::timeout(timeout, self.connection.read_datagram())
                    .await;
            let datagram = match outcome {
                Err(_elapsed) => return Ok(None),
                Ok(Err(e)) => {
                    return Err(RepError::NetworkError(format!(
                        "datagram recv: {e}"
                    )));
                }
                Ok(Ok(received)) => received,
            };
            // Converting the slice to `[u8; 8]` succeeds exactly when the
            // datagram is 8 bytes long.
            match <[u8; 8]>::try_from(&datagram[..]) {
                Ok(raw) => Ok(Some(i64::from_le_bytes(raw))),
                Err(_) => Err(RepError::NetworkError(format!(
                    "VLSN datagram: expected 8 bytes, got {}",
                    datagram.len()
                ))),
            }
        })
    }
}
/// Closes the channel on drop if the owner did not do so explicitly.
impl Drop for QuicMultiplexedChannel {
    fn drop(&mut self) {
        if !self.is_open() {
            // Already closed; nothing left to do.
            return;
        }
        // Best effort: `drop` has no way to report a failure.
        let _ = self.close_all();
    }
}
/// Server side of the multiplexed QUIC channel: owns a server endpoint and the
/// runtime that drives it, and turns incoming connections into
/// `QuicMultiplexedChannel`s via `accept`.
pub struct QuicMultiplexedChannelListener {
/// Server endpoint on which connections are accepted.
endpoint: Endpoint,
/// Tokio runtime driving the endpoint; shared with every accepted channel.
runtime: Arc<Runtime>,
}
impl QuicMultiplexedChannelListener {
pub fn bind(addr: SocketAddr) -> Result<Self> {
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build()
.map_err(|e| RepError::NetworkError(format!("tokio: {e}")))?,
);
let server_cfg = mux_server_config()?;
let endpoint = runtime.block_on(async move {
Endpoint::server(server_cfg, addr)
.map_err(|e| RepError::NetworkError(e.to_string()))
})?;
Ok(Self { endpoint, runtime })
}
pub fn with_server_config(
addr: SocketAddr,
server_cfg: quinn::ServerConfig,
) -> Result<Self> {
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build()
.map_err(|e| RepError::NetworkError(format!("tokio: {e}")))?,
);
let endpoint = runtime.block_on(async move {
Endpoint::server(server_cfg, addr)
.map_err(|e| RepError::NetworkError(e.to_string()))
})?;
Ok(Self { endpoint, runtime })
}
pub fn local_addr(&self) -> Result<SocketAddr> {
self.endpoint
.local_addr()
.map_err(|e| RepError::NetworkError(e.to_string()))
}
pub fn accept(&self) -> Result<QuicMultiplexedChannel> {
self.runtime.block_on(async {
let incoming = self.endpoint.accept().await.ok_or_else(|| {
RepError::NetworkError("QUIC endpoint closed".into())
})?;
let conn = incoming
.await
.map_err(|e| RepError::NetworkError(e.to_string()))?;
let mut stream_map: HashMap<u8, (SendStream, RecvStream)> =
HashMap::with_capacity(NUM_STREAMS);
for _ in 0..NUM_STREAMS {
let (send, mut recv) = conn
.accept_bi()
.await
.map_err(|e| RepError::NetworkError(e.to_string()))?;
let mut handshake = [0u8; 5];
recv.read_exact(&mut handshake).await.map_err(|e| {
RepError::NetworkError(format!("mux handshake: {e}"))
})?;
if &handshake[..4] != MULTIPLEXED_MAGIC {
return Err(RepError::NetworkError(format!(
"invalid mux magic: {:02x?}",
&handshake[..4]
)));
}
let stream_type = handshake[4];
if stream_map.contains_key(&stream_type) {
return Err(RepError::NetworkError(format!(
"duplicate stream type {stream_type}"
)));
}
stream_map.insert(stream_type, (send, recv));
}
let (hb_s, hb_r) =
stream_map.remove(&STREAM_HEARTBEAT).ok_or_else(|| {
RepError::NetworkError("missing heartbeat stream".into())
})?;
let (log_s, log_r) =
stream_map.remove(&STREAM_LOG).ok_or_else(|| {
RepError::NetworkError("missing log stream".into())
})?;
let (ack_s, ack_r) =
stream_map.remove(&STREAM_ACK).ok_or_else(|| {
RepError::NetworkError("missing ack stream".into())
})?;
let (rst_s, rst_r) =
stream_map.remove(&STREAM_RESTORE).ok_or_else(|| {
RepError::NetworkError("missing restore stream".into())
})?;
let rt = Arc::clone(&self.runtime);
Ok(QuicMultiplexedChannel {
endpoint: None, connection: conn,
heartbeat: QuicSubChannel::new(hb_s, hb_r, Arc::clone(&rt)),
log: QuicSubChannel::new(log_s, log_r, Arc::clone(&rt)),
ack: QuicSubChannel::new(ack_s, ack_r, Arc::clone(&rt)),
restore: QuicSubChannel::new(rst_s, rst_r, Arc::clone(&rt)),
runtime: rt,
open: AtomicBool::new(true),
})
})
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Binds a multiplexed QUIC listener on an ephemeral loopback port.
    fn loopback_mux_listener() -> QuicMultiplexedChannelListener {
        let loopback: SocketAddr = "127.0.0.1:0".parse().unwrap();
        QuicMultiplexedChannelListener::bind(loopback)
            .expect("bind mux QUIC listener")
    }

    /// Returns a loopback listener together with the address it is bound to.
    fn listener_with_addr() -> (QuicMultiplexedChannelListener, SocketAddr) {
        let listener = loopback_mux_listener();
        let addr = listener.local_addr().unwrap();
        (listener, addr)
    }

    /// Opens a client channel to `addr` with the default insecure config.
    fn dial(addr: SocketAddr) -> QuicMultiplexedChannel {
        QuicMultiplexedChannel::connect(addr, "localhost").unwrap()
    }

    /// A connected client reports open, and closed after `close_all`.
    #[test]
    fn test_mux_connect_and_close() {
        let (listener, addr) = listener_with_addr();
        let server = std::thread::spawn(move || {
            let accepted = listener.accept().unwrap();
            assert!(accepted.is_open());
        });

        let client = dial(addr);
        assert!(client.is_open());
        client.close_all().unwrap();
        assert!(!client.is_open());

        server.join().unwrap();
    }

    /// Ping/pong round trip over the heartbeat sub-channel.
    #[test]
    fn test_mux_heartbeat_send_receive() {
        let (listener, addr) = listener_with_addr();
        let server = std::thread::spawn(move || {
            let accepted = listener.accept().unwrap();
            let ping = accepted
                .heartbeat_channel()
                .receive(Duration::from_secs(5))
                .unwrap();
            assert_eq!(ping, Some(b"hb-ping".to_vec()));
            accepted.heartbeat_channel().send(b"hb-pong").unwrap();
        });

        let client = dial(addr);
        client.heartbeat_channel().send(b"hb-ping").unwrap();
        let pong = client
            .heartbeat_channel()
            .receive(Duration::from_secs(5))
            .unwrap();
        assert_eq!(pong, Some(b"hb-pong".to_vec()));

        server.join().unwrap();
    }

    /// One frame travels client -> server over the log sub-channel.
    #[test]
    fn test_mux_log_send_receive() {
        let (listener, addr) = listener_with_addr();
        let server = std::thread::spawn(move || {
            let accepted = listener.accept().unwrap();
            let got = accepted
                .log_channel()
                .receive(Duration::from_secs(5))
                .unwrap();
            assert_eq!(got, Some(b"log-entry".to_vec()));
        });

        let client = dial(addr);
        client.log_channel().send(b"log-entry").unwrap();

        server.join().unwrap();
    }

    /// One frame travels client -> server over the ack sub-channel.
    #[test]
    fn test_mux_ack_send_receive() {
        let (listener, addr) = listener_with_addr();
        let server = std::thread::spawn(move || {
            let accepted = listener.accept().unwrap();
            let got = accepted
                .ack_channel()
                .receive(Duration::from_secs(5))
                .unwrap();
            assert_eq!(got, Some(b"ack-42".to_vec()));
        });

        let client = dial(addr);
        client.ack_channel().send(b"ack-42").unwrap();

        server.join().unwrap();
    }

    /// One frame travels client -> server over the restore sub-channel.
    #[test]
    fn test_mux_restore_send_receive() {
        let (listener, addr) = listener_with_addr();
        let server = std::thread::spawn(move || {
            let accepted = listener.accept().unwrap();
            let got = accepted
                .restore_channel()
                .receive(Duration::from_secs(5))
                .unwrap();
            assert_eq!(got, Some(b"restore-block".to_vec()));
        });

        let client = dial(addr);
        client.restore_channel().send(b"restore-block").unwrap();

        server.join().unwrap();
    }

    /// Each sub-channel delivers its own message, independent of the others.
    #[test]
    fn test_mux_all_streams_independent() {
        let (listener, addr) = listener_with_addr();
        let server = std::thread::spawn(move || {
            let accepted = listener.accept().unwrap();
            let wait = Duration::from_secs(5);
            let hb = accepted.heartbeat_channel().receive(wait).unwrap();
            let log = accepted.log_channel().receive(wait).unwrap();
            let ack = accepted.ack_channel().receive(wait).unwrap();
            let rst = accepted.restore_channel().receive(wait).unwrap();
            (hb, log, ack, rst)
        });

        let client = dial(addr);
        client.heartbeat_channel().send(b"heartbeat").unwrap();
        client.log_channel().send(b"log").unwrap();
        client.ack_channel().send(b"ack").unwrap();
        client.restore_channel().send(b"restore").unwrap();

        let (hb, log, ack, rst) = server.join().unwrap();
        assert_eq!(hb, Some(b"heartbeat".to_vec()));
        assert_eq!(log, Some(b"log".to_vec()));
        assert_eq!(ack, Some(b"ack".to_vec()));
        assert_eq!(rst, Some(b"restore".to_vec()));
    }

    /// Bulk traffic on the log sub-channel does not disturb a heartbeat.
    #[test]
    fn test_mux_streams_dont_interfere() {
        let (listener, addr) = listener_with_addr();
        let large_payload: Vec<u8> = vec![0xABu8; 32 * 1024];
        let server = std::thread::spawn(move || {
            let accepted = listener.accept().unwrap();
            // Drain the ten bulk frames before looking at the heartbeat.
            (0..10).for_each(|_| {
                accepted
                    .log_channel()
                    .receive(Duration::from_secs(5))
                    .unwrap();
            });
            let hb = accepted
                .heartbeat_channel()
                .receive(Duration::from_secs(5))
                .unwrap();
            assert_eq!(hb, Some(b"hb".to_vec()));
        });

        let client = dial(addr);
        (0..10).for_each(|_| {
            client.log_channel().send(&large_payload).unwrap();
        });
        client.heartbeat_channel().send(b"hb").unwrap();

        server.join().unwrap();
    }

    /// A VLSN sent as a datagram arrives intact on the other side.
    #[test]
    fn test_mux_vlsn_datagram_roundtrip() {
        let (listener, addr) = listener_with_addr();
        let server = std::thread::spawn(move || {
            let accepted = listener.accept().unwrap();
            let vlsn = accepted
                .recv_vlsn_datagram(Duration::from_secs(5))
                .unwrap();
            assert_eq!(vlsn, Some(42_i64));
        });

        let client = dial(addr);
        client.send_vlsn_datagram(42).unwrap();

        server.join().unwrap();
    }

    /// Waiting for a datagram that never comes yields `None`, not an error.
    #[test]
    fn test_mux_vlsn_datagram_timeout() {
        let (listener, addr) = listener_with_addr();
        // The server hands its channel back so it stays alive while the
        // client waits.
        let server = std::thread::spawn(move || listener.accept().unwrap());

        let client = dial(addr);
        let result = client
            .recv_vlsn_datagram(Duration::from_millis(300))
            .unwrap();
        assert_eq!(result, None, "expected timeout → None");

        drop(server.join().unwrap());
    }

    /// The channel is usable through a `Box<dyn ReplicationChannel>`.
    #[test]
    fn test_mux_replication_channel_trait_object() {
        let (listener, addr) = listener_with_addr();
        let server = std::thread::spawn(move || {
            let boxed: Box<dyn ReplicationChannel> =
                Box::new(listener.accept().unwrap());
            let got = boxed
                .heartbeat_channel()
                .receive(Duration::from_secs(5))
                .unwrap();
            assert_eq!(got, Some(b"trait test".to_vec()));
        });

        let client: Box<dyn ReplicationChannel> = Box::new(dial(addr));
        client.heartbeat_channel().send(b"trait test").unwrap();

        server.join().unwrap();
    }

    /// A reconnect token lets a second connection reuse the first endpoint.
    #[test]
    fn test_mux_reconnect_with_endpoint() {
        let (listener, addr) = listener_with_addr();
        let server = std::thread::spawn(move || {
            let wait = Duration::from_secs(5);
            let first_conn = listener.accept().unwrap();
            first_conn.heartbeat_channel().receive(wait).unwrap();
            let second_conn = listener.accept().unwrap();
            second_conn.heartbeat_channel().receive(wait).unwrap();
        });

        let first = QuicMultiplexedChannel::connect(addr, "localhost").unwrap();
        first.heartbeat_channel().send(b"first-conn").unwrap();
        let token = first.into_reconnect_token().unwrap();

        let second =
            QuicMultiplexedChannel::connect_with_token(token, addr, "localhost")
                .unwrap();
        second.heartbeat_channel().send(b"second-conn").unwrap();

        server.join().unwrap();
    }

    /// A frame announcing more than `MAX_FRAME_PAYLOAD` bytes is rejected by
    /// the receiver with a `ProtocolError`.
    #[test]
    fn test_quic_mux_rejects_oversize_frame() {
        let (listener, addr) = listener_with_addr();
        let server = std::thread::spawn(move || {
            let accepted = listener.accept().unwrap();
            let oversized =
                vec![0u8; crate::net::channel::MAX_FRAME_PAYLOAD + 1];
            // The send may fail once the client gives up; that is fine.
            let _ = accepted.heartbeat_channel().send(&oversized);
        });

        let client = dial(addr);
        let result =
            client.heartbeat_channel().receive(Duration::from_secs(10));
        let _ = client.heartbeat_channel().close();

        let msg = match result
            .expect_err("oversize QUIC mux frame must be rejected")
        {
            RepError::ProtocolError(msg) => msg,
            other => panic!("expected ProtocolError, got {:?}", other),
        };
        assert!(
            msg.contains("frame payload too large"),
            "unexpected protocol-error message: {}",
            msg
        );

        let _ = server.join();
    }
}