1use std::{
9 collections::HashMap,
10 sync::{Arc, Mutex},
11 time::{Duration, Instant},
12};
13
14use bytes::Bytes;
15use slotmap::{KeyData, SlotMap};
16use tokio::sync::mpsc;
17
18use crate::CoreError;
19
/// Default number of chunks a body channel buffers before senders block.
pub const DEFAULT_CHANNEL_CAPACITY: usize = 32;
/// Default upper bound, in bytes, on a single body chunk; larger payloads
/// are split before being sent.
pub const DEFAULT_MAX_CHUNK_SIZE: usize = 64 * 1024;
/// Default time (ms) a send waits for a slow reader before failing.
pub const DEFAULT_DRAIN_TIMEOUT_MS: u64 = 30_000;
/// Default idle time (ms) after which a registry entry may be swept.
pub const DEFAULT_SLAB_TTL_MS: u64 = 300_000;
/// Default interval (ms) between sweeps — presumably consumed by the
/// periodic sweeper outside this file; not referenced here.
pub const DEFAULT_SWEEP_INTERVAL_MS: u64 = 60_000;
/// Default cap on live entries per handle registry.
pub const DEFAULT_MAX_HANDLES: usize = 65_536;
28
/// A live peer session tracked by the handle store.
pub struct SessionEntry {
    /// The underlying iroh QUIC connection for this session.
    pub conn: iroh::endpoint::Connection,
}
34
/// Response head delivered through a request's oneshot channel.
pub struct ResponseHeadEntry {
    /// HTTP status code.
    pub status: u16,
    /// Header name/value pairs, preserving arrival order.
    pub headers: Vec<(String, String)>,
}
39
// Distinct slotmap key types, one per registry, so a key from one
// registry cannot accidentally index another.
slotmap::new_key_type! { pub(crate) struct ReaderKey; }
slotmap::new_key_type! { pub(crate) struct WriterKey; }
slotmap::new_key_type! { pub(crate) struct FetchCancelKey; }
slotmap::new_key_type! { pub(crate) struct SessionKey; }
slotmap::new_key_type! { pub(crate) struct RequestHeadKey; }
47
/// Converts a typed slotmap key into the opaque `u64` handle exposed to
/// callers (slotmap's FFI representation).
fn key_to_handle<K: slotmap::Key>(k: K) -> u64 {
    k.data().as_ffi()
}
53
/// Generates a conversion from an opaque `u64` handle back into a typed
/// slotmap key — the inverse of `key_to_handle`.
macro_rules! handle_to_key {
    ($fn_name:ident, $key_type:ty) => {
        fn $fn_name(h: u64) -> $key_type {
            <$key_type>::from(KeyData::from_ffi(h))
        }
    };
}
61
// One handle→key conversion per registry key type.
handle_to_key!(handle_to_reader_key, ReaderKey);
handle_to_key!(handle_to_writer_key, WriterKey);
handle_to_key!(handle_to_session_key, SessionKey);
handle_to_key!(handle_to_request_head_key, RequestHeadKey);
handle_to_key!(handle_to_fetch_cancel_key, FetchCancelKey);
67
/// Receiving half of a body channel.
pub struct BodyReader {
    // Receiver is behind Arc<tokio Mutex> so the registry can hand out
    // clones and poll it without holding the registry lock across awaits.
    pub(crate) rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Bytes>>>,
    // Fired to abort a pending receive (see `recv_with_cancel`).
    pub(crate) cancel: Arc<tokio::sync::Notify>,
}
79
/// Sending half of a body channel.
pub struct BodyWriter {
    pub(crate) tx: mpsc::Sender<Bytes>,
    // How long `send_chunk` waits for a slow reader before erroring.
    pub(crate) drain_timeout: Duration,
}
87
88pub fn make_body_channel() -> (BodyWriter, BodyReader) {
95 make_body_channel_with(
96 DEFAULT_CHANNEL_CAPACITY,
97 Duration::from_millis(DEFAULT_DRAIN_TIMEOUT_MS),
98 )
99}
100
101fn make_body_channel_with(capacity: usize, drain_timeout: Duration) -> (BodyWriter, BodyReader) {
102 let (tx, rx) = mpsc::channel(capacity);
103 (
104 BodyWriter { tx, drain_timeout },
105 BodyReader {
106 rx: Arc::new(tokio::sync::Mutex::new(rx)),
107 cancel: Arc::new(tokio::sync::Notify::new()),
108 },
109 )
110}
111
/// Receives the next chunk, racing the receive against the cancel signal.
///
/// Returns `None` either when the channel is closed (all senders dropped)
/// or when `cancel` fires; `biased` makes cancellation win when both arms
/// are ready at once.
async fn recv_with_cancel(
    rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Bytes>>>,
    cancel: Arc<tokio::sync::Notify>,
) -> Option<Bytes> {
    tokio::select! {
        biased;
        _ = cancel.notified() => None,
        // The receiver lock is acquired inside the select arm so the
        // cancel arm can still complete while a recv is pending.
        chunk = async { rx.lock().await.recv().await } => chunk,
    }
}
130
131impl BodyReader {
132 pub async fn next_chunk(&self) -> Option<Bytes> {
135 recv_with_cancel(self.rx.clone(), self.cancel.clone()).await
136 }
137}
138
139impl BodyWriter {
140 pub async fn send_chunk(&self, chunk: Bytes) -> Result<(), String> {
143 tokio::time::timeout(self.drain_timeout, self.tx.send(chunk))
144 .await
145 .map_err(|_| "drain timeout: body reader is too slow".to_string())?
146 .map_err(|_| "body reader dropped".to_string())
147 }
148}
149
/// Tunables for a `HandleStore`.
#[derive(Debug, Clone)]
pub struct StoreConfig {
    /// Buffered chunk capacity of each body channel.
    pub channel_capacity: usize,
    /// Maximum size of a single chunk; larger sends are split.
    pub max_chunk_size: usize,
    /// How long a send waits on a slow reader before timing out.
    pub drain_timeout: Duration,
    /// Cap on live entries per handle registry.
    pub max_handles: usize,
    /// Idle time after which a registry entry is eligible for sweeping.
    pub ttl: Duration,
}
167
168impl Default for StoreConfig {
169 fn default() -> Self {
170 Self {
171 channel_capacity: DEFAULT_CHANNEL_CAPACITY,
172 max_chunk_size: DEFAULT_MAX_CHUNK_SIZE,
173 drain_timeout: Duration::from_millis(DEFAULT_DRAIN_TIMEOUT_MS),
174 max_handles: DEFAULT_MAX_HANDLES,
175 ttl: Duration::from_millis(DEFAULT_SLAB_TTL_MS),
176 }
177 }
178}
179
/// Wraps a registry value with its last-access time for TTL sweeping.
struct Timed<T> {
    value: T,
    // Refreshed by `touch()`; compared against the TTL in `is_expired()`.
    last_accessed: Instant,
}
188
189impl<T> Timed<T> {
190 fn new(value: T) -> Self {
191 Self {
192 value,
193 last_accessed: Instant::now(),
194 }
195 }
196
197 fn touch(&mut self) {
199 self.last_accessed = Instant::now();
200 }
201
202 fn is_expired(&self, ttl: Duration) -> bool {
203 self.last_accessed.elapsed() > ttl
204 }
205}
206
/// A reader parked under its writer handle until a peer claims it.
struct PendingReaderEntry {
    reader: BodyReader,
    // Creation time; `sweep_pending_readers` expires unclaimed entries.
    created: Instant,
}
212
/// Rollback guard for multi-handle allocations: every handle inserted
/// through the guard is released when it drops, unless `commit()` was
/// called first.
pub(crate) struct InsertGuard<'a> {
    store: &'a HandleStore,
    // Handles allocated so far; released on an uncommitted drop.
    tracked: Vec<TrackedHandle>,
    committed: bool,
}
223
/// Which registry a guard-tracked handle belongs to, so rollback knows
/// how to release it.
enum TrackedHandle {
    Reader(u64),
    Writer(u64),
    ReqHead(u64),
}
243
impl<'a> InsertGuard<'a> {
    /// Starts an empty, uncommitted guard over `store`.
    fn new(store: &'a HandleStore) -> Self {
        Self {
            store,
            tracked: Vec::new(),
            committed: false,
        }
    }

    /// Inserts a reader into the store and tracks the handle for rollback.
    pub fn insert_reader(&mut self, reader: BodyReader) -> Result<u64, CoreError> {
        let h = self.store.insert_reader(reader)?;
        self.tracked.push(TrackedHandle::Reader(h));
        Ok(h)
    }

    /// Inserts a writer into the store and tracks the handle for rollback.
    pub fn insert_writer(&mut self, writer: BodyWriter) -> Result<u64, CoreError> {
        let h = self.store.insert_writer(writer)?;
        self.tracked.push(TrackedHandle::Writer(h));
        Ok(h)
    }

    /// Allocates a request-head handle for `sender` and tracks it for
    /// rollback.
    pub fn allocate_req_handle(
        &mut self,
        sender: tokio::sync::oneshot::Sender<ResponseHeadEntry>,
    ) -> Result<u64, CoreError> {
        let h = self.store.allocate_req_handle(sender)?;
        self.tracked.push(TrackedHandle::ReqHead(h));
        Ok(h)
    }

    /// Marks the allocation successful; the tracked handles survive the
    /// guard's drop.
    pub fn commit(mut self) {
        self.committed = true;
    }
}
279
impl Drop for InsertGuard<'_> {
    /// Rolls back every tracked handle unless the guard was committed.
    fn drop(&mut self) {
        if self.committed {
            return;
        }
        for handle in &self.tracked {
            match handle {
                // Removes the reader and wakes any blocked receiver.
                TrackedHandle::Reader(h) => self.store.cancel_reader(*h),
                // Removing the writer drops the sender, which the reader
                // sees as end-of-body; an already-gone handle is ignored.
                TrackedHandle::Writer(h) => {
                    let _ = self.store.finish_body(*h);
                }
                // Request heads have no cancel path; remove the oneshot
                // sender directly from the registry.
                TrackedHandle::ReqHead(h) => {
                    self.store
                        .request_heads
                        .lock()
                        .unwrap_or_else(|e| e.into_inner())
                        .remove(handle_to_request_head_key(*h));
                }
            }
        }
    }
}
302
/// Central registry mapping opaque `u64` handles to live resources:
/// body readers/writers, sessions, pending request heads, and fetch
/// cancellation tokens. Every entry is timestamped for TTL sweeping.
pub struct HandleStore {
    readers: Mutex<SlotMap<ReaderKey, Timed<BodyReader>>>,
    writers: Mutex<SlotMap<WriterKey, Timed<BodyWriter>>>,
    sessions: Mutex<SlotMap<SessionKey, Timed<Arc<SessionEntry>>>>,
    // Oneshot senders waiting for a response head, keyed by request handle.
    request_heads:
        Mutex<SlotMap<RequestHeadKey, Timed<tokio::sync::oneshot::Sender<ResponseHeadEntry>>>>,
    fetch_cancels: Mutex<SlotMap<FetchCancelKey, Timed<Arc<tokio::sync::Notify>>>>,
    // Readers parked under their writer handle until claimed
    // (see `store_pending_reader` / `claim_pending_reader`).
    pending_readers: Mutex<HashMap<u64, PendingReaderEntry>>,
    pub(crate) config: StoreConfig,
}
319
impl HandleStore {
    /// Creates an empty store with the given configuration.
    ///
    /// Throughout this impl, `lock().unwrap_or_else(|e| e.into_inner())`
    /// recovers the guard from a poisoned mutex instead of panicking.
    pub fn new(config: StoreConfig) -> Self {
        Self {
            readers: Mutex::new(SlotMap::with_key()),
            writers: Mutex::new(SlotMap::with_key()),
            sessions: Mutex::new(SlotMap::with_key()),
            request_heads: Mutex::new(SlotMap::with_key()),
            fetch_cancels: Mutex::new(SlotMap::with_key()),
            pending_readers: Mutex::new(HashMap::new()),
            config,
        }
    }

    /// Starts a rollback guard for allocating several handles together;
    /// handles inserted through it are released unless committed.
    pub(crate) fn insert_guard(&self) -> InsertGuard<'_> {
        InsertGuard::new(self)
    }

    /// Configured drain timeout for body sends.
    pub fn drain_timeout(&self) -> Duration {
        self.config.drain_timeout
    }

    /// Configured maximum size of a single body chunk.
    pub fn max_chunk_size(&self) -> usize {
        self.config.max_chunk_size
    }

    /// Returns `(readers, writers, sessions, total)` where `total` also
    /// includes request heads and fetch-cancel tokens.
    ///
    /// Each registry is locked separately, so the result is a best-effort
    /// snapshot rather than a single atomic view.
    pub fn count_handles(&self) -> (usize, usize, usize, usize) {
        let readers = self.readers.lock().unwrap_or_else(|e| e.into_inner()).len();
        let writers = self.writers.lock().unwrap_or_else(|e| e.into_inner()).len();
        let sessions = self
            .sessions
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .len();
        let total = readers
            .saturating_add(writers)
            .saturating_add(sessions)
            .saturating_add(
                self.request_heads
                    .lock()
                    .unwrap_or_else(|e| e.into_inner())
                    .len(),
            )
            .saturating_add(
                self.fetch_cancels
                    .lock()
                    .unwrap_or_else(|e| e.into_inner())
                    .len(),
            );
        (readers, writers, sessions, total)
    }

    /// Builds a body channel using this store's configured capacity and
    /// drain timeout.
    pub fn make_body_channel(&self) -> (BodyWriter, BodyReader) {
        make_body_channel_with(self.config.channel_capacity, self.config.drain_timeout)
    }

    /// Inserts `value` into `registry`, enforcing the per-registry cap
    /// `max`, and returns the opaque handle.
    fn insert_checked<K: slotmap::Key, T>(
        registry: &Mutex<SlotMap<K, Timed<T>>>,
        value: T,
        max: usize,
    ) -> Result<u64, CoreError> {
        let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
        if reg.len() >= max {
            return Err(CoreError::internal("handle registry at capacity"));
        }
        let key = reg.insert(Timed::new(value));
        Ok(key_to_handle(key))
    }

    /// Registers a body reader and returns its handle.
    pub fn insert_reader(&self, reader: BodyReader) -> Result<u64, CoreError> {
        Self::insert_checked(&self.readers, reader, self.config.max_handles)
    }

    /// Registers a body writer and returns its handle.
    pub fn insert_writer(&self, writer: BodyWriter) -> Result<u64, CoreError> {
        Self::insert_checked(&self.writers, writer, self.config.max_handles)
    }

    /// Creates a body channel, registers the writer half, and returns the
    /// writer handle alongside the (unregistered) reader half.
    pub fn alloc_body_writer(&self) -> Result<(u64, BodyReader), CoreError> {
        let (writer, reader) = self.make_body_channel();
        let handle = self.insert_writer(writer)?;
        Ok((handle, reader))
    }

    /// Parks `reader` under `writer_handle` until someone claims it.
    /// A previous pending reader for the same handle is replaced.
    pub fn store_pending_reader(&self, writer_handle: u64, reader: BodyReader) {
        self.pending_readers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .insert(
                writer_handle,
                PendingReaderEntry {
                    reader,
                    created: Instant::now(),
                },
            );
    }

    /// Removes and returns the reader parked under `writer_handle`, if any.
    pub fn claim_pending_reader(&self, writer_handle: u64) -> Option<BodyReader> {
        self.pending_readers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(&writer_handle)
            .map(|e| e.reader)
    }

    /// Awaits the next chunk for the reader behind `handle`.
    ///
    /// Returns `Ok(None)` at end-of-body or cancellation, in which case the
    /// handle is also removed from the registry. Errors if the handle is
    /// unknown.
    pub async fn next_chunk(&self, handle: u64) -> Result<Option<Bytes>, CoreError> {
        // Clone the Arcs and release the registry lock before awaiting, so
        // the std mutex is never held across an await point.
        let (rx_arc, cancel) = {
            let mut reg = self.readers.lock().unwrap_or_else(|e| e.into_inner());
            let entry = reg
                .get_mut(handle_to_reader_key(handle))
                .ok_or_else(|| CoreError::invalid_handle(handle))?;
            entry.touch();
            (entry.value.rx.clone(), entry.value.cancel.clone())
        };

        let chunk = recv_with_cancel(rx_arc, cancel).await;

        if chunk.is_none() {
            // Stream finished or was cancelled: release the handle.
            self.readers
                .lock()
                .unwrap_or_else(|e| e.into_inner())
                .remove(handle_to_reader_key(handle));
        }

        Ok(chunk)
    }

    /// Non-blocking variant of `next_chunk`: returns a chunk only if one is
    /// already buffered.
    ///
    /// Returns `Ok(None)` when the channel is disconnected (the handle is
    /// then removed). Errors when the handle is unknown, the channel is
    /// currently empty, or the receiver lock is contended by an async read.
    pub fn try_next_chunk(&self, handle: u64) -> Result<Option<Bytes>, CoreError> {
        let rx_arc = {
            let mut reg = self.readers.lock().unwrap_or_else(|e| e.into_inner());
            let entry = reg
                .get_mut(handle_to_reader_key(handle))
                .ok_or_else(|| CoreError::invalid_handle(handle))?;
            entry.touch();
            entry.value.rx.clone()
        };

        // try_lock: a concurrent async `next_chunk` may hold the receiver.
        let mut rx_guard = match rx_arc.try_lock() {
            Ok(g) => g,
            Err(_) => return Err(CoreError::internal("try_next_chunk: lock contended")),
        };

        match rx_guard.try_recv() {
            Ok(chunk) => Ok(Some(chunk)),
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
                Err(CoreError::internal("try_next_chunk: channel empty"))
            }
            Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
                // Release the receiver guard before touching the registry.
                drop(rx_guard);
                self.readers
                    .lock()
                    .unwrap_or_else(|e| e.into_inner())
                    .remove(handle_to_reader_key(handle));
                Ok(None)
            }
        }
    }

    /// Sends `chunk` through the writer behind `handle`, splitting it into
    /// `max_chunk_size` slices when it exceeds the configured limit.
    ///
    /// Errors on unknown handle, drain timeout, or a dropped reader.
    pub async fn send_chunk(&self, handle: u64, chunk: Bytes) -> Result<(), CoreError> {
        // Clone the sender and timeout so the registry lock is not held
        // across the awaits below.
        let (tx, timeout) = {
            let mut reg = self.writers.lock().unwrap_or_else(|e| e.into_inner());
            let entry = reg
                .get_mut(handle_to_writer_key(handle))
                .ok_or_else(|| CoreError::invalid_handle(handle))?;
            entry.touch();
            (entry.value.tx.clone(), entry.value.drain_timeout)
        };
        let max = self.config.max_chunk_size;
        if chunk.len() <= max {
            tokio::time::timeout(timeout, tx.send(chunk))
                .await
                .map_err(|_| CoreError::timeout("drain timeout: body reader is too slow"))?
                .map_err(|_| CoreError::internal("body reader dropped"))
        } else {
            // Oversized payload: forward max-sized views. `Bytes::slice`
            // is a refcounted view, not a copy.
            let mut offset = 0;
            while offset < chunk.len() {
                let end = offset.saturating_add(max).min(chunk.len());
                tokio::time::timeout(timeout, tx.send(chunk.slice(offset..end)))
                    .await
                    .map_err(|_| CoreError::timeout("drain timeout: body reader is too slow"))?
                    .map_err(|_| CoreError::internal("body reader dropped"))?;
                offset = end;
            }
            Ok(())
        }
    }

    /// Removes the writer behind `handle`; dropping the sender closes the
    /// channel, which the reader observes as end-of-body.
    pub fn finish_body(&self, handle: u64) -> Result<(), CoreError> {
        self.writers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_writer_key(handle))
            .ok_or_else(|| CoreError::invalid_handle(handle))?;
        Ok(())
    }

    /// Removes the reader behind `handle` and wakes any task blocked on it
    /// in `next_chunk`. Unknown handles are ignored.
    pub fn cancel_reader(&self, handle: u64) {
        let entry = self
            .readers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_reader_key(handle));
        if let Some(e) = entry {
            e.value.cancel.notify_waiters();
        }
    }

    /// Registers a session and returns its handle.
    pub fn insert_session(&self, entry: SessionEntry) -> Result<u64, CoreError> {
        Self::insert_checked(&self.sessions, Arc::new(entry), self.config.max_handles)
    }

    /// Returns the session behind `handle` without removing it.
    /// NOTE: this does not refresh the entry's TTL stamp.
    pub fn lookup_session(&self, handle: u64) -> Option<Arc<SessionEntry>> {
        self.sessions
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(handle_to_session_key(handle))
            .map(|e| e.value.clone())
    }

    /// Removes and returns the session behind `handle`, if any.
    pub fn remove_session(&self, handle: u64) -> Option<Arc<SessionEntry>> {
        self.sessions
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_session_key(handle))
            .map(|e| e.value)
    }

    /// Registers the oneshot sender that will receive the response head for
    /// an in-flight request and returns the request handle.
    pub fn allocate_req_handle(
        &self,
        sender: tokio::sync::oneshot::Sender<ResponseHeadEntry>,
    ) -> Result<u64, CoreError> {
        Self::insert_checked(&self.request_heads, sender, self.config.max_handles)
    }

    /// Removes and returns the response-head sender for `handle`, if any.
    pub fn take_req_sender(
        &self,
        handle: u64,
    ) -> Option<tokio::sync::oneshot::Sender<ResponseHeadEntry>> {
        self.request_heads
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_request_head_key(handle))
            .map(|e| e.value)
    }

    /// Allocates a cancellation token for an in-flight fetch and returns
    /// its handle.
    pub fn alloc_fetch_token(&self) -> Result<u64, CoreError> {
        let notify = Arc::new(tokio::sync::Notify::new());
        Self::insert_checked(&self.fetch_cancels, notify, self.config.max_handles)
    }

    /// Signals the fetch associated with `token` to cancel, if the token is
    /// still registered. Unknown tokens are ignored.
    pub fn cancel_in_flight(&self, token: u64) {
        if let Some(entry) = self
            .fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(handle_to_fetch_cancel_key(token))
        {
            entry.value.notify_one();
        }
    }

    /// Returns the cancel notifier behind `token` without removing it.
    pub fn get_fetch_cancel_notify(&self, token: u64) -> Option<Arc<tokio::sync::Notify>> {
        self.fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(handle_to_fetch_cancel_key(token))
            .map(|e| e.value.clone())
    }

    /// Removes the fetch-cancel token `token`; unknown tokens are ignored.
    pub fn remove_fetch_token(&self, token: u64) {
        self.fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_fetch_cancel_key(token));
    }

    /// Removes every entry idle for longer than `ttl` from all registries,
    /// waking readers that get removed.
    pub fn sweep(&self, ttl: Duration) {
        Self::sweep_readers(&self.readers, ttl);
        Self::sweep_registry(&self.writers, ttl);
        Self::sweep_registry(&self.request_heads, ttl);
        Self::sweep_registry(&self.sessions, ttl);
        Self::sweep_registry(&self.fetch_cancels, ttl);
        self.sweep_pending_readers(ttl);
    }

    /// Reader-specific sweep: expired readers additionally have their
    /// cancel signal fired so any blocked `next_chunk` returns.
    fn sweep_readers(registry: &Mutex<SlotMap<ReaderKey, Timed<BodyReader>>>, ttl: Duration) {
        let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
        let expired: Vec<ReaderKey> = reg
            .iter()
            .filter(|(_, e)| e.is_expired(ttl))
            .map(|(k, _)| k)
            .collect();

        if expired.is_empty() {
            return;
        }

        for key in &expired {
            if let Some(entry) = reg.remove(*key) {
                entry.value.cancel.notify_waiters();
            }
        }
        tracing::debug!(
            "[iroh-http] swept {} expired reader entries (ttl={ttl:?})",
            expired.len()
        );
        // Release slotmap capacity once a large map has fully emptied.
        if reg.is_empty() && reg.capacity() > 128 {
            *reg = SlotMap::with_key();
        }
    }

    /// Generic sweep for registries whose entries need no extra teardown.
    fn sweep_registry<K: slotmap::Key, T>(registry: &Mutex<SlotMap<K, Timed<T>>>, ttl: Duration) {
        let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
        let expired: Vec<K> = reg
            .iter()
            .filter(|(_, e)| e.is_expired(ttl))
            .map(|(k, _)| k)
            .collect();

        if expired.is_empty() {
            return;
        }

        for key in &expired {
            reg.remove(*key);
        }
        tracing::debug!(
            "[iroh-http] swept {} expired registry entries (ttl={ttl:?})",
            expired.len()
        );
        // Release slotmap capacity once a large map has fully emptied.
        if reg.is_empty() && reg.capacity() > 128 {
            *reg = SlotMap::with_key();
        }
    }

    /// Drops parked readers older than `ttl` that were never claimed.
    fn sweep_pending_readers(&self, ttl: Duration) {
        let mut map = self
            .pending_readers
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        let before = map.len();
        map.retain(|_, e| e.created.elapsed() < ttl);
        let removed = before.saturating_sub(map.len());
        if removed > 0 {
            tracing::debug!("[iroh-http] swept {removed} stale pending readers (ttl={ttl:?})");
        }
    }
}
745
/// Read-buffer / chunk size (bytes) used by the pump helpers below.
pub(crate) const PUMP_READ_BUF: usize = 64 * 1024;
750
751pub(crate) async fn pump_quic_recv_to_body(
756 mut recv: iroh::endpoint::RecvStream,
757 writer: BodyWriter,
758) {
759 while let Ok(Some(chunk)) = recv.read_chunk(PUMP_READ_BUF).await {
760 if writer.send_chunk(chunk.bytes).await.is_err() {
761 break;
762 }
763 }
764 }
766
767pub(crate) async fn pump_body_to_quic_send(
772 reader: BodyReader,
773 mut send: iroh::endpoint::SendStream,
774) {
775 loop {
776 match reader.next_chunk().await {
777 None => break,
778 Some(data) => {
779 if send.write_all(&data).await.is_err() {
780 break;
781 }
782 }
783 }
784 }
785 let _ = send.finish();
786}
787
/// Pumps a full-duplex byte stream through a pair of body channels: bytes
/// read from `io` are sent into `writer`, and chunks from `reader` are
/// written back to `io`. Returns once both directions have finished.
pub(crate) async fn pump_duplex<IO>(io: IO, writer: BodyWriter, reader: BodyReader)
where
    IO: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
{
    let (mut recv, mut send) = tokio::io::split(io);

    // Run both directions concurrently; each loop exits on EOF, an I/O
    // error, or its body channel's peer half going away.
    tokio::join!(
        async {
            use bytes::BytesMut;
            use tokio::io::AsyncReadExt;
            let mut buf = BytesMut::with_capacity(PUMP_READ_BUF);
            loop {
                buf.clear();
                match recv.read_buf(&mut buf).await {
                    // A 0-byte read means EOF.
                    Ok(0) | Err(_) => break,
                    Ok(_) => {
                        // split().freeze() hands off the filled bytes
                        // without copying, leaving `buf` empty for reuse.
                        if writer.send_chunk(buf.split().freeze()).await.is_err() {
                            break;
                        }
                    }
                }
            }
        },
        async {
            use tokio::io::AsyncWriteExt;
            loop {
                match reader.next_chunk().await {
                    None => break,
                    Some(data) => {
                        if send.write_all(&data).await.is_err() {
                            break;
                        }
                    }
                }
            }
            // Close the write half once the body is exhausted; errors are
            // deliberately ignored (peer may already be gone).
            let _ = send.shutdown().await;
        },
    );
}
833
#[cfg(test)]
mod tests {
    use super::*;

    /// Store with the default configuration, shared by the handle tests.
    fn test_store() -> HandleStore {
        HandleStore::new(StoreConfig::default())
    }

    #[tokio::test]
    async fn body_channel_send_recv() {
        let (writer, reader) = make_body_channel();
        writer.send_chunk(Bytes::from("hello")).await.unwrap();
        // Dropping the writer closes the channel; the reader sees EOF
        // after draining the buffered chunk.
        drop(writer);
        let chunk = reader.next_chunk().await;
        assert_eq!(chunk, Some(Bytes::from("hello")));
        let eof = reader.next_chunk().await;
        assert!(eof.is_none());
    }

    #[tokio::test]
    async fn body_channel_multiple_chunks() {
        let (writer, reader) = make_body_channel();
        writer.send_chunk(Bytes::from("a")).await.unwrap();
        writer.send_chunk(Bytes::from("b")).await.unwrap();
        writer.send_chunk(Bytes::from("c")).await.unwrap();
        drop(writer);

        // Chunks must arrive in send order, terminated by None.
        let mut collected = Vec::new();
        while let Some(chunk) = reader.next_chunk().await {
            collected.push(chunk);
        }
        assert_eq!(
            collected,
            vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c"),]
        );
    }

    #[tokio::test]
    async fn body_channel_reader_dropped_returns_error() {
        let (writer, reader) = make_body_channel();
        drop(reader);
        // With the receiver gone, sends must fail rather than hang.
        let result = writer.send_chunk(Bytes::from("data")).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn insert_reader_and_next_chunk() {
        let store = test_store();
        let (writer, reader) = store.make_body_channel();
        let handle = store.insert_reader(reader).unwrap();

        writer.send_chunk(Bytes::from("slab-data")).await.unwrap();
        drop(writer);

        let chunk = store.next_chunk(handle).await.unwrap();
        assert_eq!(chunk, Some(Bytes::from("slab-data")));

        // After the buffered chunk, the closed channel reads as EOF.
        let eof = store.next_chunk(handle).await.unwrap();
        assert!(eof.is_none());
    }

    #[tokio::test]
    async fn next_chunk_invalid_handle() {
        let store = test_store();
        // A handle that was never allocated must be rejected.
        let result = store.next_chunk(999999).await;
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().code, crate::ErrorCode::InvalidInput);
    }

    #[tokio::test]
    async fn send_chunk_via_handle() {
        let store = test_store();
        let (writer, reader) = store.make_body_channel();
        let handle = store.insert_writer(writer).unwrap();

        store
            .send_chunk(handle, Bytes::from("via-slab"))
            .await
            .unwrap();
        // finish_body removes the writer, closing the channel.
        store.finish_body(handle).unwrap();

        let chunk = reader.next_chunk().await;
        assert_eq!(chunk, Some(Bytes::from("via-slab")));
        let eof = reader.next_chunk().await;
        assert!(eof.is_none());
    }

    #[tokio::test]
    async fn capacity_cap_rejects_overflow() {
        // With max_handles = 2, the third insert must be refused.
        let store = HandleStore::new(StoreConfig {
            max_handles: 2,
            ..StoreConfig::default()
        });
        let (_, r1) = store.make_body_channel();
        let (_, r2) = store.make_body_channel();
        let (_, r3) = store.make_body_channel();
        store.insert_reader(r1).unwrap();
        store.insert_reader(r2).unwrap();
        let err = store.insert_reader(r3).unwrap_err();
        assert!(err.message.contains("capacity"));
    }

    #[tokio::test]
    async fn recv_with_cancel_returns_none_on_cancel() {
        let (_tx, rx) = mpsc::channel::<Bytes>(4);
        let rx = Arc::new(tokio::sync::Mutex::new(rx));
        let cancel = Arc::new(tokio::sync::Notify::new());
        // A pre-stored permit makes the (biased) cancel arm win immediately.
        cancel.notify_one();
        let result = recv_with_cancel(rx, cancel).await;
        assert!(result.is_none());
    }
}
953}