1use std::{
9 collections::HashMap,
10 sync::{Arc, Mutex},
11 time::{Duration, Instant},
12};
13
14use bytes::Bytes;
15use slotmap::{KeyData, SlotMap};
16use tokio::sync::mpsc;
17
18use crate::CoreError;
19
/// Default number of chunks buffered between a body writer and its reader.
pub const DEFAULT_CHANNEL_CAPACITY: usize = 32;
/// Largest chunk forwarded in one send (64 KiB); bigger payloads are split.
pub const DEFAULT_MAX_CHUNK_SIZE: usize = 64 * 1024;
/// How long a writer waits on a slow reader before erroring (30 s).
pub const DEFAULT_DRAIN_TIMEOUT_MS: u64 = 30_000;
/// Default time-to-live for registry entries before a sweep drops them (5 min).
pub const DEFAULT_SLAB_TTL_MS: u64 = 300_000;
/// Suggested pause between sweeps (1 min) — presumably consumed by a
/// periodic sweeper task elsewhere in the crate.
pub const DEFAULT_SWEEP_INTERVAL_MS: u64 = 60_000;
/// Upper bound on live handles per registry.
pub const DEFAULT_MAX_HANDLES: usize = 65_536;
28
/// A live peer session tracked by the session registry.
pub struct SessionEntry {
    // The underlying iroh QUIC connection for this session.
    pub conn: iroh::endpoint::Connection,
}
34
/// Status and header list delivered through a request-head oneshot channel
/// (see `HandleStore::allocate_req_handle` / `take_req_sender`).
///
/// Derives `Debug` and `Clone`: the fields are plain data, and a public
/// type is far easier to log and test when it is `Debug`.
#[derive(Debug, Clone)]
pub struct ResponseHeadEntry {
    // HTTP status code, e.g. 200.
    pub status: u16,
    // Header name/value pairs, preserved in order.
    pub headers: Vec<(String, String)>,
}
39
// One distinct slotmap key type per registry, so a handle minted for one
// registry cannot be used to index a different registry without an explicit
// (and visible) conversion.
slotmap::new_key_type! { pub(crate) struct ReaderKey; }
slotmap::new_key_type! { pub(crate) struct WriterKey; }
slotmap::new_key_type! { pub(crate) struct FetchCancelKey; }
slotmap::new_key_type! { pub(crate) struct SessionKey; }
slotmap::new_key_type! { pub(crate) struct RequestHeadKey; }
47
/// Convert a typed slotmap key into the opaque `u64` handle exposed to
/// callers; inverse of the `handle_to_key!`-generated functions below.
fn key_to_handle<K: slotmap::Key>(k: K) -> u64 {
    k.data().as_ffi()
}
53
/// Generates a `u64 -> key` conversion for one key type. Round-trips with
/// `key_to_handle` via slotmap's `KeyData::from_ffi` / `as_ffi`.
macro_rules! handle_to_key {
    ($fn_name:ident, $key_type:ty) => {
        fn $fn_name(h: u64) -> $key_type {
            <$key_type>::from(KeyData::from_ffi(h))
        }
    };
}

// One converter per registry key type.
handle_to_key!(handle_to_reader_key, ReaderKey);
handle_to_key!(handle_to_writer_key, WriterKey);
handle_to_key!(handle_to_session_key, SessionKey);
handle_to_key!(handle_to_request_head_key, RequestHeadKey);
handle_to_key!(handle_to_fetch_cancel_key, FetchCancelKey);
67
/// Receiving half of a body channel.
///
/// `rx` sits behind an async `Mutex` so `next_chunk` can take `&self`;
/// `cancel` lets another task abort a blocked receive (see
/// `recv_with_cancel`).
pub struct BodyReader {
    pub(crate) rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Bytes>>>,
    pub(crate) cancel: Arc<tokio::sync::Notify>,
}
79
/// Sending half of a body channel.
///
/// `drain_timeout` bounds how long `send_chunk` waits for the reader to
/// make room before giving up.
pub struct BodyWriter {
    pub(crate) tx: mpsc::Sender<Bytes>,
    pub(crate) drain_timeout: Duration,
}
87
88pub fn make_body_channel() -> (BodyWriter, BodyReader) {
95 make_body_channel_with(
96 DEFAULT_CHANNEL_CAPACITY,
97 Duration::from_millis(DEFAULT_DRAIN_TIMEOUT_MS),
98 )
99}
100
101fn make_body_channel_with(capacity: usize, drain_timeout: Duration) -> (BodyWriter, BodyReader) {
102 let (tx, rx) = mpsc::channel(capacity);
103 (
104 BodyWriter { tx, drain_timeout },
105 BodyReader {
106 rx: Arc::new(tokio::sync::Mutex::new(rx)),
107 cancel: Arc::new(tokio::sync::Notify::new()),
108 },
109 )
110}
111
/// Await the next chunk, returning `None` if `cancel` fires first or the
/// channel is closed and drained.
///
/// `biased` makes the cancel arm win whenever both branches are ready, so
/// a cancelled reader never consumes one more buffered chunk. The receiver
/// lock is taken inside the select arm, so it is held only while this call
/// itself is polling `recv`.
async fn recv_with_cancel(
    rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Bytes>>>,
    cancel: Arc<tokio::sync::Notify>,
) -> Option<Bytes> {
    tokio::select! {
        biased;
        _ = cancel.notified() => None,
        chunk = async { rx.lock().await.recv().await } => chunk,
    }
}
130
impl BodyReader {
    /// Await the next body chunk; `None` means EOF (writer dropped) or that
    /// this reader was cancelled via its `cancel` notifier.
    pub async fn next_chunk(&self) -> Option<Bytes> {
        recv_with_cancel(self.rx.clone(), self.cancel.clone()).await
    }
}
138
impl BodyWriter {
    /// Send one chunk to the paired reader.
    ///
    /// Errors (as strings) when the reader fails to drain the channel within
    /// `drain_timeout`, or when the reader half has been dropped.
    pub async fn send_chunk(&self, chunk: Bytes) -> Result<(), String> {
        tokio::time::timeout(self.drain_timeout, self.tx.send(chunk))
            .await
            .map_err(|_| "drain timeout: body reader is too slow".to_string())?
            .map_err(|_| "body reader dropped".to_string())
    }
}
149
/// Tunable limits for a `HandleStore`; see the `DEFAULT_*` constants.
#[derive(Debug, Clone)]
pub struct StoreConfig {
    // Chunks buffered per body channel.
    pub channel_capacity: usize,
    // Largest single chunk forwarded by `HandleStore::send_chunk`.
    pub max_chunk_size: usize,
    // How long a writer waits on a slow reader before erroring.
    pub drain_timeout: Duration,
    // Capacity cap applied to each handle registry.
    pub max_handles: usize,
    // Entry time-to-live used by sweeps — NOTE(review): `sweep` takes the
    // ttl as a parameter, so confirm callers actually pass this field.
    pub ttl: Duration,
}
167
impl Default for StoreConfig {
    /// Every limit starts from the corresponding `DEFAULT_*` constant.
    fn default() -> Self {
        Self {
            channel_capacity: DEFAULT_CHANNEL_CAPACITY,
            max_chunk_size: DEFAULT_MAX_CHUNK_SIZE,
            drain_timeout: Duration::from_millis(DEFAULT_DRAIN_TIMEOUT_MS),
            max_handles: DEFAULT_MAX_HANDLES,
            ttl: Duration::from_millis(DEFAULT_SLAB_TTL_MS),
        }
    }
}
179
/// Pairs a registry value with its creation time so sweeps can expire it.
struct Timed<T> {
    value: T,
    created_at: Instant,
}

impl<T> Timed<T> {
    /// Wrap `value`, stamping it with the current instant.
    fn new(value: T) -> Self {
        let created_at = Instant::now();
        Self { value, created_at }
    }

    /// True once the entry has been alive strictly longer than `ttl`.
    fn is_expired(&self, ttl: Duration) -> bool {
        Instant::now().duration_since(self.created_at) > ttl
    }
}
199
/// A reader parked under its writer's handle until the consuming side
/// claims it; `created` lets `sweep_pending_readers` drop stale entries.
struct PendingReaderEntry {
    reader: BodyReader,
    created: Instant,
}
205
/// RAII rollback for multi-handle allocations: handles inserted through the
/// guard are released again when the guard drops, unless `commit` was
/// called first.
pub(crate) struct InsertGuard<'a> {
    store: &'a HandleStore,
    tracked: Vec<TrackedHandle>,
    committed: bool,
}
216
/// Which registry a guard-tracked handle belongs to, so rollback in `Drop`
/// knows how to release it.
enum TrackedHandle {
    Reader(u64),
    Writer(u64),
    ReqHead(u64),
}
236
impl<'a> InsertGuard<'a> {
    /// Start an empty, uncommitted guard over `store`.
    fn new(store: &'a HandleStore) -> Self {
        Self {
            store,
            tracked: Vec::new(),
            committed: false,
        }
    }

    /// Insert a reader into the store, tracking its handle for rollback.
    pub fn insert_reader(&mut self, reader: BodyReader) -> Result<u64, CoreError> {
        let h = self.store.insert_reader(reader)?;
        self.tracked.push(TrackedHandle::Reader(h));
        Ok(h)
    }

    /// Insert a writer into the store, tracking its handle for rollback.
    pub fn insert_writer(&mut self, writer: BodyWriter) -> Result<u64, CoreError> {
        let h = self.store.insert_writer(writer)?;
        self.tracked.push(TrackedHandle::Writer(h));
        Ok(h)
    }

    /// Register a response-head sender, tracking its handle for rollback.
    pub fn allocate_req_handle(
        &mut self,
        sender: tokio::sync::oneshot::Sender<ResponseHeadEntry>,
    ) -> Result<u64, CoreError> {
        let h = self.store.allocate_req_handle(sender)?;
        self.tracked.push(TrackedHandle::ReqHead(h));
        Ok(h)
    }

    /// Keep every tracked handle alive. The guard is still dropped here,
    /// but `Drop` sees `committed` and skips the rollback.
    pub fn commit(mut self) {
        self.committed = true;
    }
}
272
impl Drop for InsertGuard<'_> {
    /// Roll back every tracked handle unless the guard was committed.
    fn drop(&mut self) {
        if self.committed {
            return;
        }
        for handle in &self.tracked {
            match handle {
                // Removes the entry and wakes any task blocked on it.
                TrackedHandle::Reader(h) => self.store.cancel_reader(*h),
                // finish_body errors only if the handle is already gone.
                TrackedHandle::Writer(h) => {
                    let _ = self.store.finish_body(*h);
                }
                // Drop the pending response-head sender; its receiver, if
                // any, will observe the closed channel.
                TrackedHandle::ReqHead(h) => {
                    self.store
                        .request_heads
                        .lock()
                        .unwrap_or_else(|e| e.into_inner())
                        .remove(handle_to_request_head_key(*h));
                }
            }
        }
    }
}
295
/// Central registry mapping opaque `u64` handles to live body readers and
/// writers, sessions, pending request heads, and fetch-cancellation tokens.
/// Each map is behind its own `std::sync::Mutex`, and the methods below
/// clone Arcs out rather than hold a guard across an `.await`.
pub struct HandleStore {
    readers: Mutex<SlotMap<ReaderKey, Timed<BodyReader>>>,
    writers: Mutex<SlotMap<WriterKey, Timed<BodyWriter>>>,
    sessions: Mutex<SlotMap<SessionKey, Timed<Arc<SessionEntry>>>>,
    // Oneshot senders waiting to deliver a response head for a request.
    request_heads:
        Mutex<SlotMap<RequestHeadKey, Timed<tokio::sync::oneshot::Sender<ResponseHeadEntry>>>>,
    fetch_cancels: Mutex<SlotMap<FetchCancelKey, Timed<Arc<tokio::sync::Notify>>>>,
    // Readers parked by writer handle (see `store_pending_reader`).
    pending_readers: Mutex<HashMap<u64, PendingReaderEntry>>,
    pub(crate) config: StoreConfig,
}
312
313impl HandleStore {
314 pub fn new(config: StoreConfig) -> Self {
316 Self {
317 readers: Mutex::new(SlotMap::with_key()),
318 writers: Mutex::new(SlotMap::with_key()),
319 sessions: Mutex::new(SlotMap::with_key()),
320 request_heads: Mutex::new(SlotMap::with_key()),
321 fetch_cancels: Mutex::new(SlotMap::with_key()),
322 pending_readers: Mutex::new(HashMap::new()),
323 config,
324 }
325 }
326
327 pub(crate) fn insert_guard(&self) -> InsertGuard<'_> {
331 InsertGuard::new(self)
332 }
333
334 pub fn drain_timeout(&self) -> Duration {
336 self.config.drain_timeout
337 }
338
339 pub fn max_chunk_size(&self) -> usize {
341 self.config.max_chunk_size
342 }
343
344 pub fn count_handles(&self) -> (usize, usize, usize, usize) {
348 let readers = self.readers.lock().unwrap_or_else(|e| e.into_inner()).len();
349 let writers = self.writers.lock().unwrap_or_else(|e| e.into_inner()).len();
350 let sessions = self
351 .sessions
352 .lock()
353 .unwrap_or_else(|e| e.into_inner())
354 .len();
355 let total = readers
356 .saturating_add(writers)
357 .saturating_add(sessions)
358 .saturating_add(
359 self.request_heads
360 .lock()
361 .unwrap_or_else(|e| e.into_inner())
362 .len(),
363 )
364 .saturating_add(
365 self.fetch_cancels
366 .lock()
367 .unwrap_or_else(|e| e.into_inner())
368 .len(),
369 );
370 (readers, writers, sessions, total)
371 }
372
373 pub fn make_body_channel(&self) -> (BodyWriter, BodyReader) {
377 make_body_channel_with(self.config.channel_capacity, self.config.drain_timeout)
378 }
379
380 fn insert_checked<K: slotmap::Key, T>(
383 registry: &Mutex<SlotMap<K, Timed<T>>>,
384 value: T,
385 max: usize,
386 ) -> Result<u64, CoreError> {
387 let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
388 if reg.len() >= max {
389 return Err(CoreError::internal("handle registry at capacity"));
390 }
391 let key = reg.insert(Timed::new(value));
392 Ok(key_to_handle(key))
393 }
394
395 pub fn insert_reader(&self, reader: BodyReader) -> Result<u64, CoreError> {
399 Self::insert_checked(&self.readers, reader, self.config.max_handles)
400 }
401
402 pub fn insert_writer(&self, writer: BodyWriter) -> Result<u64, CoreError> {
404 Self::insert_checked(&self.writers, writer, self.config.max_handles)
405 }
406
407 pub fn alloc_body_writer(&self) -> Result<(u64, BodyReader), CoreError> {
413 let (writer, reader) = self.make_body_channel();
414 let handle = self.insert_writer(writer)?;
415 Ok((handle, reader))
416 }
417
418 pub fn store_pending_reader(&self, writer_handle: u64, reader: BodyReader) {
421 self.pending_readers
422 .lock()
423 .unwrap_or_else(|e| e.into_inner())
424 .insert(
425 writer_handle,
426 PendingReaderEntry {
427 reader,
428 created: Instant::now(),
429 },
430 );
431 }
432
433 pub fn claim_pending_reader(&self, writer_handle: u64) -> Option<BodyReader> {
436 self.pending_readers
437 .lock()
438 .unwrap_or_else(|e| e.into_inner())
439 .remove(&writer_handle)
440 .map(|e| e.reader)
441 }
442
443 pub async fn next_chunk(&self, handle: u64) -> Result<Option<Bytes>, CoreError> {
450 let (rx_arc, cancel) = {
452 let reg = self.readers.lock().unwrap_or_else(|e| e.into_inner());
453 let entry = reg
454 .get(handle_to_reader_key(handle))
455 .ok_or_else(|| CoreError::invalid_handle(handle))?;
456 (entry.value.rx.clone(), entry.value.cancel.clone())
457 };
458
459 let chunk = recv_with_cancel(rx_arc, cancel).await;
460
461 if chunk.is_none() {
463 self.readers
464 .lock()
465 .unwrap_or_else(|e| e.into_inner())
466 .remove(handle_to_reader_key(handle));
467 }
468
469 Ok(chunk)
470 }
471
472 pub async fn send_chunk(&self, handle: u64, chunk: Bytes) -> Result<(), CoreError> {
477 let (tx, timeout) = {
479 let reg = self.writers.lock().unwrap_or_else(|e| e.into_inner());
480 let entry = reg
481 .get(handle_to_writer_key(handle))
482 .ok_or_else(|| CoreError::invalid_handle(handle))?;
483 (entry.value.tx.clone(), entry.value.drain_timeout)
484 };
485 let max = self.config.max_chunk_size;
486 if chunk.len() <= max {
487 tokio::time::timeout(timeout, tx.send(chunk))
488 .await
489 .map_err(|_| CoreError::timeout("drain timeout: body reader is too slow"))?
490 .map_err(|_| CoreError::internal("body reader dropped"))
491 } else {
492 let mut offset = 0;
494 while offset < chunk.len() {
495 let end = offset.saturating_add(max).min(chunk.len());
496 tokio::time::timeout(timeout, tx.send(chunk.slice(offset..end)))
497 .await
498 .map_err(|_| CoreError::timeout("drain timeout: body reader is too slow"))?
499 .map_err(|_| CoreError::internal("body reader dropped"))?;
500 offset = end;
501 }
502 Ok(())
503 }
504 }
505
506 pub fn finish_body(&self, handle: u64) -> Result<(), CoreError> {
508 self.writers
509 .lock()
510 .unwrap_or_else(|e| e.into_inner())
511 .remove(handle_to_writer_key(handle))
512 .ok_or_else(|| CoreError::invalid_handle(handle))?;
513 Ok(())
514 }
515
516 pub fn cancel_reader(&self, handle: u64) {
518 let entry = self
519 .readers
520 .lock()
521 .unwrap_or_else(|e| e.into_inner())
522 .remove(handle_to_reader_key(handle));
523 if let Some(e) = entry {
524 e.value.cancel.notify_waiters();
525 }
526 }
527
528 pub fn insert_session(&self, entry: SessionEntry) -> Result<u64, CoreError> {
532 Self::insert_checked(&self.sessions, Arc::new(entry), self.config.max_handles)
533 }
534
535 pub fn lookup_session(&self, handle: u64) -> Option<Arc<SessionEntry>> {
537 self.sessions
538 .lock()
539 .unwrap_or_else(|e| e.into_inner())
540 .get(handle_to_session_key(handle))
541 .map(|e| e.value.clone())
542 }
543
544 pub fn remove_session(&self, handle: u64) -> Option<Arc<SessionEntry>> {
546 self.sessions
547 .lock()
548 .unwrap_or_else(|e| e.into_inner())
549 .remove(handle_to_session_key(handle))
550 .map(|e| e.value)
551 }
552
553 pub fn allocate_req_handle(
557 &self,
558 sender: tokio::sync::oneshot::Sender<ResponseHeadEntry>,
559 ) -> Result<u64, CoreError> {
560 Self::insert_checked(&self.request_heads, sender, self.config.max_handles)
561 }
562
563 pub fn take_req_sender(
565 &self,
566 handle: u64,
567 ) -> Option<tokio::sync::oneshot::Sender<ResponseHeadEntry>> {
568 self.request_heads
569 .lock()
570 .unwrap_or_else(|e| e.into_inner())
571 .remove(handle_to_request_head_key(handle))
572 .map(|e| e.value)
573 }
574
575 pub fn alloc_fetch_token(&self) -> Result<u64, CoreError> {
579 let notify = Arc::new(tokio::sync::Notify::new());
580 Self::insert_checked(&self.fetch_cancels, notify, self.config.max_handles)
581 }
582
583 pub fn cancel_in_flight(&self, token: u64) {
585 if let Some(entry) = self
586 .fetch_cancels
587 .lock()
588 .unwrap_or_else(|e| e.into_inner())
589 .get(handle_to_fetch_cancel_key(token))
590 {
591 entry.value.notify_one();
592 }
593 }
594
595 pub fn get_fetch_cancel_notify(&self, token: u64) -> Option<Arc<tokio::sync::Notify>> {
597 self.fetch_cancels
598 .lock()
599 .unwrap_or_else(|e| e.into_inner())
600 .get(handle_to_fetch_cancel_key(token))
601 .map(|e| e.value.clone())
602 }
603
604 pub fn remove_fetch_token(&self, token: u64) {
606 self.fetch_cancels
607 .lock()
608 .unwrap_or_else(|e| e.into_inner())
609 .remove(handle_to_fetch_cancel_key(token));
610 }
611
612 pub fn sweep(&self, ttl: Duration) {
618 Self::sweep_registry(&self.readers, ttl);
619 Self::sweep_registry(&self.writers, ttl);
620 Self::sweep_registry(&self.request_heads, ttl);
621 Self::sweep_registry(&self.sessions, ttl);
622 Self::sweep_registry(&self.fetch_cancels, ttl);
623 self.sweep_pending_readers(ttl);
624 }
625
626 fn sweep_registry<K: slotmap::Key, T>(registry: &Mutex<SlotMap<K, Timed<T>>>, ttl: Duration) {
627 let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
628 let expired: Vec<K> = reg
629 .iter()
630 .filter(|(_, e)| e.is_expired(ttl))
631 .map(|(k, _)| k)
632 .collect();
633
634 if expired.is_empty() {
635 return;
636 }
637
638 for key in &expired {
639 reg.remove(*key);
640 }
641 tracing::debug!(
642 "[iroh-http] swept {} expired registry entries (ttl={ttl:?})",
643 expired.len()
644 );
645 if reg.is_empty() && reg.capacity() > 128 {
647 *reg = SlotMap::with_key();
648 }
649 }
650
651 fn sweep_pending_readers(&self, ttl: Duration) {
652 let mut map = self
653 .pending_readers
654 .lock()
655 .unwrap_or_else(|e| e.into_inner());
656 let before = map.len();
657 map.retain(|_, e| e.created.elapsed() < ttl);
658 let removed = before.saturating_sub(map.len());
659 if removed > 0 {
660 tracing::debug!("[iroh-http] swept {removed} stale pending readers (ttl={ttl:?})");
661 }
662 }
663}
664
665pub(crate) const PUMP_READ_BUF: usize = 64 * 1024;
669
670pub(crate) async fn pump_quic_recv_to_body(
675 mut recv: iroh::endpoint::RecvStream,
676 writer: BodyWriter,
677) {
678 while let Ok(Some(chunk)) = recv.read_chunk(PUMP_READ_BUF).await {
679 if writer.send_chunk(chunk.bytes).await.is_err() {
680 break;
681 }
682 }
683 }
685
686pub(crate) async fn pump_body_to_quic_send(
691 reader: BodyReader,
692 mut send: iroh::endpoint::SendStream,
693) {
694 loop {
695 match reader.next_chunk().await {
696 None => break,
697 Some(data) => {
698 if send.write_all(&data).await.is_err() {
699 break;
700 }
701 }
702 }
703 }
704 let _ = send.finish();
705}
706
/// Bridge a duplex byte stream to a body-channel pair: bytes read from `io`
/// flow into `writer`, and chunks from `reader` are written back to `io`.
/// Both directions run concurrently and the call returns when both finish.
pub(crate) async fn pump_duplex<IO>(io: IO, writer: BodyWriter, reader: BodyReader)
where
    IO: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
{
    let (mut recv, mut send) = tokio::io::split(io);

    tokio::join!(
        // io -> writer: read into a reusable BytesMut, hand off frozen chunks.
        async {
            use bytes::BytesMut;
            use tokio::io::AsyncReadExt;
            let mut buf = BytesMut::with_capacity(PUMP_READ_BUF);
            loop {
                buf.clear();
                match recv.read_buf(&mut buf).await {
                    // 0 bytes read means EOF; any error also ends this side.
                    Ok(0) | Err(_) => break,
                    Ok(_) => {
                        // split() takes the filled bytes; freeze() makes an
                        // immutable Bytes chunk for the channel.
                        if writer.send_chunk(buf.split().freeze()).await.is_err() {
                            break;
                        }
                    }
                }
            }
        },
        // reader -> io: copy chunks until EOF/cancel, then shut down writes.
        async {
            use tokio::io::AsyncWriteExt;
            loop {
                match reader.next_chunk().await {
                    None => break,
                    Some(data) => {
                        if send.write_all(&data).await.is_err() {
                            break;
                        }
                    }
                }
            }
            // Best-effort half-close of the write side.
            let _ = send.shutdown().await;
        },
    );
}
752
#[cfg(test)]
mod tests {
    use super::*;

    // Store with default limits for the registry tests below.
    fn test_store() -> HandleStore {
        HandleStore::new(StoreConfig::default())
    }

    #[tokio::test]
    async fn body_channel_send_recv() {
        let (writer, reader) = make_body_channel();
        writer.send_chunk(Bytes::from("hello")).await.unwrap();
        // Dropping the writer closes the channel so the reader sees EOF.
        drop(writer); let chunk = reader.next_chunk().await;
        assert_eq!(chunk, Some(Bytes::from("hello")));
        let eof = reader.next_chunk().await;
        assert!(eof.is_none());
    }

    #[tokio::test]
    async fn body_channel_multiple_chunks() {
        let (writer, reader) = make_body_channel();
        writer.send_chunk(Bytes::from("a")).await.unwrap();
        writer.send_chunk(Bytes::from("b")).await.unwrap();
        writer.send_chunk(Bytes::from("c")).await.unwrap();
        drop(writer);

        // Chunks must arrive in send order, then EOF ends the loop.
        let mut collected = Vec::new();
        while let Some(chunk) = reader.next_chunk().await {
            collected.push(chunk);
        }
        assert_eq!(
            collected,
            vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c"),]
        );
    }

    #[tokio::test]
    async fn body_channel_reader_dropped_returns_error() {
        let (writer, reader) = make_body_channel();
        drop(reader);
        // With the receiver gone, send must fail rather than hang.
        let result = writer.send_chunk(Bytes::from("data")).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn insert_reader_and_next_chunk() {
        let store = test_store();
        let (writer, reader) = store.make_body_channel();
        let handle = store.insert_reader(reader).unwrap();

        writer.send_chunk(Bytes::from("slab-data")).await.unwrap();
        drop(writer);

        let chunk = store.next_chunk(handle).await.unwrap();
        assert_eq!(chunk, Some(Bytes::from("slab-data")));

        // EOF also removes the handle from the registry internally.
        let eof = store.next_chunk(handle).await.unwrap();
        assert!(eof.is_none());
    }

    #[tokio::test]
    async fn next_chunk_invalid_handle() {
        let store = test_store();
        let result = store.next_chunk(999999).await;
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().code, crate::ErrorCode::InvalidInput);
    }

    #[tokio::test]
    async fn send_chunk_via_handle() {
        let store = test_store();
        let (writer, reader) = store.make_body_channel();
        let handle = store.insert_writer(writer).unwrap();

        store
            .send_chunk(handle, Bytes::from("via-slab"))
            .await
            .unwrap();
        // finish_body drops the registered writer, signalling EOF.
        store.finish_body(handle).unwrap();

        let chunk = reader.next_chunk().await;
        assert_eq!(chunk, Some(Bytes::from("via-slab")));
        let eof = reader.next_chunk().await;
        assert!(eof.is_none());
    }

    #[tokio::test]
    async fn capacity_cap_rejects_overflow() {
        // max_handles = 2: the third insert must be rejected.
        let store = HandleStore::new(StoreConfig {
            max_handles: 2,
            ..StoreConfig::default()
        });
        let (_, r1) = store.make_body_channel();
        let (_, r2) = store.make_body_channel();
        let (_, r3) = store.make_body_channel();
        store.insert_reader(r1).unwrap();
        store.insert_reader(r2).unwrap();
        let err = store.insert_reader(r3).unwrap_err();
        assert!(err.message.contains("capacity"));
    }

    #[tokio::test]
    async fn recv_with_cancel_returns_none_on_cancel() {
        let (_tx, rx) = mpsc::channel::<Bytes>(4);
        let rx = Arc::new(tokio::sync::Mutex::new(rx));
        let cancel = Arc::new(tokio::sync::Notify::new());
        // notify_one stores a permit, so the later notified() resolves
        // immediately even though nobody is waiting yet.
        cancel.notify_one();
        let result = recv_with_cancel(rx, cancel).await;
        assert!(result.is_none());
    }
}