1use std::{
9 collections::HashMap,
10 sync::{Arc, Mutex},
11 time::{Duration, Instant},
12};
13
14use bytes::Bytes;
15use slotmap::{KeyData, SlotMap};
16use tokio::sync::mpsc;
17
18use crate::CoreError;
19
/// Default bounded-channel capacity for body chunk channels.
pub const DEFAULT_CHANNEL_CAPACITY: usize = 32;
/// Default upper bound on a single forwarded body chunk, in bytes.
pub const DEFAULT_MAX_CHUNK_SIZE: usize = 64 * 1024;
/// Default time a writer waits for a slow reader before giving up.
pub const DEFAULT_DRAIN_TIMEOUT_MS: u64 = 30_000;
/// Default idle TTL for registry entries before they may be swept.
pub const DEFAULT_SLAB_TTL_MS: u64 = 300_000;
/// Default interval between sweep passes.
pub const DEFAULT_SWEEP_INTERVAL_MS: u64 = 60_000;
/// Default cap on the number of live handles per registry.
pub const DEFAULT_MAX_HANDLES: usize = 65_536;
28
/// A live peer connection tracked by the handle store.
pub struct SessionEntry {
    /// The underlying iroh QUIC connection for this session.
    pub conn: iroh::endpoint::Connection,
}
34
/// The head (status + headers) of an HTTP response, delivered through a
/// oneshot sender registered in the request-heads registry.
pub struct ResponseHeadEntry {
    /// HTTP status code.
    pub status: u16,
    /// Response header name/value pairs.
    pub headers: Vec<(String, String)>,
}
39
// Distinct slotmap key types, one per registry, so a key for one registry
// cannot be used with another at the type level. (The u64 FFI encoding
// below erases this distinction, so decoding always targets a specific
// registry's key type.)
slotmap::new_key_type! { pub(crate) struct ReaderKey; }
slotmap::new_key_type! { pub(crate) struct WriterKey; }
slotmap::new_key_type! { pub(crate) struct FetchCancelKey; }
slotmap::new_key_type! { pub(crate) struct SessionKey; }
slotmap::new_key_type! { pub(crate) struct RequestHeadKey; }
47
/// Encode a slotmap key (index + version) as an opaque `u64` handle
/// suitable for crossing an FFI boundary.
fn key_to_handle<K: slotmap::Key>(k: K) -> u64 {
    k.data().as_ffi()
}
53
/// Generates the inverse of `key_to_handle`: a function decoding an opaque
/// `u64` back into a typed slotmap key. Decoding itself never fails; a
/// stale or forged handle simply yields a key that misses in the map.
macro_rules! handle_to_key {
    ($fn_name:ident, $key_type:ty) => {
        fn $fn_name(h: u64) -> $key_type {
            <$key_type>::from(KeyData::from_ffi(h))
        }
    };
}

handle_to_key!(handle_to_reader_key, ReaderKey);
handle_to_key!(handle_to_writer_key, WriterKey);
handle_to_key!(handle_to_session_key, SessionKey);
handle_to_key!(handle_to_request_head_key, RequestHeadKey);
handle_to_key!(handle_to_fetch_cancel_key, FetchCancelKey);
67
/// Consuming end of a body-chunk channel.
pub struct BodyReader {
    // Receiver behind an async Mutex so `next_chunk` can take `&self`
    // while still getting exclusive access; Arc so the store can clone it
    // out of the registry before awaiting.
    pub(crate) rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Bytes>>>,
    // Signalled to abort a blocked receive (see `recv_with_cancel`).
    pub(crate) cancel: Arc<tokio::sync::Notify>,
}
79
/// Producing end of a body-chunk channel.
pub struct BodyWriter {
    // Bounded sender; a lagging reader applies backpressure here.
    pub(crate) tx: mpsc::Sender<Bytes>,
    // How long `send_chunk` waits on a full channel before failing.
    pub(crate) drain_timeout: Duration,
}
87
88pub fn make_body_channel() -> (BodyWriter, BodyReader) {
95 make_body_channel_with(
96 DEFAULT_CHANNEL_CAPACITY,
97 Duration::from_millis(DEFAULT_DRAIN_TIMEOUT_MS),
98 )
99}
100
101fn make_body_channel_with(capacity: usize, drain_timeout: Duration) -> (BodyWriter, BodyReader) {
102 let (tx, rx) = mpsc::channel(capacity);
103 (
104 BodyWriter { tx, drain_timeout },
105 BodyReader {
106 rx: Arc::new(tokio::sync::Mutex::new(rx)),
107 cancel: Arc::new(tokio::sync::Notify::new()),
108 },
109 )
110}
111
/// Await the next chunk from `rx`, returning `None` early if `cancel` is
/// signalled. `biased` makes the cancel branch win whenever both are
/// ready, so cancellation cannot be starved by a steady chunk stream.
///
/// NOTE(review): `Notify::notified()` registers a waiter only once this
/// future is polled, so a `notify_waiters()` fired before this call begins
/// is not observed; `notify_one()` (which stores a permit) is observed.
/// Confirm this matches the intent of `cancel_reader`, which uses
/// `notify_waiters` after removing the registry entry.
async fn recv_with_cancel(
    rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Bytes>>>,
    cancel: Arc<tokio::sync::Notify>,
) -> Option<Bytes> {
    tokio::select! {
        biased;
        _ = cancel.notified() => None,
        chunk = async { rx.lock().await.recv().await } => chunk,
    }
}
130
131impl BodyReader {
132 pub async fn next_chunk(&self) -> Option<Bytes> {
135 recv_with_cancel(self.rx.clone(), self.cancel.clone()).await
136 }
137}
138
139impl BodyWriter {
140 pub async fn send_chunk(&self, chunk: Bytes) -> Result<(), String> {
143 tokio::time::timeout(self.drain_timeout, self.tx.send(chunk))
144 .await
145 .map_err(|_| "drain timeout: body reader is too slow".to_string())?
146 .map_err(|_| "body reader dropped".to_string())
147 }
148}
149
/// Tunable limits and timeouts for a `HandleStore`.
#[derive(Debug, Clone)]
pub struct StoreConfig {
    /// Capacity of each body-chunk mpsc channel.
    pub channel_capacity: usize,
    /// Largest chunk forwarded in one piece; bigger payloads are split.
    pub max_chunk_size: usize,
    /// How long a writer waits on a full channel before timing out.
    pub drain_timeout: Duration,
    /// Maximum number of live entries per registry.
    pub max_handles: usize,
    /// Idle time after which a registry entry may be swept.
    pub ttl: Duration,
}
167
168impl Default for StoreConfig {
169 fn default() -> Self {
170 Self {
171 channel_capacity: DEFAULT_CHANNEL_CAPACITY,
172 max_chunk_size: DEFAULT_MAX_CHUNK_SIZE,
173 drain_timeout: Duration::from_millis(DEFAULT_DRAIN_TIMEOUT_MS),
174 max_handles: DEFAULT_MAX_HANDLES,
175 ttl: Duration::from_millis(DEFAULT_SLAB_TTL_MS),
176 }
177 }
178}
179
/// A registry value paired with its last-access time, used for TTL sweeps.
struct Timed<T> {
    value: T,
    last_accessed: Instant,
}
188
189impl<T> Timed<T> {
190 fn new(value: T) -> Self {
191 Self {
192 value,
193 last_accessed: Instant::now(),
194 }
195 }
196
197 fn touch(&mut self) {
199 self.last_accessed = Instant::now();
200 }
201
202 fn is_expired(&self, ttl: Duration) -> bool {
203 self.last_accessed.elapsed() > ttl
204 }
205}
206
/// A reader parked until its matching writer handle is claimed, plus its
/// creation time so stale entries can be swept.
struct PendingReaderEntry {
    reader: BodyReader,
    created: Instant,
}
212
/// RAII rollback guard for multi-handle allocations: handles inserted
/// through the guard are removed again on drop unless `commit` was called,
/// so a failure partway through cannot leak registry entries.
pub(crate) struct InsertGuard<'a> {
    store: &'a HandleStore,
    tracked: Vec<TrackedHandle>,
    committed: bool,
}
223
/// Which registry a guard-tracked handle lives in, so `Drop` knows how to
/// undo the insertion.
enum TrackedHandle {
    Reader(u64),
    Writer(u64),
    ReqHead(u64),
}
243
244impl<'a> InsertGuard<'a> {
245 fn new(store: &'a HandleStore) -> Self {
246 Self {
247 store,
248 tracked: Vec::new(),
249 committed: false,
250 }
251 }
252
253 pub fn insert_reader(&mut self, reader: BodyReader) -> Result<u64, CoreError> {
254 let h = self.store.insert_reader(reader)?;
255 self.tracked.push(TrackedHandle::Reader(h));
256 Ok(h)
257 }
258
259 pub fn insert_writer(&mut self, writer: BodyWriter) -> Result<u64, CoreError> {
260 let h = self.store.insert_writer(writer)?;
261 self.tracked.push(TrackedHandle::Writer(h));
262 Ok(h)
263 }
264
265 pub fn allocate_req_handle(
266 &mut self,
267 sender: tokio::sync::oneshot::Sender<ResponseHeadEntry>,
268 ) -> Result<u64, CoreError> {
269 let h = self.store.allocate_req_handle(sender)?;
270 self.tracked.push(TrackedHandle::ReqHead(h));
271 Ok(h)
272 }
273
274 pub fn commit(mut self) {
276 self.committed = true;
277 }
278}
279
impl Drop for InsertGuard<'_> {
    /// Roll back any uncommitted insertions. Each cleanup path mirrors the
    /// store's own removal routine for that handle kind.
    fn drop(&mut self) {
        if self.committed {
            return;
        }
        for handle in &self.tracked {
            match handle {
                // Also wakes a reader currently blocked in next_chunk.
                TrackedHandle::Reader(h) => self.store.cancel_reader(*h),
                // Removal may fail if the writer was already finished
                // elsewhere; that is fine during rollback.
                TrackedHandle::Writer(h) => {
                    let _ = self.store.finish_body(*h);
                }
                // No dedicated removal API for request heads; drop the
                // oneshot sender directly (its receiver sees a close).
                TrackedHandle::ReqHead(h) => {
                    self.store
                        .request_heads
                        .lock()
                        .unwrap_or_else(|e| e.into_inner())
                        .remove(handle_to_request_head_key(*h));
                }
            }
        }
    }
}
302
/// Central registry mapping opaque `u64` handles to live resources: body
/// readers/writers, sessions, pending response-head senders, and fetch
/// cancellation tokens. Each registry is behind its own `Mutex`; lock
/// poisoning is tolerated everywhere via `unwrap_or_else(into_inner)`.
pub struct HandleStore {
    readers: Mutex<SlotMap<ReaderKey, Timed<BodyReader>>>,
    writers: Mutex<SlotMap<WriterKey, Timed<BodyWriter>>>,
    sessions: Mutex<SlotMap<SessionKey, Timed<Arc<SessionEntry>>>>,
    request_heads:
        Mutex<SlotMap<RequestHeadKey, Timed<tokio::sync::oneshot::Sender<ResponseHeadEntry>>>>,
    fetch_cancels: Mutex<SlotMap<FetchCancelKey, Timed<Arc<tokio::sync::Notify>>>>,
    // Readers awaiting claim by the consuming side, keyed by their
    // paired writer handle.
    pending_readers: Mutex<HashMap<u64, PendingReaderEntry>>,
    pub(crate) config: StoreConfig,
}
319
320impl HandleStore {
321 pub fn new(config: StoreConfig) -> Self {
323 Self {
324 readers: Mutex::new(SlotMap::with_key()),
325 writers: Mutex::new(SlotMap::with_key()),
326 sessions: Mutex::new(SlotMap::with_key()),
327 request_heads: Mutex::new(SlotMap::with_key()),
328 fetch_cancels: Mutex::new(SlotMap::with_key()),
329 pending_readers: Mutex::new(HashMap::new()),
330 config,
331 }
332 }
333
    /// Start a rollback guard for a multi-handle allocation (see
    /// `InsertGuard`).
    pub(crate) fn insert_guard(&self) -> InsertGuard<'_> {
        InsertGuard::new(self)
    }

    /// Configured writer drain timeout.
    pub fn drain_timeout(&self) -> Duration {
        self.config.drain_timeout
    }

    /// Configured maximum forwarded chunk size, in bytes.
    pub fn max_chunk_size(&self) -> usize {
        self.config.max_chunk_size
    }
350
351 pub fn count_handles(&self) -> (usize, usize, usize, usize) {
355 let readers = self.readers.lock().unwrap_or_else(|e| e.into_inner()).len();
356 let writers = self.writers.lock().unwrap_or_else(|e| e.into_inner()).len();
357 let sessions = self
358 .sessions
359 .lock()
360 .unwrap_or_else(|e| e.into_inner())
361 .len();
362 let total = readers
363 .saturating_add(writers)
364 .saturating_add(sessions)
365 .saturating_add(
366 self.request_heads
367 .lock()
368 .unwrap_or_else(|e| e.into_inner())
369 .len(),
370 )
371 .saturating_add(
372 self.fetch_cancels
373 .lock()
374 .unwrap_or_else(|e| e.into_inner())
375 .len(),
376 );
377 (readers, writers, sessions, total)
378 }
379
    /// Build a body channel using this store's configured capacity and
    /// drain timeout. Neither end is registered here.
    pub fn make_body_channel(&self) -> (BodyWriter, BodyReader) {
        make_body_channel_with(self.config.channel_capacity, self.config.drain_timeout)
    }
386
387 fn insert_checked<K: slotmap::Key, T>(
390 registry: &Mutex<SlotMap<K, Timed<T>>>,
391 value: T,
392 max: usize,
393 ) -> Result<u64, CoreError> {
394 let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
395 if reg.len() >= max {
396 return Err(CoreError::internal("handle registry at capacity"));
397 }
398 let key = reg.insert(Timed::new(value));
399 Ok(key_to_handle(key))
400 }
401
    /// Register a reader and return its opaque handle.
    ///
    /// # Errors
    /// Fails when the reader registry is at `max_handles` capacity.
    pub fn insert_reader(&self, reader: BodyReader) -> Result<u64, CoreError> {
        Self::insert_checked(&self.readers, reader, self.config.max_handles)
    }

    /// Register a writer and return its opaque handle.
    ///
    /// # Errors
    /// Fails when the writer registry is at `max_handles` capacity.
    pub fn insert_writer(&self, writer: BodyWriter) -> Result<u64, CoreError> {
        Self::insert_checked(&self.writers, writer, self.config.max_handles)
    }
413
    /// Create a body channel, register only the writer, and hand the
    /// (unregistered) reader back to the caller.
    ///
    /// # Errors
    /// Fails when the writer registry is at capacity.
    pub fn alloc_body_writer(&self) -> Result<(u64, BodyReader), CoreError> {
        let (writer, reader) = self.make_body_channel();
        let handle = self.insert_writer(writer)?;
        Ok((handle, reader))
    }
424
425 pub fn store_pending_reader(&self, writer_handle: u64, reader: BodyReader) {
428 self.pending_readers
429 .lock()
430 .unwrap_or_else(|e| e.into_inner())
431 .insert(
432 writer_handle,
433 PendingReaderEntry {
434 reader,
435 created: Instant::now(),
436 },
437 );
438 }
439
440 pub fn claim_pending_reader(&self, writer_handle: u64) -> Option<BodyReader> {
443 self.pending_readers
444 .lock()
445 .unwrap_or_else(|e| e.into_inner())
446 .remove(&writer_handle)
447 .map(|e| e.reader)
448 }
449
    /// Receive the next chunk for a registered reader handle, refreshing
    /// its TTL. `Ok(None)` means end-of-stream or cancellation, in which
    /// case the handle is removed from the registry (subsequent calls
    /// return an invalid-handle error).
    ///
    /// # Errors
    /// Fails when `handle` does not name a live reader.
    pub async fn next_chunk(&self, handle: u64) -> Result<Option<Bytes>, CoreError> {
        // Clone the Arcs and release the std Mutex before awaiting — the
        // registry lock must never be held across an await point.
        let (rx_arc, cancel) = {
            let mut reg = self.readers.lock().unwrap_or_else(|e| e.into_inner());
            let entry = reg
                .get_mut(handle_to_reader_key(handle))
                .ok_or_else(|| CoreError::invalid_handle(handle))?;
            entry.touch();
            (entry.value.rx.clone(), entry.value.cancel.clone())
        };

        let chunk = recv_with_cancel(rx_arc, cancel).await;

        // EOF or cancel: the reader is finished, drop its registry slot.
        if chunk.is_none() {
            self.readers
                .lock()
                .unwrap_or_else(|e| e.into_inner())
                .remove(handle_to_reader_key(handle));
        }

        Ok(chunk)
    }
479
480 pub async fn send_chunk(&self, handle: u64, chunk: Bytes) -> Result<(), CoreError> {
485 let (tx, timeout) = {
487 let mut reg = self.writers.lock().unwrap_or_else(|e| e.into_inner());
488 let entry = reg
489 .get_mut(handle_to_writer_key(handle))
490 .ok_or_else(|| CoreError::invalid_handle(handle))?;
491 entry.touch();
492 (entry.value.tx.clone(), entry.value.drain_timeout)
493 };
494 let max = self.config.max_chunk_size;
495 if chunk.len() <= max {
496 tokio::time::timeout(timeout, tx.send(chunk))
497 .await
498 .map_err(|_| CoreError::timeout("drain timeout: body reader is too slow"))?
499 .map_err(|_| CoreError::internal("body reader dropped"))
500 } else {
501 let mut offset = 0;
503 while offset < chunk.len() {
504 let end = offset.saturating_add(max).min(chunk.len());
505 tokio::time::timeout(timeout, tx.send(chunk.slice(offset..end)))
506 .await
507 .map_err(|_| CoreError::timeout("drain timeout: body reader is too slow"))?
508 .map_err(|_| CoreError::internal("body reader dropped"))?;
509 offset = end;
510 }
511 Ok(())
512 }
513 }
514
515 pub fn finish_body(&self, handle: u64) -> Result<(), CoreError> {
517 self.writers
518 .lock()
519 .unwrap_or_else(|e| e.into_inner())
520 .remove(handle_to_writer_key(handle))
521 .ok_or_else(|| CoreError::invalid_handle(handle))?;
522 Ok(())
523 }
524
525 pub fn cancel_reader(&self, handle: u64) {
527 let entry = self
528 .readers
529 .lock()
530 .unwrap_or_else(|e| e.into_inner())
531 .remove(handle_to_reader_key(handle));
532 if let Some(e) = entry {
533 e.value.cancel.notify_waiters();
534 }
535 }
536
    /// Register a session and return its opaque handle.
    ///
    /// # Errors
    /// Fails when the session registry is at `max_handles` capacity.
    pub fn insert_session(&self, entry: SessionEntry) -> Result<u64, CoreError> {
        Self::insert_checked(&self.sessions, Arc::new(entry), self.config.max_handles)
    }
543
544 pub fn lookup_session(&self, handle: u64) -> Option<Arc<SessionEntry>> {
546 self.sessions
547 .lock()
548 .unwrap_or_else(|e| e.into_inner())
549 .get(handle_to_session_key(handle))
550 .map(|e| e.value.clone())
551 }
552
553 pub fn remove_session(&self, handle: u64) -> Option<Arc<SessionEntry>> {
555 self.sessions
556 .lock()
557 .unwrap_or_else(|e| e.into_inner())
558 .remove(handle_to_session_key(handle))
559 .map(|e| e.value)
560 }
561
    /// Register the oneshot sender that will deliver the response head for
    /// an in-flight request, returning its opaque handle.
    ///
    /// # Errors
    /// Fails when the request-head registry is at capacity.
    pub fn allocate_req_handle(
        &self,
        sender: tokio::sync::oneshot::Sender<ResponseHeadEntry>,
    ) -> Result<u64, CoreError> {
        Self::insert_checked(&self.request_heads, sender, self.config.max_handles)
    }
571
572 pub fn take_req_sender(
574 &self,
575 handle: u64,
576 ) -> Option<tokio::sync::oneshot::Sender<ResponseHeadEntry>> {
577 self.request_heads
578 .lock()
579 .unwrap_or_else(|e| e.into_inner())
580 .remove(handle_to_request_head_key(handle))
581 .map(|e| e.value)
582 }
583
    /// Allocate a fresh cancellation token for a fetch and return its
    /// opaque handle.
    ///
    /// # Errors
    /// Fails when the fetch-cancel registry is at capacity.
    pub fn alloc_fetch_token(&self) -> Result<u64, CoreError> {
        let notify = Arc::new(tokio::sync::Notify::new());
        Self::insert_checked(&self.fetch_cancels, notify, self.config.max_handles)
    }
591
592 pub fn cancel_in_flight(&self, token: u64) {
594 if let Some(entry) = self
595 .fetch_cancels
596 .lock()
597 .unwrap_or_else(|e| e.into_inner())
598 .get(handle_to_fetch_cancel_key(token))
599 {
600 entry.value.notify_one();
601 }
602 }
603
604 pub fn get_fetch_cancel_notify(&self, token: u64) -> Option<Arc<tokio::sync::Notify>> {
606 self.fetch_cancels
607 .lock()
608 .unwrap_or_else(|e| e.into_inner())
609 .get(handle_to_fetch_cancel_key(token))
610 .map(|e| e.value.clone())
611 }
612
613 pub fn remove_fetch_token(&self, token: u64) {
615 self.fetch_cancels
616 .lock()
617 .unwrap_or_else(|e| e.into_inner())
618 .remove(handle_to_fetch_cancel_key(token));
619 }
620
    /// Remove every registry entry idle longer than `ttl`, plus stale
    /// pending readers. Intended to run periodically from a sweeper task
    /// (cf. `DEFAULT_SWEEP_INTERVAL_MS`).
    pub fn sweep(&self, ttl: Duration) {
        Self::sweep_registry(&self.readers, ttl);
        Self::sweep_registry(&self.writers, ttl);
        Self::sweep_registry(&self.request_heads, ttl);
        Self::sweep_registry(&self.sessions, ttl);
        Self::sweep_registry(&self.fetch_cancels, ttl);
        self.sweep_pending_readers(ttl);
    }
634
635 fn sweep_registry<K: slotmap::Key, T>(registry: &Mutex<SlotMap<K, Timed<T>>>, ttl: Duration) {
636 let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
637 let expired: Vec<K> = reg
638 .iter()
639 .filter(|(_, e)| e.is_expired(ttl))
640 .map(|(k, _)| k)
641 .collect();
642
643 if expired.is_empty() {
644 return;
645 }
646
647 for key in &expired {
648 reg.remove(*key);
649 }
650 tracing::debug!(
651 "[iroh-http] swept {} expired registry entries (ttl={ttl:?})",
652 expired.len()
653 );
654 if reg.is_empty() && reg.capacity() > 128 {
656 *reg = SlotMap::with_key();
657 }
658 }
659
660 fn sweep_pending_readers(&self, ttl: Duration) {
661 let mut map = self
662 .pending_readers
663 .lock()
664 .unwrap_or_else(|e| e.into_inner());
665 let before = map.len();
666 map.retain(|_, e| e.created.elapsed() < ttl);
667 let removed = before.saturating_sub(map.len());
668 if removed > 0 {
669 tracing::debug!("[iroh-http] swept {removed} stale pending readers (ttl={ttl:?})");
670 }
671 }
672}
673
/// Read-buffer / max-read size for the pump loops below (64 KiB).
pub(crate) const PUMP_READ_BUF: usize = 64 * 1024;
678
679pub(crate) async fn pump_quic_recv_to_body(
684 mut recv: iroh::endpoint::RecvStream,
685 writer: BodyWriter,
686) {
687 while let Ok(Some(chunk)) = recv.read_chunk(PUMP_READ_BUF).await {
688 if writer.send_chunk(chunk.bytes).await.is_err() {
689 break;
690 }
691 }
692 }
694
695pub(crate) async fn pump_body_to_quic_send(
700 reader: BodyReader,
701 mut send: iroh::endpoint::SendStream,
702) {
703 loop {
704 match reader.next_chunk().await {
705 None => break,
706 Some(data) => {
707 if send.write_all(&data).await.is_err() {
708 break;
709 }
710 }
711 }
712 }
713 let _ = send.finish();
714}
715
/// Pump a duplex byte stream in both directions concurrently: bytes read
/// from `io` go to `writer`, chunks from `reader` are written back to
/// `io`. Returns when both directions have finished.
pub(crate) async fn pump_duplex<IO>(io: IO, writer: BodyWriter, reader: BodyReader)
where
    IO: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
{
    let (mut recv, mut send) = tokio::io::split(io);

    tokio::join!(
        // io -> writer: read into a scratch buffer and forward as Bytes.
        async {
            use bytes::BytesMut;
            use tokio::io::AsyncReadExt;
            let mut buf = BytesMut::with_capacity(PUMP_READ_BUF);
            loop {
                buf.clear();
                match recv.read_buf(&mut buf).await {
                    // 0 bytes read = EOF; errors also end this direction.
                    Ok(0) | Err(_) => break,
                    Ok(_) => {
                        // split().freeze() hands the filled bytes off as
                        // immutable Bytes. NOTE(review): split() also moves
                        // the capacity out, so the next read_buf presumably
                        // reallocates — confirm if this pump shows up in
                        // profiles.
                        if writer.send_chunk(buf.split().freeze()).await.is_err() {
                            break;
                        }
                    }
                }
            }
        },
        // reader -> io: copy chunks until EOF/cancel, then half-close.
        async {
            use tokio::io::AsyncWriteExt;
            loop {
                match reader.next_chunk().await {
                    None => break,
                    Some(data) => {
                        if send.write_all(&data).await.is_err() {
                            break;
                        }
                    }
                }
            }
            // Best-effort shutdown; the peer sees EOF on its read side.
            let _ = send.shutdown().await;
        },
    );
}
761
#[cfg(test)]
mod tests {
    use super::*;

    /// Store with default limits, shared by the handle-based tests.
    fn test_store() -> HandleStore {
        HandleStore::new(StoreConfig::default())
    }

    #[tokio::test]
    async fn body_channel_send_recv() {
        let (writer, reader) = make_body_channel();
        writer.send_chunk(Bytes::from("hello")).await.unwrap();
        drop(writer);

        assert_eq!(reader.next_chunk().await, Some(Bytes::from("hello")));
        // Writer dropped -> channel closed -> EOF.
        assert!(reader.next_chunk().await.is_none());
    }

    #[tokio::test]
    async fn body_channel_multiple_chunks() {
        let (writer, reader) = make_body_channel();
        for payload in ["a", "b", "c"] {
            writer.send_chunk(Bytes::from(payload)).await.unwrap();
        }
        drop(writer);

        let mut collected = Vec::new();
        while let Some(chunk) = reader.next_chunk().await {
            collected.push(chunk);
        }
        assert_eq!(
            collected,
            vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]
        );
    }

    #[tokio::test]
    async fn body_channel_reader_dropped_returns_error() {
        let (writer, reader) = make_body_channel();
        drop(reader);
        assert!(writer.send_chunk(Bytes::from("data")).await.is_err());
    }

    #[tokio::test]
    async fn insert_reader_and_next_chunk() {
        let store = test_store();
        let (writer, reader) = store.make_body_channel();
        let handle = store.insert_reader(reader).unwrap();

        writer.send_chunk(Bytes::from("slab-data")).await.unwrap();
        drop(writer);

        assert_eq!(
            store.next_chunk(handle).await.unwrap(),
            Some(Bytes::from("slab-data"))
        );
        // Second call observes EOF (and removes the handle).
        assert!(store.next_chunk(handle).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn next_chunk_invalid_handle() {
        let store = test_store();
        let result = store.next_chunk(999999).await;
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().code, crate::ErrorCode::InvalidInput);
    }

    #[tokio::test]
    async fn send_chunk_via_handle() {
        let store = test_store();
        let (writer, reader) = store.make_body_channel();
        let handle = store.insert_writer(writer).unwrap();

        store
            .send_chunk(handle, Bytes::from("via-slab"))
            .await
            .unwrap();
        store.finish_body(handle).unwrap();

        assert_eq!(reader.next_chunk().await, Some(Bytes::from("via-slab")));
        assert!(reader.next_chunk().await.is_none());
    }

    #[tokio::test]
    async fn capacity_cap_rejects_overflow() {
        let store = HandleStore::new(StoreConfig {
            max_handles: 2,
            ..StoreConfig::default()
        });
        let (_, r1) = store.make_body_channel();
        let (_, r2) = store.make_body_channel();
        let (_, r3) = store.make_body_channel();
        store.insert_reader(r1).unwrap();
        store.insert_reader(r2).unwrap();
        // The third insert must hit the max_handles cap.
        let err = store.insert_reader(r3).unwrap_err();
        assert!(err.message.contains("capacity"));
    }

    #[tokio::test]
    async fn recv_with_cancel_returns_none_on_cancel() {
        let (_tx, rx) = mpsc::channel::<Bytes>(4);
        let rx = Arc::new(tokio::sync::Mutex::new(rx));
        let cancel = Arc::new(tokio::sync::Notify::new());
        // notify_one stores a permit, so the cancel is observed even
        // though recv_with_cancel registers its waiter afterwards.
        cancel.notify_one();

        assert!(recv_with_cancel(rx, cancel).await.is_none());
    }
}
881}