use std::{
collections::HashMap,
sync::{Arc, Mutex},
time::{Duration, Instant},
};
use bytes::Bytes;
use slotmap::{KeyData, SlotMap};
use tokio::sync::mpsc;
use crate::CoreError;
/// Default bounded capacity (in chunks) of a body channel.
pub const DEFAULT_CHANNEL_CAPACITY: usize = 32;

/// Default upper bound on the size of a single forwarded chunk: 64 KiB.
pub const DEFAULT_MAX_CHUNK_SIZE: usize = 64 * 1024;

/// Default time a writer waits for channel space before giving up, in milliseconds.
pub const DEFAULT_DRAIN_TIMEOUT_MS: u64 = 30_000;

/// Default time-to-live for registry entries, in milliseconds.
pub const DEFAULT_SLAB_TTL_MS: u64 = 300_000;

/// Default maximum number of live entries per registry.
pub const DEFAULT_MAX_HANDLES: usize = 65_536;
/// Value stored in the session registry of `HandleStore`.
pub struct SessionEntry {
/// The iroh connection this session handle refers to.
pub conn: iroh::endpoint::Connection,
}
/// Response head delivered through the oneshot sender registered with
/// `HandleStore::allocate_req_handle`.
pub struct ResponseHeadEntry {
/// Numeric status code of the response.
pub status: u16,
/// Header name/value pairs, kept as an ordered list.
pub headers: Vec<(String, String)>,
}
// One distinct slotmap key type per registry in `HandleStore`. The type
// distinction exists only inside this module: once a key is converted to a
// `u64` handle, nothing in the handle itself records which registry it came from.
slotmap::new_key_type! { pub(crate) struct ReaderKey; }
slotmap::new_key_type! { pub(crate) struct WriterKey; }
slotmap::new_key_type! { pub(crate) struct TrailerTxKey; }
slotmap::new_key_type! { pub(crate) struct TrailerRxKey; }
slotmap::new_key_type! { pub(crate) struct FetchCancelKey; }
slotmap::new_key_type! { pub(crate) struct SessionKey; }
slotmap::new_key_type! { pub(crate) struct RequestHeadKey; }
fn key_to_handle<K: slotmap::Key>(k: K) -> u64 {
k.data().as_ffi()
}
// Generates a function `$fn_name(h: u64) -> $key_type` that rebuilds a typed
// slotmap key from a `u64` handle via `KeyData::from_ffi`. The conversion does
// not check that the handle was issued for that registry; an unknown handle
// simply fails the later registry lookup.
macro_rules! handle_to_key {
($fn_name:ident, $key_type:ty) => {
fn $fn_name(h: u64) -> $key_type {
<$key_type>::from(KeyData::from_ffi(h))
}
};
}
// One conversion function per key type.
handle_to_key!(handle_to_reader_key, ReaderKey);
handle_to_key!(handle_to_writer_key, WriterKey);
handle_to_key!(handle_to_trailer_tx_key, TrailerTxKey);
handle_to_key!(handle_to_trailer_rx_key, TrailerRxKey);
handle_to_key!(handle_to_session_key, SessionKey);
handle_to_key!(handle_to_request_head_key, RequestHeadKey);
handle_to_key!(handle_to_fetch_cancel_key, FetchCancelKey);
/// Receiving half of a body channel created by `make_body_channel`.
pub struct BodyReader {
/// Channel receiver, shared behind an async mutex so it can be cloned out
/// of the registry and awaited without holding the registry lock.
pub(crate) rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Bytes>>>,
/// Notified to make a pending receive return `None` (see `recv_with_cancel`).
pub(crate) cancel: Arc<tokio::sync::Notify>,
/// Drain timeout the channel was created with. Not read by the reader-side
/// code in this file.
pub(crate) drain_timeout: Duration,
}
/// Sending half of a body channel created by `make_body_channel`.
pub struct BodyWriter {
/// Channel sender for body chunks.
pub(crate) tx: mpsc::Sender<Bytes>,
/// Maximum time a single send may wait for channel space.
pub(crate) drain_timeout: Duration,
}
/// Creates a body channel using the module defaults
/// (`DEFAULT_CHANNEL_CAPACITY` chunks, `DEFAULT_DRAIN_TIMEOUT_MS` drain timeout).
pub fn make_body_channel() -> (BodyWriter, BodyReader) {
    let drain_timeout = Duration::from_millis(DEFAULT_DRAIN_TIMEOUT_MS);
    make_body_channel_with(DEFAULT_CHANNEL_CAPACITY, drain_timeout)
}
/// Creates a connected writer/reader pair over a bounded channel.
///
/// `capacity` is the number of chunks the channel buffers; `drain_timeout`
/// is stored on both halves.
///
/// # Panics
///
/// `tokio::sync::mpsc::channel` panics when `capacity` is zero.
fn make_body_channel_with(capacity: usize, drain_timeout: Duration) -> (BodyWriter, BodyReader) {
    let (tx, rx) = mpsc::channel(capacity);
    let writer = BodyWriter { tx, drain_timeout };
    let reader = BodyReader {
        rx: Arc::new(tokio::sync::Mutex::new(rx)),
        cancel: Arc::new(tokio::sync::Notify::new()),
        drain_timeout,
    };
    (writer, reader)
}
/// Waits for the next chunk on `rx` unless `cancel` is notified first.
///
/// Returns `None` when the cancellation fires; otherwise returns whatever the
/// receiver yields (`Some(chunk)`, or `None` from the channel itself).
async fn recv_with_cancel(
rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Bytes>>>,
cancel: Arc<tokio::sync::Notify>,
) -> Option<Bytes> {
tokio::select! {
// `biased` polls the branches in the order written, so a pending
// cancellation wins over a chunk that is ready at the same moment.
biased;
_ = cancel.notified() => None,
// The receiver lock is taken inside this branch's future, so it is
// released if the cancellation branch completes first.
chunk = async { rx.lock().await.recv().await } => chunk,
}
}
impl BodyReader {
    /// Receives the next body chunk.
    ///
    /// Returns `None` when the receiver yields `None` or when this reader's
    /// `cancel` notifier fires (see `recv_with_cancel`).
    pub async fn next_chunk(&self) -> Option<Bytes> {
        let rx = self.rx.clone();
        let cancel = self.cancel.clone();
        recv_with_cancel(rx, cancel).await
    }
}
impl BodyWriter {
    /// Sends one chunk, waiting at most `drain_timeout` for channel space.
    ///
    /// # Errors
    ///
    /// Returns a message when the timeout elapses before the chunk is
    /// accepted, or when the send fails because the receiver is gone.
    pub async fn send_chunk(&self, chunk: Bytes) -> Result<(), String> {
        let pending = self.tx.send(chunk);
        match tokio::time::timeout(self.drain_timeout, pending).await {
            Err(_elapsed) => Err("drain timeout: body reader is too slow".to_string()),
            Ok(Err(_closed)) => Err("body reader dropped".to_string()),
            Ok(Ok(())) => Ok(()),
        }
    }
}
/// Oneshot sender that delivers a trailer list (name/value pairs) exactly once.
type TrailerTx = tokio::sync::oneshot::Sender<Vec<(String, String)>>;
/// Receiving side paired with [`TrailerTx`].
pub(crate) type TrailerRx = tokio::sync::oneshot::Receiver<Vec<(String, String)>>;
/// Tunables for a `HandleStore`.
#[derive(Debug, Clone)]
pub struct StoreConfig {
/// Chunk capacity of body channels created by `HandleStore::make_body_channel`.
pub channel_capacity: usize,
/// Largest chunk `HandleStore::send_chunk` forwards in one piece; larger
/// chunks are split.
pub max_chunk_size: usize,
/// Per-send timeout given to body channels created by the store.
pub drain_timeout: Duration,
/// Maximum number of live entries in each individual registry.
pub max_handles: usize,
/// Entry time-to-live. Not read in this file; `HandleStore::sweep` takes its
/// TTL as an argument — presumably the caller passes this value (TODO confirm).
pub ttl: Duration,
}
impl Default for StoreConfig {
fn default() -> Self {
Self {
channel_capacity: DEFAULT_CHANNEL_CAPACITY,
max_chunk_size: DEFAULT_MAX_CHUNK_SIZE,
drain_timeout: Duration::from_millis(DEFAULT_DRAIN_TIMEOUT_MS),
max_handles: DEFAULT_MAX_HANDLES,
ttl: Duration::from_millis(DEFAULT_SLAB_TTL_MS),
}
}
}
/// A registry value tagged with the instant it was stored, for TTL sweeping.
struct Timed<T> {
    /// The stored value.
    value: T,
    /// When the entry was created by [`Timed::new`].
    created_at: Instant,
}

impl<T> Timed<T> {
    /// Wraps `value`, stamping it with the current instant.
    fn new(value: T) -> Self {
        Self {
            value,
            created_at: Instant::now(),
        }
    }

    /// Returns `true` once the entry has existed for at least `ttl`.
    ///
    /// The comparison is inclusive so that it agrees with the pending-map
    /// sweeps (which retain only entries with `elapsed < ttl`) and so that a
    /// zero TTL expires every entry even when the clock has not advanced
    /// since the entry was created.
    fn is_expired(&self, ttl: Duration) -> bool {
        self.created_at.elapsed() >= ttl
    }
}
/// A body reader parked in `HandleStore::pending_readers` until it is claimed.
struct PendingReaderEntry {
/// The parked reader.
reader: BodyReader,
/// When the reader was parked; compared against the TTL during sweeps.
created: Instant,
}
/// A trailer receiver parked in `HandleStore::pending_trailer_rxs` until it is claimed.
struct PendingTrailerRxEntry {
/// The parked receiver.
rx: TrailerRx,
/// When the receiver was parked; compared against the TTL during sweeps.
created: Instant,
}
/// Rollback scope for a group of registry insertions.
///
/// Handles inserted through the guard are removed again when the guard is
/// dropped, unless `commit` was called first.
pub(crate) struct InsertGuard<'a> {
/// Store the handles are inserted into.
store: &'a HandleStore,
/// Handles inserted so far, in insertion order.
tracked: Vec<TrackedHandle>,
/// Set by `commit`; when true, dropping the guard leaves the handles in place.
committed: bool,
}
/// A handle recorded by `InsertGuard`, tagged with the registry it belongs to
/// so the guard's `Drop` impl knows how to remove it.
enum TrackedHandle {
/// Handle in the body-reader registry.
Reader(u64),
/// Handle in the body-writer registry.
Writer(u64),
/// Handle in the trailer-sender registry.
TrailerTx(u64),
/// Handle in the trailer-receiver registry.
TrailerRx(u64),
/// Handle in the request-head registry.
ReqHead(u64),
}
impl<'a> InsertGuard<'a> {
fn new(store: &'a HandleStore) -> Self {
Self {
store,
tracked: Vec::new(),
committed: false,
}
}
pub fn insert_reader(&mut self, reader: BodyReader) -> Result<u64, CoreError> {
let h = self.store.insert_reader(reader)?;
self.tracked.push(TrackedHandle::Reader(h));
Ok(h)
}
pub fn insert_writer(&mut self, writer: BodyWriter) -> Result<u64, CoreError> {
let h = self.store.insert_writer(writer)?;
self.tracked.push(TrackedHandle::Writer(h));
Ok(h)
}
pub fn insert_trailer_sender(&mut self, tx: TrailerTx) -> Result<u64, CoreError> {
let h = self.store.insert_trailer_sender(tx)?;
self.tracked.push(TrackedHandle::TrailerTx(h));
Ok(h)
}
pub fn insert_trailer_receiver(&mut self, rx: TrailerRx) -> Result<u64, CoreError> {
let h = self.store.insert_trailer_receiver(rx)?;
self.tracked.push(TrackedHandle::TrailerRx(h));
Ok(h)
}
pub fn allocate_req_handle(
&mut self,
sender: tokio::sync::oneshot::Sender<ResponseHeadEntry>,
) -> Result<u64, CoreError> {
let h = self.store.allocate_req_handle(sender)?;
self.tracked.push(TrackedHandle::ReqHead(h));
Ok(h)
}
pub fn commit(mut self) {
self.committed = true;
}
}
impl Drop for InsertGuard<'_> {
    /// Rolls back every tracked insertion unless the guard was committed.
    fn drop(&mut self) {
        if !self.committed {
            let store = self.store;
            for tracked in self.tracked.iter() {
                match *tracked {
                    TrackedHandle::Reader(h) => store.cancel_reader(h),
                    TrackedHandle::Writer(h) => {
                        // The handle may already be gone; rollback is best-effort.
                        let _ = store.finish_body(h);
                    }
                    TrackedHandle::TrailerTx(h) => store.remove_trailer_sender(h),
                    TrackedHandle::TrailerRx(h) => {
                        let mut registry = store
                            .trailer_rx
                            .lock()
                            .unwrap_or_else(|e| e.into_inner());
                        registry.remove(handle_to_trailer_rx_key(h));
                    }
                    TrackedHandle::ReqHead(h) => {
                        let mut registry = store
                            .request_heads
                            .lock()
                            .unwrap_or_else(|e| e.into_inner());
                        registry.remove(handle_to_request_head_key(h));
                    }
                }
            }
        }
    }
}
/// Registry of live resources addressed by opaque `u64` handles.
///
/// Each resource kind has its own mutex-protected slotmap; entries carry a
/// creation time so `sweep` can expire them.
pub struct HandleStore {
/// Body readers.
readers: Mutex<SlotMap<ReaderKey, Timed<BodyReader>>>,
/// Body writers.
writers: Mutex<SlotMap<WriterKey, Timed<BodyWriter>>>,
/// Trailer senders.
trailer_tx: Mutex<SlotMap<TrailerTxKey, Timed<TrailerTx>>>,
/// Trailer receivers.
trailer_rx: Mutex<SlotMap<TrailerRxKey, Timed<TrailerRx>>>,
/// Sessions, shared via `Arc` so lookups can hand out clones.
sessions: Mutex<SlotMap<SessionKey, Timed<Arc<SessionEntry>>>>,
/// Oneshot senders used to deliver a response head for a request handle.
request_heads:
Mutex<SlotMap<RequestHeadKey, Timed<tokio::sync::oneshot::Sender<ResponseHeadEntry>>>>,
/// Cancellation notifiers for in-flight fetches, keyed by fetch token.
fetch_cancels: Mutex<SlotMap<FetchCancelKey, Timed<Arc<tokio::sync::Notify>>>>,
/// Readers parked under their writer's handle until claimed.
pending_readers: Mutex<HashMap<u64, PendingReaderEntry>>,
/// Trailer receivers parked under their sender's handle until claimed.
pending_trailer_rxs: Mutex<HashMap<u64, PendingTrailerRxEntry>>,
/// Limits and timeouts applied by the store.
pub(crate) config: StoreConfig,
}
impl HandleStore {
    /// Creates an empty store governed by `config`.
    pub fn new(config: StoreConfig) -> Self {
        Self {
            readers: Mutex::new(SlotMap::with_key()),
            writers: Mutex::new(SlotMap::with_key()),
            trailer_tx: Mutex::new(SlotMap::with_key()),
            trailer_rx: Mutex::new(SlotMap::with_key()),
            sessions: Mutex::new(SlotMap::with_key()),
            request_heads: Mutex::new(SlotMap::with_key()),
            fetch_cancels: Mutex::new(SlotMap::with_key()),
            pending_readers: Mutex::new(HashMap::new()),
            pending_trailer_rxs: Mutex::new(HashMap::new()),
            config,
        }
    }

    /// Starts a rollback scope: handles inserted through the returned guard
    /// are removed again unless the guard is committed.
    pub(crate) fn insert_guard(&self) -> InsertGuard<'_> {
        InsertGuard::new(self)
    }

    /// Returns the configured drain timeout.
    pub fn drain_timeout(&self) -> Duration {
        self.config.drain_timeout
    }

    /// Returns the configured maximum chunk size.
    pub fn max_chunk_size(&self) -> usize {
        self.config.max_chunk_size
    }

    /// Returns `(readers, writers, sessions, total)`.
    ///
    /// `total` additionally counts trailer senders, trailer receivers, request
    /// heads and fetch-cancel tokens. The pending maps are not counted.
    pub fn count_handles(&self) -> (usize, usize, usize, usize) {
        // Reads one registry's length and releases its lock before returning,
        // so no two registry locks are ever held at the same time here.
        fn len_of<K: slotmap::Key, V>(registry: &Mutex<SlotMap<K, V>>) -> usize {
            registry.lock().unwrap_or_else(|e| e.into_inner()).len()
        }

        let readers = len_of(&self.readers);
        let writers = len_of(&self.writers);
        let sessions = len_of(&self.sessions);
        let total = readers
            + writers
            + sessions
            + len_of(&self.trailer_tx)
            + len_of(&self.trailer_rx)
            + len_of(&self.request_heads)
            + len_of(&self.fetch_cancels);
        (readers, writers, sessions, total)
    }

    /// Creates a body channel using this store's capacity and drain timeout.
    pub fn make_body_channel(&self) -> (BodyWriter, BodyReader) {
        make_body_channel_with(self.config.channel_capacity, self.config.drain_timeout)
    }

    /// Inserts `value` into `registry` and returns its handle.
    ///
    /// # Errors
    ///
    /// Fails with an internal error when the registry already holds `max` entries.
    fn insert_checked<K: slotmap::Key, T>(
        registry: &Mutex<SlotMap<K, Timed<T>>>,
        value: T,
        max: usize,
    ) -> Result<u64, CoreError> {
        let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
        if reg.len() >= max {
            return Err(CoreError::internal("handle registry at capacity"));
        }
        let key = reg.insert(Timed::new(value));
        Ok(key_to_handle(key))
    }

    /// Registers a body reader and returns its handle.
    pub fn insert_reader(&self, reader: BodyReader) -> Result<u64, CoreError> {
        Self::insert_checked(&self.readers, reader, self.config.max_handles)
    }

    /// Registers a body writer and returns its handle.
    pub fn insert_writer(&self, writer: BodyWriter) -> Result<u64, CoreError> {
        Self::insert_checked(&self.writers, writer, self.config.max_handles)
    }

    /// Creates a body channel, registers its writer, and returns the writer's
    /// handle together with the (unregistered) reader.
    pub fn alloc_body_writer(&self) -> Result<(u64, BodyReader), CoreError> {
        let (writer, reader) = self.make_body_channel();
        let handle = self.insert_writer(writer)?;
        Ok((handle, reader))
    }

    /// Parks `reader` under `writer_handle` until `claim_pending_reader` takes
    /// it. An existing entry for the same handle is replaced.
    pub fn store_pending_reader(&self, writer_handle: u64, reader: BodyReader) {
        self.pending_readers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .insert(
                writer_handle,
                PendingReaderEntry {
                    reader,
                    created: Instant::now(),
                },
            );
    }

    /// Removes and returns the reader parked under `writer_handle`, if any.
    pub fn claim_pending_reader(&self, writer_handle: u64) -> Option<BodyReader> {
        self.pending_readers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(&writer_handle)
            .map(|e| e.reader)
    }

    /// Receives the next chunk for the reader registered under `handle`.
    ///
    /// When the receive yields `None` (end of body or cancellation) the reader
    /// is removed from the registry, so later calls report an invalid handle.
    ///
    /// # Errors
    ///
    /// Returns an invalid-handle error when `handle` is not registered.
    pub async fn next_chunk(&self, handle: u64) -> Result<Option<Bytes>, CoreError> {
        // Clone the shared pieces out so the registry lock is not held while awaiting.
        let (rx_arc, cancel) = {
            let reg = self.readers.lock().unwrap_or_else(|e| e.into_inner());
            let entry = reg
                .get(handle_to_reader_key(handle))
                .ok_or_else(|| CoreError::invalid_handle(handle))?;
            (entry.value.rx.clone(), entry.value.cancel.clone())
        };
        let chunk = recv_with_cancel(rx_arc, cancel).await;
        if chunk.is_none() {
            self.readers
                .lock()
                .unwrap_or_else(|e| e.into_inner())
                .remove(handle_to_reader_key(handle));
        }
        Ok(chunk)
    }

    /// Sends one piece on `tx`, waiting at most `timeout` for channel space.
    async fn send_piece(
        tx: &mpsc::Sender<Bytes>,
        timeout: Duration,
        piece: Bytes,
    ) -> Result<(), CoreError> {
        tokio::time::timeout(timeout, tx.send(piece))
            .await
            .map_err(|_| CoreError::timeout("drain timeout: body reader is too slow"))?
            .map_err(|_| CoreError::internal("body reader dropped"))
    }

    /// Sends `chunk` through the writer registered under `handle`.
    ///
    /// Chunks larger than the configured `max_chunk_size` are split into
    /// consecutive pieces of at most that size; each piece gets its own drain
    /// timeout. A chunk within the limit (including an empty one) is sent as is.
    ///
    /// # Errors
    ///
    /// Returns an invalid-handle error when `handle` is not registered, a
    /// timeout error when a piece is not accepted in time, and an internal
    /// error when the send fails because the receiver is gone.
    pub async fn send_chunk(&self, handle: u64, chunk: Bytes) -> Result<(), CoreError> {
        // Clone the sender out so the registry lock is not held while awaiting.
        let (tx, timeout) = {
            let reg = self.writers.lock().unwrap_or_else(|e| e.into_inner());
            let entry = reg
                .get(handle_to_writer_key(handle))
                .ok_or_else(|| CoreError::invalid_handle(handle))?;
            (entry.value.tx.clone(), entry.value.drain_timeout)
        };
        // A zero limit would make the split loop emit empty slices without
        // ever advancing, so treat it as a one-byte limit.
        let max = self.config.max_chunk_size.max(1);
        if chunk.len() <= max {
            return Self::send_piece(&tx, timeout, chunk).await;
        }
        let mut offset = 0;
        while offset < chunk.len() {
            let end = offset.saturating_add(max).min(chunk.len());
            // `Bytes::slice` shares the underlying buffer; no copy is made.
            Self::send_piece(&tx, timeout, chunk.slice(offset..end)).await?;
            offset = end;
        }
        Ok(())
    }

    /// Removes the writer registered under `handle`, dropping the store's sender.
    ///
    /// # Errors
    ///
    /// Returns an invalid-handle error when `handle` is not registered.
    pub fn finish_body(&self, handle: u64) -> Result<(), CoreError> {
        self.writers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_writer_key(handle))
            .ok_or_else(|| CoreError::invalid_handle(handle))?;
        Ok(())
    }

    /// Removes the reader registered under `handle` and wakes receives that
    /// are currently waiting on it. Unknown handles are ignored.
    ///
    /// NOTE(review): `notify_waiters` stores no permit, so a `next_chunk` call
    /// that has cloned the reader's handles but not yet started waiting may
    /// miss this wake-up — confirm whether that window matters.
    pub fn cancel_reader(&self, handle: u64) {
        let entry = self
            .readers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_reader_key(handle));
        if let Some(e) = entry {
            e.value.cancel.notify_waiters();
        }
    }

    /// Registers a trailer sender and returns its handle.
    pub fn insert_trailer_sender(&self, tx: TrailerTx) -> Result<u64, CoreError> {
        Self::insert_checked(&self.trailer_tx, tx, self.config.max_handles)
    }

    /// Registers a trailer receiver and returns its handle.
    pub fn insert_trailer_receiver(&self, rx: TrailerRx) -> Result<u64, CoreError> {
        Self::insert_checked(&self.trailer_rx, rx, self.config.max_handles)
    }

    /// Removes the trailer sender registered under `handle`, if present.
    pub fn remove_trailer_sender(&self, handle: u64) {
        self.trailer_tx
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_trailer_tx_key(handle));
    }

    /// Creates a trailer channel, registers the sender, and parks the receiver
    /// under the sender's handle for `claim_pending_trailer_rx`.
    pub fn alloc_trailer_sender(&self) -> Result<u64, CoreError> {
        let (tx, rx) = tokio::sync::oneshot::channel::<Vec<(String, String)>>();
        let handle = self.insert_trailer_sender(tx)?;
        self.pending_trailer_rxs
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .insert(
                handle,
                PendingTrailerRxEntry {
                    rx,
                    created: Instant::now(),
                },
            );
        Ok(handle)
    }

    /// Removes and returns the trailer receiver parked under `sender_handle`, if any.
    pub fn claim_pending_trailer_rx(&self, sender_handle: u64) -> Option<TrailerRx> {
        self.pending_trailer_rxs
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(&sender_handle)
            .map(|e| e.rx)
    }

    /// Sends `trailers` through the sender registered under `handle`,
    /// consuming the handle.
    ///
    /// # Errors
    ///
    /// Returns an invalid-handle error when `handle` is not registered, and an
    /// internal error when the oneshot send fails.
    pub fn send_trailers(
        &self,
        handle: u64,
        trailers: Vec<(String, String)>,
    ) -> Result<(), CoreError> {
        let tx = self
            .trailer_tx
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_trailer_tx_key(handle))
            .ok_or_else(|| CoreError::invalid_handle(handle))?
            .value;
        tx.send(trailers)
            .map_err(|_| CoreError::internal("trailer receiver dropped"))
    }

    /// Awaits the trailers for the receiver registered under `handle`,
    /// consuming the handle.
    ///
    /// Returns `Ok(None)` when the receive fails (the sender went away
    /// without sending).
    ///
    /// # Errors
    ///
    /// Returns an invalid-handle error when `handle` is not registered.
    pub async fn next_trailer(
        &self,
        handle: u64,
    ) -> Result<Option<Vec<(String, String)>>, CoreError> {
        let rx = self
            .trailer_rx
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_trailer_rx_key(handle))
            .ok_or_else(|| CoreError::invalid_handle(handle))?
            .value;
        match rx.await {
            Ok(trailers) => Ok(Some(trailers)),
            Err(_) => Ok(None),
        }
    }

    /// Registers a session and returns its handle.
    pub fn insert_session(&self, entry: SessionEntry) -> Result<u64, CoreError> {
        Self::insert_checked(&self.sessions, Arc::new(entry), self.config.max_handles)
    }

    /// Returns a shared reference to the session under `handle`, leaving it registered.
    pub fn lookup_session(&self, handle: u64) -> Option<Arc<SessionEntry>> {
        self.sessions
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(handle_to_session_key(handle))
            .map(|e| e.value.clone())
    }

    /// Removes and returns the session under `handle`, if present.
    pub fn remove_session(&self, handle: u64) -> Option<Arc<SessionEntry>> {
        self.sessions
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_session_key(handle))
            .map(|e| e.value)
    }

    /// Registers the sender for a response head and returns its handle.
    pub fn allocate_req_handle(
        &self,
        sender: tokio::sync::oneshot::Sender<ResponseHeadEntry>,
    ) -> Result<u64, CoreError> {
        Self::insert_checked(&self.request_heads, sender, self.config.max_handles)
    }

    /// Removes and returns the response-head sender under `handle`, if present.
    pub fn take_req_sender(
        &self,
        handle: u64,
    ) -> Option<tokio::sync::oneshot::Sender<ResponseHeadEntry>> {
        self.request_heads
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_request_head_key(handle))
            .map(|e| e.value)
    }

    /// Registers a fresh cancellation notifier and returns its token.
    pub fn alloc_fetch_token(&self) -> Result<u64, CoreError> {
        let notify = Arc::new(tokio::sync::Notify::new());
        Self::insert_checked(&self.fetch_cancels, notify, self.config.max_handles)
    }

    /// Signals cancellation for `token`. Unknown tokens are ignored; the token
    /// stays registered until `remove_fetch_token` is called.
    pub fn cancel_in_flight(&self, token: u64) {
        if let Some(entry) = self
            .fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(handle_to_fetch_cancel_key(token))
        {
            entry.value.notify_one();
        }
    }

    /// Returns the cancellation notifier registered under `token`, if any.
    pub fn get_fetch_cancel_notify(&self, token: u64) -> Option<Arc<tokio::sync::Notify>> {
        self.fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(handle_to_fetch_cancel_key(token))
            .map(|e| e.value.clone())
    }

    /// Removes the cancellation notifier registered under `token`, if present.
    pub fn remove_fetch_token(&self, token: u64) {
        self.fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_fetch_cancel_key(token));
    }

    /// Drops every entry older than `ttl` from all registries and both pending maps.
    pub fn sweep(&self, ttl: Duration) {
        Self::sweep_registry(&self.readers, ttl);
        Self::sweep_registry(&self.writers, ttl);
        Self::sweep_registry(&self.trailer_tx, ttl);
        Self::sweep_registry(&self.trailer_rx, ttl);
        Self::sweep_registry(&self.request_heads, ttl);
        Self::sweep_registry(&self.sessions, ttl);
        Self::sweep_registry(&self.fetch_cancels, ttl);
        self.sweep_pending_readers(ttl);
        self.sweep_pending_trailer_rxs(ttl);
    }

    /// Removes the expired entries of one registry.
    ///
    /// The map itself is never replaced, even when it ends up empty: a fresh
    /// `SlotMap` restarts its slot versions, so handles issued before the
    /// reset could match entries inserted after it. Keeping the map preserves
    /// the versions at the cost of retaining its allocated capacity.
    fn sweep_registry<K: slotmap::Key, T>(registry: &Mutex<SlotMap<K, Timed<T>>>, ttl: Duration) {
        let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
        let before = reg.len();
        reg.retain(|_, entry| !entry.is_expired(ttl));
        let removed = before - reg.len();
        if removed > 0 {
            tracing::debug!(
                "[iroh-http] swept {} expired registry entries (ttl={ttl:?})",
                removed
            );
        }
    }

    /// Drops parked readers that have waited `ttl` or longer.
    fn sweep_pending_readers(&self, ttl: Duration) {
        let mut map = self
            .pending_readers
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        let before = map.len();
        map.retain(|_, e| e.created.elapsed() < ttl);
        let removed = before - map.len();
        if removed > 0 {
            tracing::debug!("[iroh-http] swept {removed} stale pending readers (ttl={ttl:?})");
        }
    }

    /// Drops parked trailer receivers that have waited `ttl` or longer.
    fn sweep_pending_trailer_rxs(&self, ttl: Duration) {
        let mut map = self
            .pending_trailer_rxs
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        let before = map.len();
        map.retain(|_, e| e.created.elapsed() < ttl);
        let removed = before - map.len();
        if removed > 0 {
            tracing::debug!(
                "[iroh-http] swept {removed} stale pending trailer receivers (ttl={ttl:?})"
            );
        }
    }
}
/// Read size (64 KiB) used by the pump helpers: the `read_chunk` limit in
/// `pump_quic_recv_to_body` and the initial buffer capacity in `pump_duplex`.
pub(crate) const PUMP_READ_BUF: usize = 64 * 1024;
/// Forwards chunks read from `recv` into `writer`.
///
/// Stops when the stream yields no further chunk, when a read fails, or when
/// the writer reports an error. Errors are not propagated.
pub(crate) async fn pump_quic_recv_to_body(
    mut recv: iroh::endpoint::RecvStream,
    writer: BodyWriter,
) {
    loop {
        let chunk = match recv.read_chunk(PUMP_READ_BUF).await {
            Ok(Some(chunk)) => chunk,
            // End of stream or read error: nothing more to forward either way.
            _ => break,
        };
        if writer.send_chunk(chunk.bytes).await.is_err() {
            break;
        }
    }
}
/// Writes every chunk produced by `reader` to `send`, then finishes the stream.
///
/// Stops early when a write fails. `finish` is called in every case and its
/// result is ignored.
pub(crate) async fn pump_body_to_quic_send(
    reader: BodyReader,
    mut send: iroh::endpoint::SendStream,
) {
    while let Some(data) = reader.next_chunk().await {
        if send.write_all(&data).await.is_err() {
            break;
        }
    }
    let _ = send.finish();
}
pub(crate) async fn pump_duplex<IO>(io: IO, writer: BodyWriter, reader: BodyReader)
where
IO: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
{
let (mut recv, mut send) = tokio::io::split(io);
tokio::join!(
async {
use bytes::BytesMut;
use tokio::io::AsyncReadExt;
let mut buf = BytesMut::with_capacity(PUMP_READ_BUF);
loop {
buf.clear();
match recv.read_buf(&mut buf).await {
Ok(0) | Err(_) => break,
Ok(_) => {
if writer
.send_chunk(buf.split().freeze())
.await
.is_err()
{
break;
}
}
}
}
},
async {
use tokio::io::AsyncWriteExt;
loop {
match reader.next_chunk().await {
None => break,
Some(data) => {
if send.write_all(&data).await.is_err() {
break;
}
}
}
}
let _ = send.shutdown().await;
},
);
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a store with the default configuration.
    fn test_store() -> HandleStore {
        HandleStore::new(StoreConfig::default())
    }

    /// A single chunk is delivered, then the reader sees end-of-body.
    #[tokio::test]
    async fn body_channel_send_recv() {
        let (tx_side, rx_side) = make_body_channel();
        tx_side.send_chunk(Bytes::from("hello")).await.unwrap();
        // Dropping the writer closes the channel.
        drop(tx_side);
        let first = rx_side.next_chunk().await;
        assert_eq!(first, Some(Bytes::from("hello")));
        let end = rx_side.next_chunk().await;
        assert!(end.is_none());
    }

    /// Chunks arrive in the order they were sent.
    #[tokio::test]
    async fn body_channel_multiple_chunks() {
        let (tx_side, rx_side) = make_body_channel();
        for part in ["a", "b", "c"] {
            tx_side.send_chunk(Bytes::from(part)).await.unwrap();
        }
        drop(tx_side);
        let mut received = Vec::new();
        loop {
            match rx_side.next_chunk().await {
                Some(chunk) => received.push(chunk),
                None => break,
            }
        }
        let expected = vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")];
        assert_eq!(received, expected);
    }

    /// Sending after the reader is gone fails.
    #[tokio::test]
    async fn body_channel_reader_dropped_returns_error() {
        let (tx_side, rx_side) = make_body_channel();
        drop(rx_side);
        let outcome = tx_side.send_chunk(Bytes::from("data")).await;
        assert!(outcome.is_err());
    }

    /// A registered reader can be driven through its handle.
    #[tokio::test]
    async fn insert_reader_and_next_chunk() {
        let store = test_store();
        let (tx_side, rx_side) = store.make_body_channel();
        let reader_handle = store.insert_reader(rx_side).unwrap();
        tx_side.send_chunk(Bytes::from("slab-data")).await.unwrap();
        drop(tx_side);
        let first = store.next_chunk(reader_handle).await.unwrap();
        assert_eq!(first, Some(Bytes::from("slab-data")));
        let end = store.next_chunk(reader_handle).await.unwrap();
        assert!(end.is_none());
    }

    /// An unknown reader handle is reported as invalid input.
    #[tokio::test]
    async fn next_chunk_invalid_handle() {
        let store = test_store();
        let outcome = store.next_chunk(999999).await;
        assert!(outcome.is_err());
        assert_eq!(outcome.unwrap_err().code, crate::ErrorCode::InvalidInput);
    }

    /// A registered writer can be driven through its handle.
    #[tokio::test]
    async fn send_chunk_via_handle() {
        let store = test_store();
        let (tx_side, rx_side) = store.make_body_channel();
        let writer_handle = store.insert_writer(tx_side).unwrap();
        store
            .send_chunk(writer_handle, Bytes::from("via-slab"))
            .await
            .unwrap();
        store.finish_body(writer_handle).unwrap();
        let first = rx_side.next_chunk().await;
        assert_eq!(first, Some(Bytes::from("via-slab")));
        let end = rx_side.next_chunk().await;
        assert!(end.is_none());
    }

    /// Inserting beyond `max_handles` is rejected.
    #[tokio::test]
    async fn capacity_cap_rejects_overflow() {
        let config = StoreConfig {
            max_handles: 2,
            ..StoreConfig::default()
        };
        let store = HandleStore::new(config);
        let (_, r1) = store.make_body_channel();
        let (_, r2) = store.make_body_channel();
        let (_, r3) = store.make_body_channel();
        store.insert_reader(r1).unwrap();
        store.insert_reader(r2).unwrap();
        let err = store.insert_reader(r3).unwrap_err();
        assert!(err.message.contains("capacity"));
    }

    /// A zero-TTL sweep clears trailer receivers nobody claimed.
    #[test]
    fn sweep_removes_unclaimed_trailer_receivers() {
        let store = test_store();
        let _handle = store.alloc_trailer_sender().unwrap();
        let parked_before = store.pending_trailer_rxs.lock().unwrap().len();
        assert_eq!(parked_before, 1);
        store.sweep(Duration::ZERO);
        let parked_after = store.pending_trailer_rxs.lock().unwrap().len();
        assert_eq!(
            parked_after, 0,
            "sweep() must remove unclaimed pending trailer receivers"
        );
    }

    /// A notification stored before the receive starts cancels it.
    #[tokio::test]
    async fn recv_with_cancel_returns_none_on_cancel() {
        let (_tx, rx) = mpsc::channel::<Bytes>(4);
        let shared_rx = Arc::new(tokio::sync::Mutex::new(rx));
        let cancel = Arc::new(tokio::sync::Notify::new());
        cancel.notify_one();
        let outcome = recv_with_cancel(shared_rx, cancel).await;
        assert!(outcome.is_none());
    }
}