Skip to main content

iroh_http_core/
stream.rs

1//! Per-endpoint handle store and body channel types.
2//!
3//! Rust owns all stream state; JS holds only opaque `u64` handles.
4//! Each `IrohEndpoint` has its own `HandleStore` — no process-global registries.
5//! Handles are `u64` values equal to `key.data().as_ffi()`, unique within the
6//! owning endpoint's slot-map.
7
8use std::{
9    collections::HashMap,
10    sync::{Arc, Mutex},
11    time::{Duration, Instant},
12};
13
14use bytes::Bytes;
15use slotmap::{KeyData, SlotMap};
16use tokio::sync::mpsc;
17
18use crate::CoreError;
19
20// ── Constants ─────────────────────────────────────────────────────────────────
21
/// Default bounded body-channel capacity, in chunks.
pub const DEFAULT_CHANNEL_CAPACITY: usize = 32;
/// Default upper bound on a single body chunk accepted by `send_chunk`.
pub const DEFAULT_MAX_CHUNK_SIZE: usize = 64 * 1024; // 64 KB
/// Default wait for a slow body reader before a send gives up.
pub const DEFAULT_DRAIN_TIMEOUT_MS: u64 = 30_000; // 30 s
/// Default TTL for registry entries before the sweeper may remove them.
pub const DEFAULT_SLAB_TTL_MS: u64 = 300_000; // 5 min
/// Default interval between TTL sweeps (not referenced in this file;
/// presumably consumed by the endpoint's sweeper task — confirm at caller).
pub const DEFAULT_SWEEP_INTERVAL_MS: u64 = 60_000; // 60 s
/// Default per-registry handle cap (see `StoreConfig::max_handles`).
pub const DEFAULT_MAX_HANDLES: usize = 65_536;
28
29// ── Resource types ────────────────────────────────────────────────────────────
30
/// A live QUIC connection owned by the session registry.
pub struct SessionEntry {
    // The underlying iroh connection; shared behind `Arc` by the registry.
    pub conn: iroh::endpoint::Connection,
}

/// Response head delivered through the request-head oneshot channel:
/// status code plus raw header name/value pairs.
pub struct ResponseHeadEntry {
    pub status: u16,
    pub headers: Vec<(String, String)>,
}
39
40// ── SlotMap key types ─────────────────────────────────────────────────────────
41
// One distinct key type per registry so a key for one slot-map cannot be
// used as a key into another at the type level (handles are still plain
// `u64` at the FFI boundary — see the encode/decode helpers below).
slotmap::new_key_type! { pub(crate) struct ReaderKey; }
slotmap::new_key_type! { pub(crate) struct WriterKey; }
slotmap::new_key_type! { pub(crate) struct FetchCancelKey; }
slotmap::new_key_type! { pub(crate) struct SessionKey; }
slotmap::new_key_type! { pub(crate) struct RequestHeadKey; }
47
48// ── Handle encode / decode helpers ───────────────────────────────────────────
49
50fn key_to_handle<K: slotmap::Key>(k: K) -> u64 {
51    k.data().as_ffi()
52}
53
/// Generate a `u64 -> key` decoder for one registry's key type — the
/// inverse of [`key_to_handle`].  A stale or made-up handle decodes to a
/// key that simply misses on the subsequent slot-map lookup.
macro_rules! handle_to_key {
    ($fn_name:ident, $key_type:ty) => {
        fn $fn_name(h: u64) -> $key_type {
            <$key_type>::from(KeyData::from_ffi(h))
        }
    };
}

// One decoder per registry key type.
handle_to_key!(handle_to_reader_key, ReaderKey);
handle_to_key!(handle_to_writer_key, WriterKey);
handle_to_key!(handle_to_session_key, SessionKey);
handle_to_key!(handle_to_request_head_key, RequestHeadKey);
handle_to_key!(handle_to_fetch_cancel_key, FetchCancelKey);
67
68// ── Body channel primitives ───────────────────────────────────────────────────
69
/// Consumer end — stored in the reader registry.
/// Uses `tokio::sync::Mutex` so we can `.await` the receiver without holding
/// the registry's `std::sync::Mutex`.
pub struct BodyReader {
    /// Receiving half of the bounded body channel; `Arc`-shared so a read
    /// can be awaited after the registry lock has been released.
    pub(crate) rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Bytes>>>,
    /// ISS-010: cancellation signal — notified when `cancel_reader` is called
    /// so in-flight `next_chunk` awaits terminate promptly.
    pub(crate) cancel: Arc<tokio::sync::Notify>,
}
79
/// Producer end — stored in the writer registry.
/// `mpsc::Sender` is `Clone`, so we clone it out of the registry for each call.
pub struct BodyWriter {
    /// Sending half of the bounded body channel.
    pub(crate) tx: mpsc::Sender<Bytes>,
    /// Drain timeout baked in at channel-creation time from the endpoint config.
    pub(crate) drain_timeout: Duration,
}
87
88/// Create a matched (writer, reader) pair backed by a bounded mpsc channel.
89///
90/// Prefer [`HandleStore::make_body_channel`] when an endpoint is available so
91/// the channel inherits the endpoint's backpressure config.  This free
92/// function uses the compile-time defaults and exists for tests and pre-bind
93/// code paths.
94pub fn make_body_channel() -> (BodyWriter, BodyReader) {
95    make_body_channel_with(
96        DEFAULT_CHANNEL_CAPACITY,
97        Duration::from_millis(DEFAULT_DRAIN_TIMEOUT_MS),
98    )
99}
100
101fn make_body_channel_with(capacity: usize, drain_timeout: Duration) -> (BodyWriter, BodyReader) {
102    let (tx, rx) = mpsc::channel(capacity);
103    (
104        BodyWriter { tx, drain_timeout },
105        BodyReader {
106            rx: Arc::new(tokio::sync::Mutex::new(rx)),
107            cancel: Arc::new(tokio::sync::Notify::new()),
108        },
109    )
110}
111
112// ── Cancellable receive ───────────────────────────────────────────────────────
113
/// Receive the next chunk from a body channel, aborting immediately if
/// `cancel` is notified.
///
/// Returns `None` on EOF (sender dropped) or on cancellation.  Both call
/// sites — [`BodyReader::next_chunk`] and [`HandleStore::next_chunk`] — share
/// this helper so the cancellation semantics are defined and tested in one place.
async fn recv_with_cancel(
    rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Bytes>>>,
    cancel: Arc<tokio::sync::Notify>,
) -> Option<Bytes> {
    tokio::select! {
        // `biased` makes the cancel arm win whenever both are ready, so a
        // cancel notified *before* this call (permit stored in the Notify)
        // returns `None` without ever touching the receiver.
        biased;
        _ = cancel.notified() => None,
        // The receiver mutex is only acquired inside this branch's future;
        // if cancel fires first, the lock is never taken.
        chunk = async { rx.lock().await.recv().await } => chunk,
    }
}
130
131impl BodyReader {
132    /// Receive the next chunk.  Returns `None` when the writer is gone (EOF)
133    /// or when the reader has been cancelled.
134    pub async fn next_chunk(&self) -> Option<Bytes> {
135        recv_with_cancel(self.rx.clone(), self.cancel.clone()).await
136    }
137}
138
139impl BodyWriter {
140    /// Send one chunk.  Returns `Err` if the reader has been dropped or if
141    /// the drain timeout expires (JS not reading fast enough).
142    pub async fn send_chunk(&self, chunk: Bytes) -> Result<(), String> {
143        tokio::time::timeout(self.drain_timeout, self.tx.send(chunk))
144            .await
145            .map_err(|_| "drain timeout: body reader is too slow".to_string())?
146            .map_err(|_| "body reader dropped".to_string())
147    }
148}
149
150// ── StoreConfig ───────────────────────────────────────────────────────────────
151
/// Configuration for a [`HandleStore`].  Set once at endpoint bind time.
///
/// NOTE(review): the "Minimum 1" constraints below are documented but not
/// enforced by a validating constructor — confirm callers sanitize inputs.
#[derive(Debug, Clone)]
pub struct StoreConfig {
    /// Body-channel capacity (in chunks).  Minimum 1.
    pub channel_capacity: usize,
    /// Maximum byte length of a single chunk in `send_chunk`.  Minimum 1.
    pub max_chunk_size: usize,
    /// Milliseconds to wait for a slow body reader before dropping.
    pub drain_timeout: Duration,
    /// Maximum handle slots per registry.  Prevents unbounded growth.
    pub max_handles: usize,
    /// TTL for handle entries; expired entries are swept periodically.
    /// Zero disables sweeping.
    pub ttl: Duration,
}
167
168impl Default for StoreConfig {
169    fn default() -> Self {
170        Self {
171            channel_capacity: DEFAULT_CHANNEL_CAPACITY,
172            max_chunk_size: DEFAULT_MAX_CHUNK_SIZE,
173            drain_timeout: Duration::from_millis(DEFAULT_DRAIN_TIMEOUT_MS),
174            max_handles: DEFAULT_MAX_HANDLES,
175            ttl: Duration::from_millis(DEFAULT_SLAB_TTL_MS),
176        }
177    }
178}
179
180// ── Timed wrapper ─────────────────────────────────────────────────────────────
181
/// A registry value paired with its last-access timestamp.
struct Timed<T> {
    value: T,
    /// Refreshed on every access so that actively-used handles are not
    /// TTL-swept mid-transfer (fix for iroh-http#119 Bug 3).
    last_accessed: Instant,
}

impl<T> Timed<T> {
    /// Wrap `value` with a fresh access timestamp.
    fn new(value: T) -> Self {
        let last_accessed = Instant::now();
        Self {
            value,
            last_accessed,
        }
    }

    /// Refresh the last-access timestamp.  Call inside the registry lock.
    fn touch(&mut self) {
        self.last_accessed = Instant::now();
    }

    /// True once strictly more than `ttl` has passed since the last access.
    fn is_expired(&self, ttl: Duration) -> bool {
        Instant::now().duration_since(self.last_accessed) > ttl
    }
}
206
/// Pending reader tracked with insertion time for TTL sweep.
struct PendingReaderEntry {
    // The reader half parked until the fetch path claims it.
    reader: BodyReader,
    // Insertion time; unclaimed entries older than the TTL are dropped
    // by `sweep_pending_readers`.
    created: Instant,
}
212
213// ── HandleStore ───────────────────────────────────────────────────────────────
214
/// Tracks handles inserted during a multi-handle allocation sequence.
/// On drop, removes all tracked handles unless [`commit`](InsertGuard::commit)
/// has been called. This prevents orphaned handles when a later insert fails.
pub(crate) struct InsertGuard<'a> {
    // Store the handles were inserted into; rollback targets the same store.
    store: &'a HandleStore,
    // Handles allocated through this guard, in insertion order.
    tracked: Vec<TrackedHandle>,
    // Set by `commit()`; suppresses rollback in `Drop`.
    committed: bool,
}
223
224/// A handle tracked by [`InsertGuard`] for rollback on drop.
225///
226/// # Intentionally omitted variants
227///
228/// `Session` and `FetchCancel` are not tracked here because their lifecycles
229/// are managed outside of multi-handle allocation sequences:
230/// - Sessions are created and closed by `session_connect` / `session_close`
231///   independently and are never allocated inside a guard.
232/// - Fetch cancel tokens are allocated before a guard is opened and are
233///   always cleaned up by `remove_fetch_token` after the fetch resolves.
234///
235/// If either type is ever added to a guard-guarded allocation path in the
236/// future, add `Session(u64)` or `FetchCancel(u64)` variants here with the
237/// corresponding rollback arms in [`InsertGuard::drop`].
enum TrackedHandle {
    // Rolled back via `cancel_reader` (also wakes any in-flight read).
    Reader(u64),
    // Rolled back via `finish_body` (drops the sender; reader sees EOF).
    Writer(u64),
    // Rolled back by removing the oneshot sender from `request_heads`.
    ReqHead(u64),
}
243
244impl<'a> InsertGuard<'a> {
245    fn new(store: &'a HandleStore) -> Self {
246        Self {
247            store,
248            tracked: Vec::new(),
249            committed: false,
250        }
251    }
252
253    pub fn insert_reader(&mut self, reader: BodyReader) -> Result<u64, CoreError> {
254        let h = self.store.insert_reader(reader)?;
255        self.tracked.push(TrackedHandle::Reader(h));
256        Ok(h)
257    }
258
259    pub fn insert_writer(&mut self, writer: BodyWriter) -> Result<u64, CoreError> {
260        let h = self.store.insert_writer(writer)?;
261        self.tracked.push(TrackedHandle::Writer(h));
262        Ok(h)
263    }
264
265    pub fn allocate_req_handle(
266        &mut self,
267        sender: tokio::sync::oneshot::Sender<ResponseHeadEntry>,
268    ) -> Result<u64, CoreError> {
269        let h = self.store.allocate_req_handle(sender)?;
270        self.tracked.push(TrackedHandle::ReqHead(h));
271        Ok(h)
272    }
273
274    /// Consume the guard without rolling back. Call after all inserts succeed.
275    pub fn commit(mut self) {
276        self.committed = true;
277    }
278}
279
280impl Drop for InsertGuard<'_> {
281    fn drop(&mut self) {
282        if self.committed {
283            return;
284        }
285        for handle in &self.tracked {
286            match handle {
287                TrackedHandle::Reader(h) => self.store.cancel_reader(*h),
288                TrackedHandle::Writer(h) => {
289                    let _ = self.store.finish_body(*h);
290                }
291                TrackedHandle::ReqHead(h) => {
292                    self.store
293                        .request_heads
294                        .lock()
295                        .unwrap_or_else(|e| e.into_inner())
296                        .remove(handle_to_request_head_key(*h));
297                }
298            }
299        }
300    }
301}
302
303/// Per-endpoint handle registry.  Owns all body readers, writers,
304/// sessions, request-head rendezvous channels, and fetch-cancel tokens for
305/// a single `IrohEndpoint`.
306///
307/// When the endpoint is dropped, this store is dropped with it — all
308/// slot-maps are freed and any remaining handles become invalid.
pub struct HandleStore {
    // Body consumers (response bodies handed to JS).
    readers: Mutex<SlotMap<ReaderKey, Timed<BodyReader>>>,
    // Body producers (the JS-facing write side of a body channel).
    writers: Mutex<SlotMap<WriterKey, Timed<BodyWriter>>>,
    // Open QUIC connections; `Arc` so lookups hand out clones without consuming.
    sessions: Mutex<SlotMap<SessionKey, Timed<Arc<SessionEntry>>>>,
    // Oneshot senders used by the server respond path to deliver a head.
    request_heads:
        Mutex<SlotMap<RequestHeadKey, Timed<tokio::sync::oneshot::Sender<ResponseHeadEntry>>>>,
    // Cancellation tokens for in-flight fetches.
    fetch_cancels: Mutex<SlotMap<FetchCancelKey, Timed<Arc<tokio::sync::Notify>>>>,
    // Readers parked between `alloc_body_writer` and the fetch that claims
    // them, keyed by the paired writer handle.
    pending_readers: Mutex<HashMap<u64, PendingReaderEntry>>,
    // Backpressure / capacity / TTL settings fixed at endpoint bind time.
    pub(crate) config: StoreConfig,
}
319
320impl HandleStore {
    /// Create a new handle store with the given configuration.
    ///
    /// All registries start empty; slot-map backing storage grows on demand
    /// and is compacted by the TTL sweep after traffic bursts.
    pub fn new(config: StoreConfig) -> Self {
        Self {
            readers: Mutex::new(SlotMap::with_key()),
            writers: Mutex::new(SlotMap::with_key()),
            sessions: Mutex::new(SlotMap::with_key()),
            request_heads: Mutex::new(SlotMap::with_key()),
            fetch_cancels: Mutex::new(SlotMap::with_key()),
            pending_readers: Mutex::new(HashMap::new()),
            config,
        }
    }

    // ── Config accessors ─────────────────────────────────────────────────

    /// Create a guard for multi-handle allocation with automatic rollback.
    pub(crate) fn insert_guard(&self) -> InsertGuard<'_> {
        InsertGuard::new(self)
    }

    /// The configured drain timeout.
    pub fn drain_timeout(&self) -> Duration {
        self.config.drain_timeout
    }

    /// The configured maximum chunk size.
    pub fn max_chunk_size(&self) -> usize {
        self.config.max_chunk_size
    }
350
351    /// Snapshot of handle counts for observability.
352    ///
353    /// Returns `(active_readers, active_writers, active_sessions, total_handles)`.
354    pub fn count_handles(&self) -> (usize, usize, usize, usize) {
355        let readers = self.readers.lock().unwrap_or_else(|e| e.into_inner()).len();
356        let writers = self.writers.lock().unwrap_or_else(|e| e.into_inner()).len();
357        let sessions = self
358            .sessions
359            .lock()
360            .unwrap_or_else(|e| e.into_inner())
361            .len();
362        let total = readers
363            .saturating_add(writers)
364            .saturating_add(sessions)
365            .saturating_add(
366                self.request_heads
367                    .lock()
368                    .unwrap_or_else(|e| e.into_inner())
369                    .len(),
370            )
371            .saturating_add(
372                self.fetch_cancels
373                    .lock()
374                    .unwrap_or_else(|e| e.into_inner())
375                    .len(),
376            );
377        (readers, writers, sessions, total)
378    }
379
380    // ── Body channels ────────────────────────────────────────────────────
381
    /// Create a matched (writer, reader) pair using this store's config.
    pub fn make_body_channel(&self) -> (BodyWriter, BodyReader) {
        make_body_channel_with(self.config.channel_capacity, self.config.drain_timeout)
    }

    // ── Capacity-checked insert ──────────────────────────────────────────

    /// Insert `value` into `registry`, enforcing the per-registry handle cap.
    ///
    /// Returns the encoded `u64` handle, or an internal error when the
    /// registry already holds `max` live entries.
    fn insert_checked<K: slotmap::Key, T>(
        registry: &Mutex<SlotMap<K, Timed<T>>>,
        value: T,
        max: usize,
    ) -> Result<u64, CoreError> {
        let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
        if reg.len() >= max {
            return Err(CoreError::internal("handle registry at capacity"));
        }
        // Entries enter the registry with a fresh `last_accessed` timestamp.
        let key = reg.insert(Timed::new(value));
        Ok(key_to_handle(key))
    }
401
402    // ── Body reader / writer ─────────────────────────────────────────────
403
    /// Insert a `BodyReader` and return a handle.
    pub fn insert_reader(&self, reader: BodyReader) -> Result<u64, CoreError> {
        Self::insert_checked(&self.readers, reader, self.config.max_handles)
    }

    /// Insert a `BodyWriter` and return a handle.
    pub fn insert_writer(&self, writer: BodyWriter) -> Result<u64, CoreError> {
        Self::insert_checked(&self.writers, writer, self.config.max_handles)
    }

    /// Allocate a `(writer_handle, reader)` pair for streaming request bodies.
    ///
    /// The writer handle is returned to JS.  The reader must be stashed via
    /// [`store_pending_reader`](Self::store_pending_reader) so the fetch path
    /// can claim it.
    pub fn alloc_body_writer(&self) -> Result<(u64, BodyReader), CoreError> {
        let (writer, reader) = self.make_body_channel();
        // Only the writer is registered here; the reader is handed back to
        // the caller to park in `pending_readers`.
        let handle = self.insert_writer(writer)?;
        Ok((handle, reader))
    }
424
425    /// Store the reader side of a newly allocated writer channel so that the
426    /// fetch path can claim it with [`claim_pending_reader`](Self::claim_pending_reader).
427    pub fn store_pending_reader(&self, writer_handle: u64, reader: BodyReader) {
428        self.pending_readers
429            .lock()
430            .unwrap_or_else(|e| e.into_inner())
431            .insert(
432                writer_handle,
433                PendingReaderEntry {
434                    reader,
435                    created: Instant::now(),
436                },
437            );
438    }
439
440    /// Claim the reader that was paired with `writer_handle`.
441    /// Returns `None` if already claimed or never stored.
442    pub fn claim_pending_reader(&self, writer_handle: u64) -> Option<BodyReader> {
443        self.pending_readers
444            .lock()
445            .unwrap_or_else(|e| e.into_inner())
446            .remove(&writer_handle)
447            .map(|e| e.reader)
448    }
449
450    // ── Bridge methods (nextChunk / sendChunk / finishBody) ──────────────
451
    /// Pull the next chunk from a reader handle.
    ///
    /// Returns `Ok(None)` at EOF.  After returning `None` the handle is
    /// cleaned up from the registry automatically.
    pub async fn next_chunk(&self, handle: u64) -> Result<Option<Bytes>, CoreError> {
        // Clone the Arc — allows awaiting without holding the registry mutex.
        let (rx_arc, cancel) = {
            let mut reg = self.readers.lock().unwrap_or_else(|e| e.into_inner());
            let entry = reg
                .get_mut(handle_to_reader_key(handle))
                .ok_or_else(|| CoreError::invalid_handle(handle))?;
            // Keep actively-read handles alive across TTL sweeps.
            entry.touch();
            (entry.value.rx.clone(), entry.value.cancel.clone())
        };

        // `None` means either EOF (sender dropped) or cancellation via
        // `cancel_reader`; both are surfaced to the caller as end-of-body.
        let chunk = recv_with_cancel(rx_arc, cancel).await;

        // Clean up on EOF so the slot is released promptly.
        if chunk.is_none() {
            self.readers
                .lock()
                .unwrap_or_else(|e| e.into_inner())
                .remove(handle_to_reader_key(handle));
        }

        Ok(chunk)
    }
479
480    /// Push a chunk into a writer handle.
481    ///
482    /// Chunks larger than the configured `max_chunk_size` are split
483    /// automatically so individual messages stay within the backpressure budget.
484    pub async fn send_chunk(&self, handle: u64, chunk: Bytes) -> Result<(), CoreError> {
485        // Clone the Sender (cheap) and release the lock before awaiting.
486        let (tx, timeout) = {
487            let mut reg = self.writers.lock().unwrap_or_else(|e| e.into_inner());
488            let entry = reg
489                .get_mut(handle_to_writer_key(handle))
490                .ok_or_else(|| CoreError::invalid_handle(handle))?;
491            entry.touch();
492            (entry.value.tx.clone(), entry.value.drain_timeout)
493        };
494        let max = self.config.max_chunk_size;
495        if chunk.len() <= max {
496            tokio::time::timeout(timeout, tx.send(chunk))
497                .await
498                .map_err(|_| CoreError::timeout("drain timeout: body reader is too slow"))?
499                .map_err(|_| CoreError::internal("body reader dropped"))
500        } else {
501            // Split into max-size pieces.
502            let mut offset = 0;
503            while offset < chunk.len() {
504                let end = offset.saturating_add(max).min(chunk.len());
505                tokio::time::timeout(timeout, tx.send(chunk.slice(offset..end)))
506                    .await
507                    .map_err(|_| CoreError::timeout("drain timeout: body reader is too slow"))?
508                    .map_err(|_| CoreError::internal("body reader dropped"))?;
509                offset = end;
510            }
511            Ok(())
512        }
513    }
514
515    /// Signal end-of-body by dropping the writer from the registry.
516    pub fn finish_body(&self, handle: u64) -> Result<(), CoreError> {
517        self.writers
518            .lock()
519            .unwrap_or_else(|e| e.into_inner())
520            .remove(handle_to_writer_key(handle))
521            .ok_or_else(|| CoreError::invalid_handle(handle))?;
522        Ok(())
523    }
524
525    /// Drop a body reader, signalling cancellation of any in-flight read.
526    pub fn cancel_reader(&self, handle: u64) {
527        let entry = self
528            .readers
529            .lock()
530            .unwrap_or_else(|e| e.into_inner())
531            .remove(handle_to_reader_key(handle));
532        if let Some(e) = entry {
533            e.value.cancel.notify_waiters();
534        }
535    }
536
537    // ── Session ──────────────────────────────────────────────────────────
538
    /// Insert a `SessionEntry` and return a handle.
    ///
    /// The entry is wrapped in an `Arc` so lookups can hand out clones
    /// without removing it from the registry.
    pub fn insert_session(&self, entry: SessionEntry) -> Result<u64, CoreError> {
        Self::insert_checked(&self.sessions, Arc::new(entry), self.config.max_handles)
    }
543
544    /// Look up a session by handle without consuming it.
545    pub fn lookup_session(&self, handle: u64) -> Option<Arc<SessionEntry>> {
546        self.sessions
547            .lock()
548            .unwrap_or_else(|e| e.into_inner())
549            .get(handle_to_session_key(handle))
550            .map(|e| e.value.clone())
551    }
552
553    /// Remove a session entry by handle and return it.
554    pub fn remove_session(&self, handle: u64) -> Option<Arc<SessionEntry>> {
555        self.sessions
556            .lock()
557            .unwrap_or_else(|e| e.into_inner())
558            .remove(handle_to_session_key(handle))
559            .map(|e| e.value)
560    }
561
562    // ── Request head (for server respond path) ───────────────────────────
563
    /// Insert a response-head oneshot sender and return a handle.
    pub fn allocate_req_handle(
        &self,
        sender: tokio::sync::oneshot::Sender<ResponseHeadEntry>,
    ) -> Result<u64, CoreError> {
        Self::insert_checked(&self.request_heads, sender, self.config.max_handles)
    }

    /// Remove and return the response-head sender for the given handle.
    ///
    /// Consuming (remove, not get): a request can be responded to only once,
    /// and the slot is freed as part of the take.
    pub fn take_req_sender(
        &self,
        handle: u64,
    ) -> Option<tokio::sync::oneshot::Sender<ResponseHeadEntry>> {
        self.request_heads
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_request_head_key(handle))
            .map(|e| e.value)
    }
583
584    // ── Fetch cancel ─────────────────────────────────────────────────────
585
    /// Allocate a cancellation token for an upcoming `fetch` call.
    pub fn alloc_fetch_token(&self) -> Result<u64, CoreError> {
        let notify = Arc::new(tokio::sync::Notify::new());
        Self::insert_checked(&self.fetch_cancels, notify, self.config.max_handles)
    }

    /// Signal an in-flight fetch to abort.
    ///
    /// `notify_one` stores a permit when no task is waiting, so a cancel
    /// issued before the fetch starts waiting is still observed.
    pub fn cancel_in_flight(&self, token: u64) {
        if let Some(entry) = self
            .fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(handle_to_fetch_cancel_key(token))
        {
            entry.value.notify_one();
        }
    }

    /// Retrieve the `Notify` for a fetch token (clones the Arc for use in select!).
    ///
    /// NOTE(review): this `get` does not `touch()` the entry, so a token held
    /// across a very long fetch could in principle be TTL-swept — confirm
    /// fetch durations stay well under `StoreConfig::ttl`.
    pub fn get_fetch_cancel_notify(&self, token: u64) -> Option<Arc<tokio::sync::Notify>> {
        self.fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(handle_to_fetch_cancel_key(token))
            .map(|e| e.value.clone())
    }

    /// Remove a fetch cancel token after the fetch completes.
    pub fn remove_fetch_token(&self, token: u64) {
        self.fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_fetch_cancel_key(token));
    }
620
621    // ── TTL sweep ────────────────────────────────────────────────────────
622
623    /// Sweep all registries, removing entries older than `ttl`.
624    /// Also compacts any registry that is empty after sweeping to reclaim
625    /// the backing memory from traffic bursts.
626    pub fn sweep(&self, ttl: Duration) {
627        Self::sweep_registry(&self.readers, ttl);
628        Self::sweep_registry(&self.writers, ttl);
629        Self::sweep_registry(&self.request_heads, ttl);
630        Self::sweep_registry(&self.sessions, ttl);
631        Self::sweep_registry(&self.fetch_cancels, ttl);
632        self.sweep_pending_readers(ttl);
633    }
634
635    fn sweep_registry<K: slotmap::Key, T>(registry: &Mutex<SlotMap<K, Timed<T>>>, ttl: Duration) {
636        let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
637        let expired: Vec<K> = reg
638            .iter()
639            .filter(|(_, e)| e.is_expired(ttl))
640            .map(|(k, _)| k)
641            .collect();
642
643        if expired.is_empty() {
644            return;
645        }
646
647        for key in &expired {
648            reg.remove(*key);
649        }
650        tracing::debug!(
651            "[iroh-http] swept {} expired registry entries (ttl={ttl:?})",
652            expired.len()
653        );
654        // Compact when empty to reclaim backing memory after traffic bursts.
655        if reg.is_empty() && reg.capacity() > 128 {
656            *reg = SlotMap::with_key();
657        }
658    }
659
660    fn sweep_pending_readers(&self, ttl: Duration) {
661        let mut map = self
662            .pending_readers
663            .lock()
664            .unwrap_or_else(|e| e.into_inner());
665        let before = map.len();
666        map.retain(|_, e| e.created.elapsed() < ttl);
667        let removed = before.saturating_sub(map.len());
668        if removed > 0 {
669            tracing::debug!("[iroh-http] swept {removed} stale pending readers (ttl={ttl:?})");
670        }
671    }
672}
673
674// ── Shared pump helpers ───────────────────────────────────────────────────────
675
/// Default read buffer size for QUIC stream reads.
pub(crate) const PUMP_READ_BUF: usize = 64 * 1024;

/// Pump raw bytes from a QUIC `RecvStream` into a `BodyWriter`.
///
/// Reads `PUMP_READ_BUF`-sized chunks and forwards them through the body
/// channel.  Stops when the stream ends or the writer is dropped.
pub(crate) async fn pump_quic_recv_to_body(
    mut recv: iroh::endpoint::RecvStream,
    writer: BodyWriter,
) {
    // A read error or clean end-of-stream ends the loop; a send error means
    // the body reader went away, so stop pulling from the stream too.
    while let Ok(Some(chunk)) = recv.read_chunk(PUMP_READ_BUF).await {
        if writer.send_chunk(chunk.bytes).await.is_err() {
            break;
        }
    }
    // writer drops → BodyReader sees EOF.
}
694
695/// Pump raw bytes from a `BodyReader` into a QUIC `SendStream`.
696///
697/// Reads chunks from the body channel and writes them to the stream.
698/// Finishes the stream when the reader reaches EOF.
699pub(crate) async fn pump_body_to_quic_send(
700    reader: BodyReader,
701    mut send: iroh::endpoint::SendStream,
702) {
703    loop {
704        match reader.next_chunk().await {
705            None => break,
706            Some(data) => {
707                if send.write_all(&data).await.is_err() {
708                    break;
709                }
710            }
711        }
712    }
713    let _ = send.finish();
714}
715
/// Bidirectional pump between a byte-level I/O object and a pair of body channels.
///
/// Reads from `io` → sends to `writer` (incoming data).
/// Reads from `reader` → writes to `io` (outgoing data).
///
/// Used for both client-side and server-side duplex upgrade pumps.
pub(crate) async fn pump_duplex<IO>(io: IO, writer: BodyWriter, reader: BodyReader)
where
    IO: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
{
    let (mut recv, mut send) = tokio::io::split(io);

    // Both directions run concurrently on this task; `join!` waits for each
    // side to finish independently (one side ending does not abort the other).
    tokio::join!(
        async {
            use bytes::BytesMut;
            use tokio::io::AsyncReadExt;
            let mut buf = BytesMut::with_capacity(PUMP_READ_BUF);
            loop {
                buf.clear();
                match recv.read_buf(&mut buf).await {
                    // 0 bytes read = clean EOF; errors also end this direction.
                    Ok(0) | Err(_) => break,
                    Ok(_) => {
                        // `split().freeze()` hands off the filled bytes without copying.
                        if writer.send_chunk(buf.split().freeze()).await.is_err() {
                            break;
                        }
                    }
                }
            }
        },
        async {
            use tokio::io::AsyncWriteExt;
            loop {
                match reader.next_chunk().await {
                    None => break,
                    Some(data) => {
                        if send.write_all(&data).await.is_err() {
                            break;
                        }
                    }
                }
            }
            // Best-effort half-close of the outgoing direction.
            let _ = send.shutdown().await;
        },
    );
}
761
#[cfg(test)]
mod tests {
    use super::*;

    // Store with compile-time default config; each test gets its own.
    fn test_store() -> HandleStore {
        HandleStore::new(StoreConfig::default())
    }

    // ── Body channel basics ─────────────────────────────────────────────

    #[tokio::test]
    async fn body_channel_send_recv() {
        let (writer, reader) = make_body_channel();
        writer.send_chunk(Bytes::from("hello")).await.unwrap();
        drop(writer); // signal EOF
        let chunk = reader.next_chunk().await;
        assert_eq!(chunk, Some(Bytes::from("hello")));
        let eof = reader.next_chunk().await;
        assert!(eof.is_none());
    }

    #[tokio::test]
    async fn body_channel_multiple_chunks() {
        let (writer, reader) = make_body_channel();
        writer.send_chunk(Bytes::from("a")).await.unwrap();
        writer.send_chunk(Bytes::from("b")).await.unwrap();
        writer.send_chunk(Bytes::from("c")).await.unwrap();
        drop(writer);

        // Chunks must arrive in send order, then EOF.
        let mut collected = Vec::new();
        while let Some(chunk) = reader.next_chunk().await {
            collected.push(chunk);
        }
        assert_eq!(
            collected,
            vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c"),]
        );
    }

    #[tokio::test]
    async fn body_channel_reader_dropped_returns_error() {
        let (writer, reader) = make_body_channel();
        drop(reader);
        let result = writer.send_chunk(Bytes::from("data")).await;
        assert!(result.is_err());
    }

    // ── HandleStore operations ──────────────────────────────────────────

    #[tokio::test]
    async fn insert_reader_and_next_chunk() {
        let store = test_store();
        let (writer, reader) = store.make_body_channel();
        let handle = store.insert_reader(reader).unwrap();

        writer.send_chunk(Bytes::from("slab-data")).await.unwrap();
        drop(writer);

        let chunk = store.next_chunk(handle).await.unwrap();
        assert_eq!(chunk, Some(Bytes::from("slab-data")));

        // EOF cleans up the registry entry
        let eof = store.next_chunk(handle).await.unwrap();
        assert!(eof.is_none());
    }

    #[tokio::test]
    async fn next_chunk_invalid_handle() {
        let store = test_store();
        let result = store.next_chunk(999999).await;
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().code, crate::ErrorCode::InvalidInput);
    }

    #[tokio::test]
    async fn send_chunk_via_handle() {
        let store = test_store();
        let (writer, reader) = store.make_body_channel();
        let handle = store.insert_writer(writer).unwrap();

        store
            .send_chunk(handle, Bytes::from("via-slab"))
            .await
            .unwrap();
        // finish_body drops the registered writer, producing EOF below.
        store.finish_body(handle).unwrap();

        let chunk = reader.next_chunk().await;
        assert_eq!(chunk, Some(Bytes::from("via-slab")));
        let eof = reader.next_chunk().await;
        assert!(eof.is_none());
    }

    #[tokio::test]
    async fn capacity_cap_rejects_overflow() {
        let store = HandleStore::new(StoreConfig {
            max_handles: 2,
            ..StoreConfig::default()
        });
        let (_, r1) = store.make_body_channel();
        let (_, r2) = store.make_body_channel();
        let (_, r3) = store.make_body_channel();
        store.insert_reader(r1).unwrap();
        store.insert_reader(r2).unwrap();
        // Third insert exceeds max_handles and must be rejected.
        let err = store.insert_reader(r3).unwrap_err();
        assert!(err.message.contains("capacity"));
    }

    // ── #84 regression: recv_with_cancel cancellation ──────────────────

    #[tokio::test]
    async fn recv_with_cancel_returns_none_on_cancel() {
        let (_tx, rx) = mpsc::channel::<Bytes>(4);
        let rx = Arc::new(tokio::sync::Mutex::new(rx));
        let cancel = Arc::new(tokio::sync::Notify::new());
        // Notify before polling — biased select must return None immediately.
        cancel.notify_one();
        let result = recv_with_cancel(rx, cancel).await;
        assert!(result.is_none());
    }
}
881}