1#![allow(clippy::disallowed_types)]
9
10use std::{
11 collections::HashMap,
12 future::Future,
13 pin::Pin,
14 sync::{Arc, Mutex},
15 task::{Context, Poll},
16 time::{Duration, Instant},
17};
18
19use bytes::Bytes;
20use futures::sink::Sink;
21use http_body::Frame;
22use slotmap::{KeyData, SlotMap};
23use tokio::sync::mpsc;
24
25use crate::{http::body::BoxError, CoreError};
26
/// Default capacity, in chunks, of a body channel.
pub const DEFAULT_CHANNEL_CAPACITY: usize = 32;
/// Largest chunk forwarded in a single send; larger payloads are split.
pub const DEFAULT_MAX_CHUNK_SIZE: usize = 64 * 1024;
/// How long a send may wait on a slow reader before timing out (30 s).
pub const DEFAULT_DRAIN_TIMEOUT_MS: u64 = 30_000;
/// Idle time after which a stored handle is considered expired (5 min).
pub const DEFAULT_SLAB_TTL_MS: u64 = 300_000;
/// Suggested interval between sweeps of expired handles (1 min).
pub const DEFAULT_SWEEP_INTERVAL_MS: u64 = 60_000;
/// Default per-registry cap on live handles.
pub const DEFAULT_MAX_HANDLES: usize = 65_536;
35
/// A live peer connection stored behind a session handle in `HandleStore`.
pub struct SessionEntry {
    /// The underlying iroh connection for this session.
    pub conn: iroh::endpoint::Connection,
}
41
/// Response status and headers delivered back through the oneshot sender
/// stored in the request-head registry.
pub struct ResponseHeadEntry {
    /// HTTP status code (e.g. 200).
    pub status: u16,
    /// Response headers as (name, value) pairs, order preserved.
    pub headers: Vec<(String, String)>,
}
46
// Distinct slotmap key types, one per registry, so a key for one registry
// cannot accidentally be used to index another at the type level.
slotmap::new_key_type! { pub(crate) struct ReaderKey; }
slotmap::new_key_type! { pub(crate) struct WriterKey; }
slotmap::new_key_type! { pub(crate) struct FetchCancelKey; }
slotmap::new_key_type! { pub(crate) struct SessionKey; }
slotmap::new_key_type! { pub(crate) struct RequestHeadKey; }
54
/// Convert a slotmap key into the opaque `u64` handle exposed to callers.
/// `KeyData::as_ffi` packs both index and generation, so a stale handle
/// simply fails lookup instead of aliasing a reused slot.
fn key_to_handle<K: slotmap::Key>(k: K) -> u64 {
    k.data().as_ffi()
}
60
// Generates the inverse of `key_to_handle` for one concrete key type.
macro_rules! handle_to_key {
    ($fn_name:ident, $key_type:ty) => {
        fn $fn_name(h: u64) -> $key_type {
            <$key_type>::from(KeyData::from_ffi(h))
        }
    };
}

// One handle decoder per registry key type.
handle_to_key!(handle_to_reader_key, ReaderKey);
handle_to_key!(handle_to_writer_key, WriterKey);
handle_to_key!(handle_to_session_key, SessionKey);
handle_to_key!(handle_to_request_head_key, RequestHeadKey);
handle_to_key!(handle_to_fetch_cancel_key, FetchCancelKey);
74
/// Receiving half of a body channel.
///
/// `rx` sits behind a shared async mutex so both `next_chunk` (which only
/// takes `&self`) and the `http_body::Body` impl can poll the same receiver;
/// `cancel` lets `HandleStore::cancel_reader` abort a blocked receive.
pub struct BodyReader {
    pub(crate) rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Bytes>>>,
    pub(crate) cancel: Arc<tokio::sync::Notify>,
    // In-flight receive future retained across `poll_frame` wakeups.
    pending: Option<Pin<Box<dyn Future<Output = Option<Bytes>> + Send + Sync>>>,
}
102
/// Sending half of a body channel. Dropping the writer closes the channel,
/// which the reader observes as end-of-body.
pub struct BodyWriter {
    pub(crate) tx: mpsc::Sender<Bytes>,
    /// Maximum time a send may wait for channel capacity before erroring.
    pub(crate) drain_timeout: Duration,
    // In-flight send future driven by the `Sink` implementation.
    sending: Option<BodyWriterSendFuture>,
}

/// Boxed send future used by the `Sink` impl to track one queued chunk.
type BodyWriterSendFuture = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + Sync>>;
131
132pub fn make_body_channel() -> (BodyWriter, BodyReader) {
140 make_body_channel_with(
141 DEFAULT_CHANNEL_CAPACITY,
142 Duration::from_millis(DEFAULT_DRAIN_TIMEOUT_MS),
143 )
144}
145
146fn make_body_channel_with(capacity: usize, drain_timeout: Duration) -> (BodyWriter, BodyReader) {
147 let (tx, rx) = mpsc::channel(capacity);
148 (
149 BodyWriter {
150 tx,
151 drain_timeout,
152 sending: None,
153 },
154 BodyReader {
155 rx: Arc::new(tokio::sync::Mutex::new(rx)),
156 cancel: Arc::new(tokio::sync::Notify::new()),
157 pending: None,
158 },
159 )
160}
161
/// Receive the next chunk, returning `None` if the cancel notify fires
/// first or the channel is closed and drained.
///
/// `biased` makes the cancel branch win whenever both are ready, so a
/// cancelled reader never yields a stray chunk.
async fn recv_with_cancel(
    rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Bytes>>>,
    cancel: Arc<tokio::sync::Notify>,
) -> Option<Bytes> {
    tokio::select! {
        biased;
        _ = cancel.notified() => None,
        chunk = async { rx.lock().await.recv().await } => chunk,
    }
}
180
impl BodyReader {
    /// Await the next body chunk; `None` means end-of-body or cancellation.
    pub async fn next_chunk(&self) -> Option<Bytes> {
        recv_with_cancel(self.rx.clone(), self.cancel.clone()).await
    }
}
188
/// `http_body::Body` adapter so a `BodyReader` can serve directly as a
/// streaming HTTP body: each chunk becomes a data frame, channel close
/// (or cancel) becomes end-of-stream.
impl http_body::Body for BodyReader {
    type Data = Bytes;
    type Error = std::convert::Infallible;

    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Bytes>, Self::Error>>> {
        let this = self.get_mut();
        // Create the receive future lazily and keep it across wakeups so a
        // partially-polled receive is resumed rather than restarted.
        let fut = this.pending.get_or_insert_with(|| {
            Box::pin(recv_with_cancel(this.rx.clone(), this.cancel.clone()))
        });
        match fut.as_mut().poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(opt) => {
                // Clear the slot so the next poll starts a fresh receive.
                this.pending = None;
                Poll::Ready(opt.map(|data| Ok(Frame::data(data))))
            }
        }
    }
}
212
213impl BodyWriter {
214 pub async fn send_chunk(&self, chunk: Bytes) -> Result<(), String> {
217 tokio::time::timeout(self.drain_timeout, self.tx.send(chunk))
218 .await
219 .map_err(|_| "drain timeout: body reader is too slow".to_string())?
220 .map_err(|_| "body reader dropped".to_string())
221 }
222}
223
/// `Sink` adapter so a `BodyWriter` can be the target of
/// `StreamExt::forward` and `SinkExt::send`. One send is in flight at a
/// time, tracked in `self.sending`.
impl Sink<Bytes> for BodyWriter {
    type Error = BoxError;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.get_mut();
        match this.sending.as_mut() {
            // No send in flight: ready for the next item immediately.
            None => Poll::Ready(Ok(())),
            Some(fut) => match fut.as_mut().poll(cx) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(res) => {
                    this.sending = None;
                    Poll::Ready(res)
                }
            },
        }
    }

    fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
        // Debug-only guard: in release an in-flight send would be silently
        // replaced, but that is a caller-side Sink-contract violation.
        debug_assert!(
            self.sending.is_none(),
            "Sink contract: poll_ready must return Ready(Ok) before start_send"
        );
        let tx = self.tx.clone();
        let drain_timeout = self.drain_timeout;
        // Queue the send; it is driven by subsequent poll_ready/poll_flush.
        self.get_mut().sending = Some(Box::pin(async move {
            tokio::time::timeout(drain_timeout, tx.send(item))
                .await
                .map_err(|_| -> BoxError { "drain timeout: body reader is too slow".into() })?
                .map_err(|_| -> BoxError { "body reader dropped".into() })
        }));
        Ok(())
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Flushing just means driving the in-flight send to completion.
        self.poll_ready(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // No explicit close signal: the reader sees EOF only once this
        // writer (and its `tx`) is dropped, which `forward` does after close.
        self.poll_flush(cx)
    }
}
276
/// Tunables for a `HandleStore`.
#[derive(Debug, Clone)]
pub struct StoreConfig {
    /// Body channel capacity, in chunks.
    pub channel_capacity: usize,
    /// Chunks larger than this are split by `HandleStore::send_chunk`.
    pub max_chunk_size: usize,
    /// How long a send may wait on a slow reader before timing out.
    pub drain_timeout: Duration,
    /// Per-registry cap on live handles.
    pub max_handles: usize,
    /// Idle TTL after which `sweep` evicts an entry.
    pub ttl: Duration,
}
294
295impl Default for StoreConfig {
296 fn default() -> Self {
297 Self {
298 channel_capacity: DEFAULT_CHANNEL_CAPACITY,
299 max_chunk_size: DEFAULT_MAX_CHUNK_SIZE,
300 drain_timeout: Duration::from_millis(DEFAULT_DRAIN_TIMEOUT_MS),
301 max_handles: DEFAULT_MAX_HANDLES,
302 ttl: Duration::from_millis(DEFAULT_SLAB_TTL_MS),
303 }
304 }
305}
306
/// Pairs a stored value with the instant it was last used, enabling
/// TTL-based eviction during sweeps.
struct Timed<T> {
    value: T,
    last_accessed: Instant,
}

impl<T> Timed<T> {
    /// Wrap `value`, stamping it as accessed right now.
    fn new(value: T) -> Self {
        Timed {
            value,
            last_accessed: Instant::now(),
        }
    }

    /// Refresh the access stamp so the entry survives the next sweep.
    fn touch(&mut self) {
        self.last_accessed = Instant::now();
    }

    /// True once the entry has sat idle for longer than `ttl`.
    fn is_expired(&self, ttl: Duration) -> bool {
        Instant::now().duration_since(self.last_accessed) > ttl
    }
}
333
/// A reader parked until the consumer of its writer handle claims it;
/// `created` drives TTL cleanup of readers that are never claimed.
struct PendingReaderEntry {
    reader: BodyReader,
    created: Instant,
}
339
/// RAII guard that rolls back handle insertions unless `commit` is called,
/// making multi-handle setup atomic: either all handles survive or none do.
pub(crate) struct InsertGuard<'a> {
    store: &'a HandleStore,
    // Handles inserted through this guard, in insertion order.
    tracked: Vec<TrackedHandle>,
    committed: bool,
}

/// Which registry a tracked handle belongs to, so `Drop` knows how to
/// undo each insertion.
enum TrackedHandle {
    Reader(u64),
    Writer(u64),
    ReqHead(u64),
}
370
impl<'a> InsertGuard<'a> {
    /// Start tracking insertions against `store`; nothing is inserted yet.
    fn new(store: &'a HandleStore) -> Self {
        Self {
            store,
            tracked: Vec::new(),
            committed: false,
        }
    }

    /// Insert a reader into the store and track it for rollback.
    pub fn insert_reader(&mut self, reader: BodyReader) -> Result<u64, CoreError> {
        let h = self.store.insert_reader(reader)?;
        self.tracked.push(TrackedHandle::Reader(h));
        Ok(h)
    }

    /// Insert a writer into the store and track it for rollback.
    pub fn insert_writer(&mut self, writer: BodyWriter) -> Result<u64, CoreError> {
        let h = self.store.insert_writer(writer)?;
        self.tracked.push(TrackedHandle::Writer(h));
        Ok(h)
    }

    /// Allocate a request-head handle for `sender` and track it for rollback.
    pub fn allocate_req_handle(
        &mut self,
        sender: tokio::sync::oneshot::Sender<ResponseHeadEntry>,
    ) -> Result<u64, CoreError> {
        let h = self.store.allocate_req_handle(sender)?;
        self.tracked.push(TrackedHandle::ReqHead(h));
        Ok(h)
    }

    /// Keep all tracked handles: disarms the rollback in `Drop`.
    pub fn commit(mut self) {
        self.committed = true;
    }
}
406
impl Drop for InsertGuard<'_> {
    /// On non-committed drop, remove every handle inserted via this guard.
    fn drop(&mut self) {
        if self.committed {
            return;
        }
        for handle in &self.tracked {
            match handle {
                // Cancel also wakes any task blocked on the reader.
                TrackedHandle::Reader(h) => self.store.cancel_reader(*h),
                TrackedHandle::Writer(h) => {
                    // Ignore "already removed": rollback is best-effort.
                    let _ = self.store.finish_body(*h);
                }
                TrackedHandle::ReqHead(h) => {
                    self.store
                        .request_heads
                        .lock()
                        .unwrap_or_else(|e| e.into_inner())
                        .remove(handle_to_request_head_key(*h));
                }
            }
        }
    }
}
429
/// Central registry mapping opaque `u64` handles to readers, writers,
/// sessions, pending response-head senders, and fetch-cancel tokens.
/// Each registry is independently locked; entries carry access timestamps
/// for TTL sweeping.
pub struct HandleStore {
    readers: Mutex<SlotMap<ReaderKey, Timed<BodyReader>>>,
    writers: Mutex<SlotMap<WriterKey, Timed<BodyWriter>>>,
    sessions: Mutex<SlotMap<SessionKey, Timed<Arc<SessionEntry>>>>,
    request_heads:
        Mutex<SlotMap<RequestHeadKey, Timed<tokio::sync::oneshot::Sender<ResponseHeadEntry>>>>,
    fetch_cancels: Mutex<SlotMap<FetchCancelKey, Timed<Arc<tokio::sync::Notify>>>>,
    // Readers parked by writer handle until claimed (see store/claim below).
    pending_readers: Mutex<HashMap<u64, PendingReaderEntry>>,
    pub(crate) config: StoreConfig,
}
446
impl HandleStore {
    /// Create an empty store with the given tunables.
    ///
    /// Locking note: throughout this impl, `.lock().unwrap_or_else(|e|
    /// e.into_inner())` recovers a poisoned mutex instead of panicking, so
    /// one panicked holder cannot wedge the whole store.
    pub fn new(config: StoreConfig) -> Self {
        Self {
            readers: Mutex::new(SlotMap::with_key()),
            writers: Mutex::new(SlotMap::with_key()),
            sessions: Mutex::new(SlotMap::with_key()),
            request_heads: Mutex::new(SlotMap::with_key()),
            fetch_cancels: Mutex::new(SlotMap::with_key()),
            pending_readers: Mutex::new(HashMap::new()),
            config,
        }
    }

    /// Begin an atomic multi-handle insertion (rolled back unless committed).
    pub(crate) fn insert_guard(&self) -> InsertGuard<'_> {
        InsertGuard::new(self)
    }

    /// Configured per-send drain timeout.
    pub fn drain_timeout(&self) -> Duration {
        self.config.drain_timeout
    }

    /// Configured maximum chunk size before `send_chunk` splits.
    pub fn max_chunk_size(&self) -> usize {
        self.config.max_chunk_size
    }

    /// Count live handles: `(readers, writers, sessions, total)`, where
    /// `total` also includes request heads and fetch-cancel tokens.
    pub fn count_handles(&self) -> (usize, usize, usize, usize) {
        let readers = self.readers.lock().unwrap_or_else(|e| e.into_inner()).len();
        let writers = self.writers.lock().unwrap_or_else(|e| e.into_inner()).len();
        let sessions = self
            .sessions
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .len();
        let total = readers
            .saturating_add(writers)
            .saturating_add(sessions)
            .saturating_add(
                self.request_heads
                    .lock()
                    .unwrap_or_else(|e| e.into_inner())
                    .len(),
            )
            .saturating_add(
                self.fetch_cancels
                    .lock()
                    .unwrap_or_else(|e| e.into_inner())
                    .len(),
            );
        (readers, writers, sessions, total)
    }

    /// Build a body channel using this store's configured capacity/timeout.
    pub fn make_body_channel(&self) -> (BodyWriter, BodyReader) {
        make_body_channel_with(self.config.channel_capacity, self.config.drain_timeout)
    }

    /// Insert `value` into `registry`, enforcing the `max` handle cap, and
    /// return the new opaque handle.
    fn insert_checked<K: slotmap::Key, T>(
        registry: &Mutex<SlotMap<K, Timed<T>>>,
        value: T,
        max: usize,
    ) -> Result<u64, CoreError> {
        let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
        if reg.len() >= max {
            return Err(CoreError::internal("handle registry at capacity"));
        }
        let key = reg.insert(Timed::new(value));
        Ok(key_to_handle(key))
    }

    /// Register a reader and return its handle.
    pub fn insert_reader(&self, reader: BodyReader) -> Result<u64, CoreError> {
        Self::insert_checked(&self.readers, reader, self.config.max_handles)
    }

    /// Register a writer and return its handle.
    pub fn insert_writer(&self, writer: BodyWriter) -> Result<u64, CoreError> {
        Self::insert_checked(&self.writers, writer, self.config.max_handles)
    }

    /// Create a body channel, register its writer, and hand back the writer
    /// handle together with the (unregistered) reader end.
    pub fn alloc_body_writer(&self) -> Result<(u64, BodyReader), CoreError> {
        let (writer, reader) = self.make_body_channel();
        let handle = self.insert_writer(writer)?;
        Ok((handle, reader))
    }

    /// Park `reader` under `writer_handle` until some consumer claims it.
    /// A previously parked reader under the same handle is replaced.
    pub fn store_pending_reader(&self, writer_handle: u64, reader: BodyReader) {
        self.pending_readers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .insert(
                writer_handle,
                PendingReaderEntry {
                    reader,
                    created: Instant::now(),
                },
            );
    }

    /// Take (and remove) the reader parked under `writer_handle`, if any.
    pub fn claim_pending_reader(&self, writer_handle: u64) -> Option<BodyReader> {
        self.pending_readers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(&writer_handle)
            .map(|e| e.reader)
    }

    /// Await the next chunk for a registered reader. Returns `Ok(None)` at
    /// end-of-body (or cancellation), at which point the handle is removed.
    pub async fn next_chunk(&self, handle: u64) -> Result<Option<Bytes>, CoreError> {
        // Clone the shared receiver/notify so the registry lock is NOT held
        // across the await below.
        let (rx_arc, cancel) = {
            let mut reg = self.readers.lock().unwrap_or_else(|e| e.into_inner());
            let entry = reg
                .get_mut(handle_to_reader_key(handle))
                .ok_or_else(|| CoreError::invalid_handle(handle))?;
            entry.touch();
            (entry.value.rx.clone(), entry.value.cancel.clone())
        };

        let chunk = recv_with_cancel(rx_arc, cancel).await;

        // EOF or cancel: the handle is spent, free the registry slot.
        if chunk.is_none() {
            self.readers
                .lock()
                .unwrap_or_else(|e| e.into_inner())
                .remove(handle_to_reader_key(handle));
        }

        Ok(chunk)
    }

    /// Non-blocking variant of `next_chunk`: `Ok(Some)` on an immediately
    /// available chunk, `Ok(None)` on EOF (handle removed), `Err` when no
    /// chunk is ready or the receiver lock is contended.
    pub fn try_next_chunk(&self, handle: u64) -> Result<Option<Bytes>, CoreError> {
        let rx_arc = {
            let mut reg = self.readers.lock().unwrap_or_else(|e| e.into_inner());
            let entry = reg
                .get_mut(handle_to_reader_key(handle))
                .ok_or_else(|| CoreError::invalid_handle(handle))?;
            entry.touch();
            entry.value.rx.clone()
        };

        // try_lock so a concurrent async receiver never blocks this path.
        let mut rx_guard = match rx_arc.try_lock() {
            Ok(g) => g,
            Err(_) => return Err(CoreError::internal("try_next_chunk: lock contended")),
        };

        match rx_guard.try_recv() {
            Ok(chunk) => Ok(Some(chunk)),
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
                Err(CoreError::internal("try_next_chunk: channel empty"))
            }
            Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
                // Release the receiver lock before taking the registry lock.
                drop(rx_guard);
                self.readers
                    .lock()
                    .unwrap_or_else(|e| e.into_inner())
                    .remove(handle_to_reader_key(handle));
                Ok(None)
            }
        }
    }

    /// Send a chunk through a registered writer, splitting it into
    /// `max_chunk_size`-sized pieces when necessary. Each piece is subject
    /// to the drain timeout independently.
    pub async fn send_chunk(&self, handle: u64, chunk: Bytes) -> Result<(), CoreError> {
        // Clone the sender so the registry lock is NOT held across awaits.
        let (tx, timeout) = {
            let mut reg = self.writers.lock().unwrap_or_else(|e| e.into_inner());
            let entry = reg
                .get_mut(handle_to_writer_key(handle))
                .ok_or_else(|| CoreError::invalid_handle(handle))?;
            entry.touch();
            (entry.value.tx.clone(), entry.value.drain_timeout)
        };
        let max = self.config.max_chunk_size;
        if chunk.len() <= max {
            tokio::time::timeout(timeout, tx.send(chunk))
                .await
                .map_err(|_| CoreError::timeout("drain timeout: body reader is too slow"))?
                .map_err(|_| CoreError::internal("body reader dropped"))
        } else {
            // NOTE(review): if `max_chunk_size` were configured as 0, `end`
            // would equal `offset` and this loop would not advance — worth
            // validating in StoreConfig; with the shipped defaults max > 0.
            let mut offset = 0;
            while offset < chunk.len() {
                let end = offset.saturating_add(max).min(chunk.len());
                // `slice` is a cheap refcounted view, no copy.
                tokio::time::timeout(timeout, tx.send(chunk.slice(offset..end)))
                    .await
                    .map_err(|_| CoreError::timeout("drain timeout: body reader is too slow"))?
                    .map_err(|_| CoreError::internal("body reader dropped"))?;
                offset = end;
            }
            Ok(())
        }
    }

    /// Remove a writer handle; dropping its sender signals EOF to the reader.
    pub fn finish_body(&self, handle: u64) -> Result<(), CoreError> {
        self.writers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_writer_key(handle))
            .ok_or_else(|| CoreError::invalid_handle(handle))?;
        Ok(())
    }

    /// Remove a reader handle and wake any task blocked on it. Unknown
    /// handles are ignored (cancel is idempotent).
    pub fn cancel_reader(&self, handle: u64) {
        let entry = self
            .readers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_reader_key(handle));
        if let Some(e) = entry {
            e.value.cancel.notify_waiters();
        }
    }

    /// Register a session and return its handle.
    pub fn insert_session(&self, entry: SessionEntry) -> Result<u64, CoreError> {
        Self::insert_checked(&self.sessions, Arc::new(entry), self.config.max_handles)
    }

    /// Look up a session without removing it.
    /// NOTE(review): lookup does not `touch()` the entry, so a session kept
    /// alive purely by lookups can still expire under `sweep` — confirm
    /// whether that is intended.
    pub fn lookup_session(&self, handle: u64) -> Option<Arc<SessionEntry>> {
        self.sessions
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(handle_to_session_key(handle))
            .map(|e| e.value.clone())
    }

    /// Remove a session, returning it if the handle was live.
    pub fn remove_session(&self, handle: u64) -> Option<Arc<SessionEntry>> {
        self.sessions
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_session_key(handle))
            .map(|e| e.value)
    }

    /// Register a oneshot sender that will deliver the response head for an
    /// in-flight request; returns the request-head handle.
    pub fn allocate_req_handle(
        &self,
        sender: tokio::sync::oneshot::Sender<ResponseHeadEntry>,
    ) -> Result<u64, CoreError> {
        Self::insert_checked(&self.request_heads, sender, self.config.max_handles)
    }

    /// Take (and remove) the response-head sender for `handle`, if live.
    pub fn take_req_sender(
        &self,
        handle: u64,
    ) -> Option<tokio::sync::oneshot::Sender<ResponseHeadEntry>> {
        self.request_heads
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_request_head_key(handle))
            .map(|e| e.value)
    }

    /// Allocate a cancellation token for an in-flight fetch.
    pub fn alloc_fetch_token(&self) -> Result<u64, CoreError> {
        let notify = Arc::new(tokio::sync::Notify::new());
        Self::insert_checked(&self.fetch_cancels, notify, self.config.max_handles)
    }

    /// Fire the cancel token, waking one waiter. Unknown tokens are ignored.
    pub fn cancel_in_flight(&self, token: u64) {
        if let Some(entry) = self
            .fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(handle_to_fetch_cancel_key(token))
        {
            entry.value.notify_one();
        }
    }

    /// Borrow the notify behind a fetch token so a task can await cancel.
    pub fn get_fetch_cancel_notify(&self, token: u64) -> Option<Arc<tokio::sync::Notify>> {
        self.fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(handle_to_fetch_cancel_key(token))
            .map(|e| e.value.clone())
    }

    /// Drop a fetch token; no-op if already removed.
    pub fn remove_fetch_token(&self, token: u64) {
        self.fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_fetch_cancel_key(token));
    }

    /// Evict every entry idle longer than `ttl` from all registries.
    /// Readers get special handling so blocked receivers are woken.
    pub fn sweep(&self, ttl: Duration) {
        Self::sweep_readers(&self.readers, ttl);
        Self::sweep_registry(&self.writers, ttl);
        Self::sweep_registry(&self.request_heads, ttl);
        Self::sweep_registry(&self.sessions, ttl);
        Self::sweep_registry(&self.fetch_cancels, ttl);
        self.sweep_pending_readers(ttl);
    }

    /// Like `sweep_registry`, but notifies each evicted reader's cancel so
    /// tasks blocked in `recv_with_cancel` observe the eviction.
    fn sweep_readers(registry: &Mutex<SlotMap<ReaderKey, Timed<BodyReader>>>, ttl: Duration) {
        let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
        let expired: Vec<ReaderKey> = reg
            .iter()
            .filter(|(_, e)| e.is_expired(ttl))
            .map(|(k, _)| k)
            .collect();

        if expired.is_empty() {
            return;
        }

        for key in &expired {
            if let Some(entry) = reg.remove(*key) {
                entry.value.cancel.notify_waiters();
            }
        }
        tracing::debug!(
            "[iroh-http] swept {} expired reader entries (ttl={ttl:?})",
            expired.len()
        );
        // Release slab memory once fully drained past a modest threshold.
        if reg.is_empty() && reg.capacity() > 128 {
            *reg = SlotMap::with_key();
        }
    }

    /// Evict expired entries from one registry; entries are simply dropped.
    fn sweep_registry<K: slotmap::Key, T>(registry: &Mutex<SlotMap<K, Timed<T>>>, ttl: Duration) {
        let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
        let expired: Vec<K> = reg
            .iter()
            .filter(|(_, e)| e.is_expired(ttl))
            .map(|(k, _)| k)
            .collect();

        if expired.is_empty() {
            return;
        }

        for key in &expired {
            reg.remove(*key);
        }
        tracing::debug!(
            "[iroh-http] swept {} expired registry entries (ttl={ttl:?})",
            expired.len()
        );
        // Release slab memory once fully drained past a modest threshold.
        if reg.is_empty() && reg.capacity() > 128 {
            *reg = SlotMap::with_key();
        }
    }

    /// Drop parked readers older than `ttl` (measured from creation, since
    /// a parked reader is never touched until claimed).
    fn sweep_pending_readers(&self, ttl: Duration) {
        let mut map = self
            .pending_readers
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        let before = map.len();
        map.retain(|_, e| e.created.elapsed() < ttl);
        let removed = before.saturating_sub(map.len());
        if removed > 0 {
            tracing::debug!("[iroh-http] swept {removed} stale pending readers (ttl={ttl:?})");
        }
    }
}
872
#[cfg(test)]
mod tests {
    use super::*;

    // Fresh store with default tunables for each test.
    fn test_store() -> HandleStore {
        HandleStore::new(StoreConfig::default())
    }

    // Single chunk round-trip; dropping the writer yields EOF (None).
    #[tokio::test]
    async fn body_channel_send_recv() {
        let (writer, reader) = make_body_channel();
        writer.send_chunk(Bytes::from("hello")).await.unwrap();
        drop(writer); let chunk = reader.next_chunk().await;
        assert_eq!(chunk, Some(Bytes::from("hello")));
        let eof = reader.next_chunk().await;
        assert!(eof.is_none());
    }

    // Chunks arrive in send order until the writer is dropped.
    #[tokio::test]
    async fn body_channel_multiple_chunks() {
        let (writer, reader) = make_body_channel();
        writer.send_chunk(Bytes::from("a")).await.unwrap();
        writer.send_chunk(Bytes::from("b")).await.unwrap();
        writer.send_chunk(Bytes::from("c")).await.unwrap();
        drop(writer);

        let mut collected = Vec::new();
        while let Some(chunk) = reader.next_chunk().await {
            collected.push(chunk);
        }
        assert_eq!(
            collected,
            vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c"),]
        );
    }

    // Sending into a dropped reader fails instead of hanging.
    #[tokio::test]
    async fn body_channel_reader_dropped_returns_error() {
        let (writer, reader) = make_body_channel();
        drop(reader);
        let result = writer.send_chunk(Bytes::from("data")).await;
        assert!(result.is_err());
    }

    // `Sink` adapter: a stream forwarded into the writer reaches the reader;
    // EOF comes from `forward` dropping the writer when done.
    #[tokio::test]
    async fn body_writer_sink_forward_from_stream() {
        use futures::{stream, StreamExt};
        let (writer, reader) = make_body_channel();

        let chunks = vec![
            Ok::<_, BoxError>(Bytes::from("one")),
            Ok(Bytes::from("two")),
            Ok(Bytes::from("three")),
        ];
        let producer = tokio::spawn(async move {
            stream::iter(chunks).forward(writer).await.unwrap();
        });

        let mut collected = Vec::new();
        while let Some(chunk) = reader.next_chunk().await {
            collected.push(chunk);
        }
        producer.await.unwrap();

        assert_eq!(
            collected,
            vec![Bytes::from("one"), Bytes::from("two"), Bytes::from("three")]
        );
    }

    // `SinkExt::send`/`close` work; explicit drop still signals EOF.
    #[tokio::test]
    async fn body_writer_sink_send_via_sinkext() {
        use futures::SinkExt;
        let (mut writer, reader) = make_body_channel();
        writer.send(Bytes::from("a")).await.unwrap();
        writer.send(Bytes::from("b")).await.unwrap();
        writer.close().await.unwrap();
        drop(writer);

        let mut collected = Vec::new();
        while let Some(chunk) = reader.next_chunk().await {
            collected.push(chunk);
        }
        assert_eq!(collected, vec![Bytes::from("a"), Bytes::from("b")]);
    }

    // The Sink surfaces the reader-dropped error within one channel's worth
    // of sends (start_send queues; the error lands on a later poll_ready).
    #[tokio::test]
    async fn body_writer_sink_propagates_reader_dropped() {
        use futures::SinkExt;
        let (mut writer, reader) = make_body_channel();
        drop(reader);
        let mut err = None;
        for _ in 0..(DEFAULT_CHANNEL_CAPACITY + 1) {
            if let Err(e) = writer.send(Bytes::from("x")).await {
                err = Some(e);
                break;
            }
        }
        assert!(err.is_some(), "expected reader-dropped error from Sink");
    }

    // Store-managed reader: EOF removes the handle; second call on the
    // same (now-removed) handle still reports EOF here because Ok(None).
    #[tokio::test]
    async fn insert_reader_and_next_chunk() {
        let store = test_store();
        let (writer, reader) = store.make_body_channel();
        let handle = store.insert_reader(reader).unwrap();

        writer.send_chunk(Bytes::from("slab-data")).await.unwrap();
        drop(writer);

        let chunk = store.next_chunk(handle).await.unwrap();
        assert_eq!(chunk, Some(Bytes::from("slab-data")));

        let eof = store.next_chunk(handle).await.unwrap();
        assert!(eof.is_none());
    }

    // Unknown handles map to InvalidInput.
    #[tokio::test]
    async fn next_chunk_invalid_handle() {
        let store = test_store();
        let result = store.next_chunk(999999).await;
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().code, crate::ErrorCode::InvalidInput);
    }

    // Writer handle path: send through the store, finish_body drops the
    // sender, and the direct reader sees data then EOF.
    #[tokio::test]
    async fn send_chunk_via_handle() {
        let store = test_store();
        let (writer, reader) = store.make_body_channel();
        let handle = store.insert_writer(writer).unwrap();

        store
            .send_chunk(handle, Bytes::from("via-slab"))
            .await
            .unwrap();
        store.finish_body(handle).unwrap();

        let chunk = reader.next_chunk().await;
        assert_eq!(chunk, Some(Bytes::from("via-slab")));
        let eof = reader.next_chunk().await;
        assert!(eof.is_none());
    }

    // max_handles caps each registry; the overflow insert is rejected.
    #[tokio::test]
    async fn capacity_cap_rejects_overflow() {
        let store = HandleStore::new(StoreConfig {
            max_handles: 2,
            ..StoreConfig::default()
        });
        let (_, r1) = store.make_body_channel();
        let (_, r2) = store.make_body_channel();
        let (_, r3) = store.make_body_channel();
        store.insert_reader(r1).unwrap();
        store.insert_reader(r2).unwrap();
        let err = store.insert_reader(r3).unwrap_err();
        assert!(err.message.contains("capacity"));
    }

    // A pre-fired cancel wins over a receive (biased select).
    #[tokio::test]
    async fn recv_with_cancel_returns_none_on_cancel() {
        let (_tx, rx) = mpsc::channel::<Bytes>(4);
        let rx = Arc::new(tokio::sync::Mutex::new(rx));
        let cancel = Arc::new(tokio::sync::Notify::new());
        cancel.notify_one();
        let result = recv_with_cancel(rx, cancel).await;
        assert!(result.is_none());
    }
}