1use std::{
9 collections::HashMap,
10 sync::{Arc, Mutex},
11 time::{Duration, Instant},
12};
13
14use bytes::Bytes;
15use slotmap::{KeyData, SlotMap};
16use tokio::sync::mpsc;
17
18use crate::CoreError;
19
/// Default bounded-channel capacity, in chunks, for a body pipe.
pub const DEFAULT_CHANNEL_CAPACITY: usize = 32;
/// Default upper bound on a single forwarded body chunk, in bytes (64 KiB).
pub const DEFAULT_MAX_CHUNK_SIZE: usize = 64 * 1024;
/// Default time a writer waits for a slow reader before timing out (30 s).
pub const DEFAULT_DRAIN_TIMEOUT_MS: u64 = 30_000;
/// Default time-to-live for registry entries before `sweep` expires them (5 min).
pub const DEFAULT_SLAB_TTL_MS: u64 = 300_000;
/// Default cap on live entries per handle registry.
pub const DEFAULT_MAX_HANDLES: usize = 65_536;
27
/// A live peer session tracked by the store.
pub struct SessionEntry {
    /// The underlying iroh QUIC connection for this session.
    pub conn: iroh::endpoint::Connection,
}
33
/// The head of an HTTP response: status code plus header pairs, delivered
/// through the one-shot sender allocated by `allocate_req_handle`.
pub struct ResponseHeadEntry {
    /// HTTP status code (e.g. 200).
    pub status: u16,
    /// Response headers as (name, value) pairs.
    pub headers: Vec<(String, String)>,
}
38
// One dedicated slotmap key type per registry so a key for one registry
// cannot be used as a key into another at the type level.
slotmap::new_key_type! { pub(crate) struct ReaderKey; }
slotmap::new_key_type! { pub(crate) struct WriterKey; }
slotmap::new_key_type! { pub(crate) struct TrailerTxKey; }
slotmap::new_key_type! { pub(crate) struct TrailerRxKey; }
slotmap::new_key_type! { pub(crate) struct FetchCancelKey; }
slotmap::new_key_type! { pub(crate) struct SessionKey; }
slotmap::new_key_type! { pub(crate) struct RequestHeadKey; }
48
/// Converts a typed slotmap key into the opaque `u64` handle handed out to
/// callers (FFI-stable encoding of index + version).
fn key_to_handle<K: slotmap::Key>(k: K) -> u64 {
    k.data().as_ffi()
}
54
/// Generates a conversion from an opaque `u64` handle back into a typed
/// slotmap key — the inverse of `key_to_handle`, one function per registry.
macro_rules! handle_to_key {
    ($fn_name:ident, $key_type:ty) => {
        fn $fn_name(h: u64) -> $key_type {
            <$key_type>::from(KeyData::from_ffi(h))
        }
    };
}
62
// Instantiate the handle-to-key conversion for every registry key type.
handle_to_key!(handle_to_reader_key, ReaderKey);
handle_to_key!(handle_to_writer_key, WriterKey);
handle_to_key!(handle_to_trailer_tx_key, TrailerTxKey);
handle_to_key!(handle_to_trailer_rx_key, TrailerRxKey);
handle_to_key!(handle_to_session_key, SessionKey);
handle_to_key!(handle_to_request_head_key, RequestHeadKey);
handle_to_key!(handle_to_fetch_cancel_key, FetchCancelKey);
70
/// Receiving half of a body pipe: a cancellable stream of byte chunks.
pub struct BodyReader {
    /// Chunk receiver. The tokio `Mutex` lets `next_chunk` take `&self` and
    /// be awaited from more than one task.
    pub(crate) rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Bytes>>>,
    /// Signalled to abort a pending `next_chunk` (see `recv_with_cancel`).
    pub(crate) cancel: Arc<tokio::sync::Notify>,
    /// Drain timeout carried alongside the reader.
    /// NOTE(review): not read anywhere in this file — kept for symmetry
    /// with `BodyWriter`; confirm whether it is used elsewhere.
    pub(crate) drain_timeout: Duration,
}
84
/// Sending half of a body pipe.
pub struct BodyWriter {
    /// Bounded chunk sender; backpressure comes from the channel capacity.
    pub(crate) tx: mpsc::Sender<Bytes>,
    /// How long `send_chunk` waits on a full channel before failing.
    pub(crate) drain_timeout: Duration,
}
92
93pub fn make_body_channel() -> (BodyWriter, BodyReader) {
100 make_body_channel_with(
101 DEFAULT_CHANNEL_CAPACITY,
102 Duration::from_millis(DEFAULT_DRAIN_TIMEOUT_MS),
103 )
104}
105
106fn make_body_channel_with(capacity: usize, drain_timeout: Duration) -> (BodyWriter, BodyReader) {
107 let (tx, rx) = mpsc::channel(capacity);
108 (
109 BodyWriter { tx, drain_timeout },
110 BodyReader {
111 rx: Arc::new(tokio::sync::Mutex::new(rx)),
112 cancel: Arc::new(tokio::sync::Notify::new()),
113 drain_timeout,
114 },
115 )
116}
117
/// Awaits the next chunk from `rx`, returning `None` early if `cancel`
/// fires first.
///
/// `biased` makes the cancel branch win when both futures are ready, so a
/// cancellation is never lost to a simultaneously arriving chunk (the chunk
/// simply stays queued in the channel). Both `Notify::notified` and
/// `Receiver::recv` are cancel-safe, so losing the select race drops no data.
async fn recv_with_cancel(
    rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Bytes>>>,
    cancel: Arc<tokio::sync::Notify>,
) -> Option<Bytes> {
    tokio::select! {
        biased;
        _ = cancel.notified() => None,
        chunk = async { rx.lock().await.recv().await } => chunk,
    }
}
136
impl BodyReader {
    /// Awaits the next body chunk. `None` means end-of-stream (all writers
    /// dropped) or that this reader was cancelled.
    pub async fn next_chunk(&self) -> Option<Bytes> {
        recv_with_cancel(self.rx.clone(), self.cancel.clone()).await
    }
}
144
impl BodyWriter {
    /// Sends one chunk, failing if the reader is too slow (drain timeout on
    /// a full channel) or has been dropped.
    ///
    /// NOTE(review): unlike `HandleStore::send_chunk`, this path does not
    /// split oversized chunks against `max_chunk_size` — confirm callers of
    /// this method respect the chunk-size cap themselves.
    pub async fn send_chunk(&self, chunk: Bytes) -> Result<(), String> {
        tokio::time::timeout(self.drain_timeout, self.tx.send(chunk))
            .await
            .map_err(|_| "drain timeout: body reader is too slow".to_string())?
            .map_err(|_| "body reader dropped".to_string())
    }
}
155
/// One-shot sender that delivers a message's trailer section.
type TrailerTx = tokio::sync::oneshot::Sender<Vec<(String, String)>>;
/// One-shot receiver for a message's trailer section.
pub(crate) type TrailerRx = tokio::sync::oneshot::Receiver<Vec<(String, String)>>;
160
/// Tunables for a [`HandleStore`].
#[derive(Debug, Clone)]
pub struct StoreConfig {
    /// Bounded capacity, in chunks, of each body channel.
    pub channel_capacity: usize,
    /// Largest chunk forwarded at once by `HandleStore::send_chunk`;
    /// bigger chunks are split into slices of this size.
    pub max_chunk_size: usize,
    /// How long a writer waits for a slow reader before timing out.
    pub drain_timeout: Duration,
    /// Cap on live entries per handle registry.
    pub max_handles: usize,
    /// Age after which `sweep` expires registry entries.
    pub ttl: Duration,
}
178
impl Default for StoreConfig {
    /// Mirrors the module-level `DEFAULT_*` constants.
    fn default() -> Self {
        Self {
            channel_capacity: DEFAULT_CHANNEL_CAPACITY,
            max_chunk_size: DEFAULT_MAX_CHUNK_SIZE,
            drain_timeout: Duration::from_millis(DEFAULT_DRAIN_TIMEOUT_MS),
            max_handles: DEFAULT_MAX_HANDLES,
            ttl: Duration::from_millis(DEFAULT_SLAB_TTL_MS),
        }
    }
}
190
/// A registry value stamped with its creation time so `sweep` can expire it.
struct Timed<T> {
    value: T,
    created_at: Instant,
}

impl<T> Timed<T> {
    /// Wraps `value`, recording the current instant as its creation time.
    fn new(value: T) -> Self {
        let created_at = Instant::now();
        Self { value, created_at }
    }

    /// True once the entry has been alive for strictly longer than `ttl`.
    fn is_expired(&self, ttl: Duration) -> bool {
        Instant::now().duration_since(self.created_at) > ttl
    }
}
210
/// A reader parked by `store_pending_reader` until the consuming side
/// claims it via `claim_pending_reader`.
struct PendingReaderEntry {
    reader: BodyReader,
    // Creation time; `sweep_pending_readers` expires unclaimed entries.
    created: Instant,
}
216
/// A trailer receiver parked by `alloc_trailer_sender` until claimed via
/// `claim_pending_trailer_rx`.
struct PendingTrailerRxEntry {
    rx: TrailerRx,
    // Creation time; `sweep_pending_trailer_rxs` expires unclaimed entries.
    created: Instant,
}
222
/// RAII guard for inserting several related handles as one unit: if the
/// guard is dropped without `commit()`, every handle inserted through it
/// is rolled back, leaving no orphaned registry entries.
pub(crate) struct InsertGuard<'a> {
    store: &'a HandleStore,
    /// Handles inserted so far, in insertion order.
    tracked: Vec<TrackedHandle>,
    /// Set by `commit()`; disarms the rollback performed in `Drop`.
    committed: bool,
}
233
/// Which registry a guard-tracked handle belongs to, so `Drop` knows how
/// to roll it back.
enum TrackedHandle {
    Reader(u64),
    Writer(u64),
    TrailerTx(u64),
    TrailerRx(u64),
    ReqHead(u64),
}
255
impl<'a> InsertGuard<'a> {
    /// Creates a guard with nothing tracked yet.
    fn new(store: &'a HandleStore) -> Self {
        Self {
            store,
            tracked: Vec::new(),
            committed: false,
        }
    }

    /// Inserts a body reader and tracks its handle for rollback.
    pub fn insert_reader(&mut self, reader: BodyReader) -> Result<u64, CoreError> {
        let h = self.store.insert_reader(reader)?;
        self.tracked.push(TrackedHandle::Reader(h));
        Ok(h)
    }

    /// Inserts a body writer and tracks its handle for rollback.
    pub fn insert_writer(&mut self, writer: BodyWriter) -> Result<u64, CoreError> {
        let h = self.store.insert_writer(writer)?;
        self.tracked.push(TrackedHandle::Writer(h));
        Ok(h)
    }

    /// Inserts a trailer sender and tracks its handle for rollback.
    pub fn insert_trailer_sender(&mut self, tx: TrailerTx) -> Result<u64, CoreError> {
        let h = self.store.insert_trailer_sender(tx)?;
        self.tracked.push(TrackedHandle::TrailerTx(h));
        Ok(h)
    }

    /// Inserts a trailer receiver and tracks its handle for rollback.
    pub fn insert_trailer_receiver(&mut self, rx: TrailerRx) -> Result<u64, CoreError> {
        let h = self.store.insert_trailer_receiver(rx)?;
        self.tracked.push(TrackedHandle::TrailerRx(h));
        Ok(h)
    }

    /// Allocates a request-head handle and tracks it for rollback.
    pub fn allocate_req_handle(
        &mut self,
        sender: tokio::sync::oneshot::Sender<ResponseHeadEntry>,
    ) -> Result<u64, CoreError> {
        let h = self.store.allocate_req_handle(sender)?;
        self.tracked.push(TrackedHandle::ReqHead(h));
        Ok(h)
    }

    /// Marks the batch as successful, consuming the guard and disarming
    /// the rollback in `Drop`.
    pub fn commit(mut self) {
        self.committed = true;
    }
}
303
impl Drop for InsertGuard<'_> {
    /// Rolls back every tracked handle unless `commit` was called, so a
    /// partially-failed multi-handle setup leaves no orphaned entries.
    fn drop(&mut self) {
        if self.committed {
            return;
        }
        for handle in &self.tracked {
            match handle {
                // Cancelling also wakes any task already blocked on the reader.
                TrackedHandle::Reader(h) => self.store.cancel_reader(*h),
                // Removing the writer drops its sender, signalling EOF to
                // the paired reader; an already-removed handle is ignored.
                TrackedHandle::Writer(h) => {
                    let _ = self.store.finish_body(*h);
                }
                TrackedHandle::TrailerTx(h) => self.store.remove_trailer_sender(*h),
                // The trailer-rx and request-head registries have no public
                // remover, so the entries are removed directly here.
                TrackedHandle::TrailerRx(h) => {
                    self.store
                        .trailer_rx
                        .lock()
                        .unwrap_or_else(|e| e.into_inner())
                        .remove(handle_to_trailer_rx_key(*h));
                }
                TrackedHandle::ReqHead(h) => {
                    self.store
                        .request_heads
                        .lock()
                        .unwrap_or_else(|e| e.into_inner())
                        .remove(handle_to_request_head_key(*h));
                }
            }
        }
    }
}
334
/// Central registry mapping opaque `u64` handles to live resources: body
/// readers/writers, trailer one-shots, sessions, request heads, and
/// fetch-cancellation tokens. Every entry is timestamped (`Timed`) so
/// `sweep` can expire abandoned handles.
pub struct HandleStore {
    readers: Mutex<SlotMap<ReaderKey, Timed<BodyReader>>>,
    writers: Mutex<SlotMap<WriterKey, Timed<BodyWriter>>>,
    trailer_tx: Mutex<SlotMap<TrailerTxKey, Timed<TrailerTx>>>,
    trailer_rx: Mutex<SlotMap<TrailerRxKey, Timed<TrailerRx>>>,
    sessions: Mutex<SlotMap<SessionKey, Timed<Arc<SessionEntry>>>>,
    request_heads:
        Mutex<SlotMap<RequestHeadKey, Timed<tokio::sync::oneshot::Sender<ResponseHeadEntry>>>>,
    fetch_cancels: Mutex<SlotMap<FetchCancelKey, Timed<Arc<tokio::sync::Notify>>>>,
    // Readers parked by `store_pending_reader`, keyed by their paired
    // writer handle.
    pending_readers: Mutex<HashMap<u64, PendingReaderEntry>>,
    // Trailer receivers parked by `alloc_trailer_sender`, keyed by the
    // sender handle.
    pending_trailer_rxs: Mutex<HashMap<u64, PendingTrailerRxEntry>>,
    pub(crate) config: StoreConfig,
}
357
impl HandleStore {
    /// Creates a store with empty registries, governed by `config`.
    ///
    /// Throughout this impl, poisoned std mutexes are recovered with
    /// `into_inner()` rather than panicking: the registries remain
    /// structurally consistent even if a panic occurred mid-operation.
    pub fn new(config: StoreConfig) -> Self {
        Self {
            readers: Mutex::new(SlotMap::with_key()),
            writers: Mutex::new(SlotMap::with_key()),
            trailer_tx: Mutex::new(SlotMap::with_key()),
            trailer_rx: Mutex::new(SlotMap::with_key()),
            sessions: Mutex::new(SlotMap::with_key()),
            request_heads: Mutex::new(SlotMap::with_key()),
            fetch_cancels: Mutex::new(SlotMap::with_key()),
            pending_readers: Mutex::new(HashMap::new()),
            pending_trailer_rxs: Mutex::new(HashMap::new()),
            config,
        }
    }

    /// Begins a transactional multi-insert; handles registered through the
    /// returned guard are rolled back on drop unless `commit()` is called.
    pub(crate) fn insert_guard(&self) -> InsertGuard<'_> {
        InsertGuard::new(self)
    }

    /// The configured writer drain timeout.
    pub fn drain_timeout(&self) -> Duration {
        self.config.drain_timeout
    }

    /// The configured upper bound for a single body chunk, in bytes.
    pub fn max_chunk_size(&self) -> usize {
        self.config.max_chunk_size
    }

    /// Returns `(readers, writers, sessions, total)` where `total` sums
    /// every slotmap registry.
    ///
    /// Locks are taken one at a time, so the result is a best-effort
    /// snapshot, not a single consistent view. NOTE(review): the
    /// `pending_readers` / `pending_trailer_rxs` side tables are not
    /// included in `total` — confirm that is intentional.
    pub fn count_handles(&self) -> (usize, usize, usize, usize) {
        let readers = self.readers.lock().unwrap_or_else(|e| e.into_inner()).len();
        let writers = self.writers.lock().unwrap_or_else(|e| e.into_inner()).len();
        let sessions = self
            .sessions
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .len();
        let total = readers
            .saturating_add(writers)
            .saturating_add(sessions)
            .saturating_add(
                self.trailer_tx
                    .lock()
                    .unwrap_or_else(|e| e.into_inner())
                    .len(),
            )
            .saturating_add(
                self.trailer_rx
                    .lock()
                    .unwrap_or_else(|e| e.into_inner())
                    .len(),
            )
            .saturating_add(
                self.request_heads
                    .lock()
                    .unwrap_or_else(|e| e.into_inner())
                    .len(),
            )
            .saturating_add(
                self.fetch_cancels
                    .lock()
                    .unwrap_or_else(|e| e.into_inner())
                    .len(),
            );
        (readers, writers, sessions, total)
    }

    /// Builds a body pipe using this store's configured capacity and drain
    /// timeout (the free function `make_body_channel` uses the defaults).
    pub fn make_body_channel(&self) -> (BodyWriter, BodyReader) {
        make_body_channel_with(self.config.channel_capacity, self.config.drain_timeout)
    }

    /// Inserts `value` into `registry`, enforcing the `max` live-handle cap,
    /// and returns the opaque handle.
    fn insert_checked<K: slotmap::Key, T>(
        registry: &Mutex<SlotMap<K, Timed<T>>>,
        value: T,
        max: usize,
    ) -> Result<u64, CoreError> {
        let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
        if reg.len() >= max {
            return Err(CoreError::internal("handle registry at capacity"));
        }
        let key = reg.insert(Timed::new(value));
        Ok(key_to_handle(key))
    }

    /// Registers a body reader, returning its handle.
    pub fn insert_reader(&self, reader: BodyReader) -> Result<u64, CoreError> {
        Self::insert_checked(&self.readers, reader, self.config.max_handles)
    }

    /// Registers a body writer, returning its handle.
    pub fn insert_writer(&self, writer: BodyWriter) -> Result<u64, CoreError> {
        Self::insert_checked(&self.writers, writer, self.config.max_handles)
    }

    /// Builds a body pipe, registers the writer half, and returns the
    /// writer handle together with the (unregistered) reader half.
    pub fn alloc_body_writer(&self) -> Result<(u64, BodyReader), CoreError> {
        let (writer, reader) = self.make_body_channel();
        let handle = self.insert_writer(writer)?;
        Ok((handle, reader))
    }

    /// Parks `reader` keyed by its paired writer handle until the consuming
    /// side claims it with `claim_pending_reader`.
    pub fn store_pending_reader(&self, writer_handle: u64, reader: BodyReader) {
        self.pending_readers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .insert(
                writer_handle,
                PendingReaderEntry {
                    reader,
                    created: Instant::now(),
                },
            );
    }

    /// Takes the parked reader for `writer_handle`, if one exists.
    pub fn claim_pending_reader(&self, writer_handle: u64) -> Option<BodyReader> {
        self.pending_readers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(&writer_handle)
            .map(|e| e.reader)
    }

    /// Receives the next body chunk for a reader handle.
    ///
    /// Returns `Ok(None)` on end-of-stream or cancellation, in which case
    /// the handle is retired; later calls with the same handle produce an
    /// invalid-handle error.
    pub async fn next_chunk(&self, handle: u64) -> Result<Option<Bytes>, CoreError> {
        // Clone the channel pieces inside a short scope so the std mutex is
        // released before awaiting (never hold a std lock across .await).
        let (rx_arc, cancel) = {
            let reg = self.readers.lock().unwrap_or_else(|e| e.into_inner());
            let entry = reg
                .get(handle_to_reader_key(handle))
                .ok_or_else(|| CoreError::invalid_handle(handle))?;
            (entry.value.rx.clone(), entry.value.cancel.clone())
        };

        let chunk = recv_with_cancel(rx_arc, cancel).await;

        // EOF or cancellation: retire the handle so it cannot leak.
        if chunk.is_none() {
            self.readers
                .lock()
                .unwrap_or_else(|e| e.into_inner())
                .remove(handle_to_reader_key(handle));
        }

        Ok(chunk)
    }

    /// Sends `chunk` through a writer handle, splitting it into
    /// `max_chunk_size`-sized slices when it exceeds the cap. Every send is
    /// bounded by the writer's drain timeout.
    pub async fn send_chunk(&self, handle: u64, chunk: Bytes) -> Result<(), CoreError> {
        // Clone the sender in a short scope; do not hold the lock across .await.
        let (tx, timeout) = {
            let reg = self.writers.lock().unwrap_or_else(|e| e.into_inner());
            let entry = reg
                .get(handle_to_writer_key(handle))
                .ok_or_else(|| CoreError::invalid_handle(handle))?;
            (entry.value.tx.clone(), entry.value.drain_timeout)
        };
        let max = self.config.max_chunk_size;
        if chunk.len() <= max {
            // Fast path: the chunk (including an empty one) fits as-is.
            tokio::time::timeout(timeout, tx.send(chunk))
                .await
                .map_err(|_| CoreError::timeout("drain timeout: body reader is too slow"))?
                .map_err(|_| CoreError::internal("body reader dropped"))
        } else {
            // Oversized: forward zero-copy slices of at most `max` bytes.
            let mut offset = 0;
            while offset < chunk.len() {
                let end = offset.saturating_add(max).min(chunk.len());
                tokio::time::timeout(timeout, tx.send(chunk.slice(offset..end)))
                    .await
                    .map_err(|_| CoreError::timeout("drain timeout: body reader is too slow"))?
                    .map_err(|_| CoreError::internal("body reader dropped"))?;
                offset = end;
            }
            Ok(())
        }
    }

    /// Removes a writer handle; dropping the sender closes the channel so
    /// the paired reader observes end-of-stream.
    pub fn finish_body(&self, handle: u64) -> Result<(), CoreError> {
        self.writers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_writer_key(handle))
            .ok_or_else(|| CoreError::invalid_handle(handle))?;
        Ok(())
    }

    /// Removes a reader handle and wakes any task blocked in `next_chunk`.
    /// Unknown handles are ignored.
    ///
    /// NOTE(review): `notify_waiters` only wakes tasks already parked on
    /// `cancel.notified()` — no permit is stored (unlike `notify_one` used
    /// by `cancel_in_flight`), so a task that starts waiting just after
    /// this call misses the signal. Confirm this race is acceptable.
    pub fn cancel_reader(&self, handle: u64) {
        let entry = self
            .readers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_reader_key(handle));
        if let Some(e) = entry {
            e.value.cancel.notify_waiters();
        }
    }

    /// Registers a trailer sender, returning its handle.
    pub fn insert_trailer_sender(&self, tx: TrailerTx) -> Result<u64, CoreError> {
        Self::insert_checked(&self.trailer_tx, tx, self.config.max_handles)
    }

    /// Registers a trailer receiver, returning its handle.
    pub fn insert_trailer_receiver(&self, rx: TrailerRx) -> Result<u64, CoreError> {
        Self::insert_checked(&self.trailer_rx, rx, self.config.max_handles)
    }

    /// Removes a trailer sender handle; unknown handles are ignored.
    pub fn remove_trailer_sender(&self, handle: u64) {
        self.trailer_tx
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_trailer_tx_key(handle));
    }

    /// Creates a trailer one-shot, registers the sender, and parks the
    /// receiver (keyed by the sender handle) for later claiming via
    /// `claim_pending_trailer_rx`. Returns the sender handle.
    pub fn alloc_trailer_sender(&self) -> Result<u64, CoreError> {
        let (tx, rx) = tokio::sync::oneshot::channel::<Vec<(String, String)>>();
        let handle = self.insert_trailer_sender(tx)?;
        self.pending_trailer_rxs
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .insert(
                handle,
                PendingTrailerRxEntry {
                    rx,
                    created: Instant::now(),
                },
            );
        Ok(handle)
    }

    /// Takes the parked trailer receiver for `sender_handle`, if present.
    pub fn claim_pending_trailer_rx(&self, sender_handle: u64) -> Option<TrailerRx> {
        self.pending_trailer_rxs
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(&sender_handle)
            .map(|e| e.rx)
    }

    /// Delivers `trailers` through a sender handle. One-shot: the handle is
    /// consumed whether or not the receiver is still alive.
    pub fn send_trailers(
        &self,
        handle: u64,
        trailers: Vec<(String, String)>,
    ) -> Result<(), CoreError> {
        let tx = self
            .trailer_tx
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_trailer_tx_key(handle))
            .ok_or_else(|| CoreError::invalid_handle(handle))?
            .value;
        tx.send(trailers)
            .map_err(|_| CoreError::internal("trailer receiver dropped"))
    }

    /// Awaits the trailers for a receiver handle. The handle is consumed;
    /// `Ok(None)` means the sender was dropped without sending trailers.
    pub async fn next_trailer(
        &self,
        handle: u64,
    ) -> Result<Option<Vec<(String, String)>>, CoreError> {
        // Remove first so the lock is not held across the await below.
        let rx = self
            .trailer_rx
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_trailer_rx_key(handle))
            .ok_or_else(|| CoreError::invalid_handle(handle))?
            .value;
        match rx.await {
            Ok(trailers) => Ok(Some(trailers)),
            Err(_) => Ok(None),
        }
    }

    /// Registers a session, returning its handle.
    pub fn insert_session(&self, entry: SessionEntry) -> Result<u64, CoreError> {
        Self::insert_checked(&self.sessions, Arc::new(entry), self.config.max_handles)
    }

    /// Looks up a session by handle without removing it.
    pub fn lookup_session(&self, handle: u64) -> Option<Arc<SessionEntry>> {
        self.sessions
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(handle_to_session_key(handle))
            .map(|e| e.value.clone())
    }

    /// Removes a session by handle, returning it if it was present.
    pub fn remove_session(&self, handle: u64) -> Option<Arc<SessionEntry>> {
        self.sessions
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_session_key(handle))
            .map(|e| e.value)
    }

    /// Registers the one-shot sender that will deliver a response head for
    /// an in-flight request, returning the request handle.
    pub fn allocate_req_handle(
        &self,
        sender: tokio::sync::oneshot::Sender<ResponseHeadEntry>,
    ) -> Result<u64, CoreError> {
        Self::insert_checked(&self.request_heads, sender, self.config.max_handles)
    }

    /// Takes (and removes) the response-head sender for a request handle.
    pub fn take_req_sender(
        &self,
        handle: u64,
    ) -> Option<tokio::sync::oneshot::Sender<ResponseHeadEntry>> {
        self.request_heads
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_request_head_key(handle))
            .map(|e| e.value)
    }

    /// Allocates a cancellation token for an in-flight fetch, returning its
    /// handle. Signal it with `cancel_in_flight`.
    pub fn alloc_fetch_token(&self) -> Result<u64, CoreError> {
        let notify = Arc::new(tokio::sync::Notify::new());
        Self::insert_checked(&self.fetch_cancels, notify, self.config.max_handles)
    }

    /// Signals the cancellation token for `token`, if it exists.
    /// `notify_one` stores a permit, so a waiter that polls the Notify
    /// after this call still observes the cancellation.
    pub fn cancel_in_flight(&self, token: u64) {
        if let Some(entry) = self
            .fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(handle_to_fetch_cancel_key(token))
        {
            entry.value.notify_one();
        }
    }

    /// Returns the Notify behind a fetch-cancel token without removing it.
    pub fn get_fetch_cancel_notify(&self, token: u64) -> Option<Arc<tokio::sync::Notify>> {
        self.fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(handle_to_fetch_cancel_key(token))
            .map(|e| e.value.clone())
    }

    /// Removes a fetch-cancel token; unknown tokens are ignored.
    pub fn remove_fetch_token(&self, token: u64) {
        self.fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_fetch_cancel_key(token));
    }

    /// Expires every registry entry and pending side-table entry older than
    /// `ttl`. Intended to be called periodically by a janitor task.
    pub fn sweep(&self, ttl: Duration) {
        Self::sweep_registry(&self.readers, ttl);
        Self::sweep_registry(&self.writers, ttl);
        Self::sweep_registry(&self.trailer_tx, ttl);
        Self::sweep_registry(&self.trailer_rx, ttl);
        Self::sweep_registry(&self.request_heads, ttl);
        Self::sweep_registry(&self.sessions, ttl);
        Self::sweep_registry(&self.fetch_cancels, ttl);
        self.sweep_pending_readers(ttl);
        self.sweep_pending_trailer_rxs(ttl);
    }

    /// Removes expired entries from one slotmap registry.
    fn sweep_registry<K: slotmap::Key, T>(registry: &Mutex<SlotMap<K, Timed<T>>>, ttl: Duration) {
        let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
        // Collect keys first: SlotMap cannot be mutated while iterating.
        let expired: Vec<K> = reg
            .iter()
            .filter(|(_, e)| e.is_expired(ttl))
            .map(|(k, _)| k)
            .collect();

        if expired.is_empty() {
            return;
        }

        for key in &expired {
            reg.remove(*key);
        }
        tracing::debug!(
            "[iroh-http] swept {} expired registry entries (ttl={ttl:?})",
            expired.len()
        );
        // Shrink pathological capacity growth once the registry is empty.
        // NOTE(review): replacing the SlotMap resets slot versions, so a
        // stale u64 handle issued before the reset could alias a future
        // entry — confirm stale handles cannot outlive an emptying sweep.
        if reg.is_empty() && reg.capacity() > 128 {
            *reg = SlotMap::with_key();
        }
    }

    /// Drops parked readers older than `ttl`.
    fn sweep_pending_readers(&self, ttl: Duration) {
        let mut map = self
            .pending_readers
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        let before = map.len();
        map.retain(|_, e| e.created.elapsed() < ttl);
        let removed = before.saturating_sub(map.len());
        if removed > 0 {
            tracing::debug!("[iroh-http] swept {removed} stale pending readers (ttl={ttl:?})");
        }
    }

    /// Drops parked trailer receivers older than `ttl`.
    fn sweep_pending_trailer_rxs(&self, ttl: Duration) {
        let mut map = self
            .pending_trailer_rxs
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        let before = map.len();
        map.retain(|_, e| e.created.elapsed() < ttl);
        let removed = before.saturating_sub(map.len());
        if removed > 0 {
            tracing::debug!(
                "[iroh-http] swept {removed} stale pending trailer receivers (ttl={ttl:?})"
            );
        }
    }
}
832
/// Read-buffer size, in bytes, used by the pump helpers below.
pub(crate) const PUMP_READ_BUF: usize = 64 * 1024;
837
/// Forwards data from a QUIC receive stream into a body writer until the
/// stream ends, a read fails, or the body reader goes away.
pub(crate) async fn pump_quic_recv_to_body(
    mut recv: iroh::endpoint::RecvStream,
    writer: BodyWriter,
) {
    // `Ok(None)` is end-of-stream; read errors also end the pump silently.
    while let Ok(Some(chunk)) = recv.read_chunk(PUMP_READ_BUF).await {
        if writer.send_chunk(chunk.bytes).await.is_err() {
            break;
        }
    }
}
853
854pub(crate) async fn pump_body_to_quic_send(
859 reader: BodyReader,
860 mut send: iroh::endpoint::SendStream,
861) {
862 loop {
863 match reader.next_chunk().await {
864 None => break,
865 Some(data) => {
866 if send.write_all(&data).await.is_err() {
867 break;
868 }
869 }
870 }
871 }
872 let _ = send.finish();
873}
874
/// Bridges a duplex byte stream `io` with a pair of body channels: bytes
/// read from `io` are pushed into `writer`, and chunks pulled from `reader`
/// are written back to `io`. Both directions run concurrently; the write
/// side shuts the stream down once the body ends.
pub(crate) async fn pump_duplex<IO>(io: IO, writer: BodyWriter, reader: BodyReader)
where
    IO: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
{
    let (mut recv, mut send) = tokio::io::split(io);

    tokio::join!(
        // io -> writer: read into a reusable buffer, forwarding each block.
        async {
            use bytes::BytesMut;
            use tokio::io::AsyncReadExt;
            let mut buf = BytesMut::with_capacity(PUMP_READ_BUF);
            loop {
                buf.clear();
                match recv.read_buf(&mut buf).await {
                    // Ok(0) is EOF; read errors end this direction too.
                    Ok(0) | Err(_) => break,
                    Ok(_) => {
                        // `split().freeze()` hands the filled bytes off
                        // without copying, leaving `buf` empty for reuse.
                        if writer.send_chunk(buf.split().freeze()).await.is_err() {
                            break;
                        }
                    }
                }
            }
        },
        // reader -> io: write chunks until EOF, then shut down the sink.
        async {
            use tokio::io::AsyncWriteExt;
            loop {
                match reader.next_chunk().await {
                    None => break,
                    Some(data) => {
                        if send.write_all(&data).await.is_err() {
                            break;
                        }
                    }
                }
            }
            let _ = send.shutdown().await;
        },
    );
}
920
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh store with default configuration.
    fn test_store() -> HandleStore {
        HandleStore::new(StoreConfig::default())
    }

    // Queued chunks survive the writer being dropped; EOF follows.
    #[tokio::test]
    async fn body_channel_send_recv() {
        let (writer, reader) = make_body_channel();
        writer.send_chunk(Bytes::from("hello")).await.unwrap();
        // Dropping the writer closes the channel, producing EOF (None)
        // after the queued chunk is consumed.
        drop(writer);
        let chunk = reader.next_chunk().await;
        assert_eq!(chunk, Some(Bytes::from("hello")));
        let eof = reader.next_chunk().await;
        assert!(eof.is_none());
    }

    // Chunks arrive in send order, then the stream ends.
    #[tokio::test]
    async fn body_channel_multiple_chunks() {
        let (writer, reader) = make_body_channel();
        writer.send_chunk(Bytes::from("a")).await.unwrap();
        writer.send_chunk(Bytes::from("b")).await.unwrap();
        writer.send_chunk(Bytes::from("c")).await.unwrap();
        drop(writer);

        let mut collected = Vec::new();
        while let Some(chunk) = reader.next_chunk().await {
            collected.push(chunk);
        }
        assert_eq!(
            collected,
            vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c"),]
        );
    }

    // Sending after the reader is gone must fail, not hang.
    #[tokio::test]
    async fn body_channel_reader_dropped_returns_error() {
        let (writer, reader) = make_body_channel();
        drop(reader);
        let result = writer.send_chunk(Bytes::from("data")).await;
        assert!(result.is_err());
    }

    // A store-registered reader delivers data then retires on EOF.
    #[tokio::test]
    async fn insert_reader_and_next_chunk() {
        let store = test_store();
        let (writer, reader) = store.make_body_channel();
        let handle = store.insert_reader(reader).unwrap();

        writer.send_chunk(Bytes::from("slab-data")).await.unwrap();
        drop(writer);

        let chunk = store.next_chunk(handle).await.unwrap();
        assert_eq!(chunk, Some(Bytes::from("slab-data")));

        let eof = store.next_chunk(handle).await.unwrap();
        assert!(eof.is_none());
    }

    // Unknown handles map to an invalid-input error.
    #[tokio::test]
    async fn next_chunk_invalid_handle() {
        let store = test_store();
        let result = store.next_chunk(999999).await;
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().code, crate::ErrorCode::InvalidInput);
    }

    // Writing through a store handle reaches the paired reader; finishing
    // the body closes the stream.
    #[tokio::test]
    async fn send_chunk_via_handle() {
        let store = test_store();
        let (writer, reader) = store.make_body_channel();
        let handle = store.insert_writer(writer).unwrap();

        store
            .send_chunk(handle, Bytes::from("via-slab"))
            .await
            .unwrap();
        store.finish_body(handle).unwrap();

        let chunk = reader.next_chunk().await;
        assert_eq!(chunk, Some(Bytes::from("via-slab")));
        let eof = reader.next_chunk().await;
        assert!(eof.is_none());
    }

    // The max_handles cap rejects the insert that would exceed it.
    #[tokio::test]
    async fn capacity_cap_rejects_overflow() {
        let store = HandleStore::new(StoreConfig {
            max_handles: 2,
            ..StoreConfig::default()
        });
        let (_, r1) = store.make_body_channel();
        let (_, r2) = store.make_body_channel();
        let (_, r3) = store.make_body_channel();
        store.insert_reader(r1).unwrap();
        store.insert_reader(r2).unwrap();
        let err = store.insert_reader(r3).unwrap_err();
        assert!(err.message.contains("capacity"));
    }

    // sweep() must also clear the pending trailer-receiver side table.
    #[test]
    fn sweep_removes_unclaimed_trailer_receivers() {
        let store = test_store();
        let _handle = store.alloc_trailer_sender().unwrap();
        assert_eq!(store.pending_trailer_rxs.lock().unwrap().len(), 1);
        // A zero TTL expires everything immediately.
        store.sweep(Duration::ZERO);
        assert_eq!(
            store.pending_trailer_rxs.lock().unwrap().len(),
            0,
            "sweep() must remove unclaimed pending trailer receivers"
        );
    }

    // notify_one stores a permit, so a cancel signalled before the await
    // still aborts the receive.
    #[tokio::test]
    async fn recv_with_cancel_returns_none_on_cancel() {
        let (_tx, rx) = mpsc::channel::<Bytes>(4);
        let rx = Arc::new(tokio::sync::Mutex::new(rx));
        let cancel = Arc::new(tokio::sync::Notify::new());
        cancel.notify_one();
        let result = recv_with_cancel(rx, cancel).await;
        assert!(result.is_none());
    }
}