
iroh_http_core/stream.rs

//! Per-endpoint handle store and body channel types.
//!
//! Rust owns all stream state; JS holds only opaque `u64` handles.
//! Each `IrohEndpoint` has its own `HandleStore` — no process-global registries.
//! Handles are `u64` values equal to `key.data().as_ffi()`, unique within the
//! owning endpoint's slot-map.

use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
    time::{Duration, Instant},
};

use bytes::Bytes;
use slotmap::{KeyData, SlotMap};
use tokio::sync::mpsc;

use crate::CoreError;

// ── Constants ─────────────────────────────────────────────────────────────────

pub const DEFAULT_CHANNEL_CAPACITY: usize = 32;
pub const DEFAULT_MAX_CHUNK_SIZE: usize = 64 * 1024; // 64 KB
pub const DEFAULT_DRAIN_TIMEOUT_MS: u64 = 30_000; // 30 s
pub const DEFAULT_SLAB_TTL_MS: u64 = 300_000; // 5 min
pub const DEFAULT_SWEEP_INTERVAL_MS: u64 = 60_000; // 60 s
pub const DEFAULT_MAX_HANDLES: usize = 65_536;

// ── Resource types ────────────────────────────────────────────────────────────

pub struct SessionEntry {
    pub conn: iroh::endpoint::Connection,
}

pub struct ResponseHeadEntry {
    pub status: u16,
    pub headers: Vec<(String, String)>,
}

// ── SlotMap key types ─────────────────────────────────────────────────────────

slotmap::new_key_type! { pub(crate) struct ReaderKey; }
slotmap::new_key_type! { pub(crate) struct WriterKey; }
slotmap::new_key_type! { pub(crate) struct FetchCancelKey; }
slotmap::new_key_type! { pub(crate) struct SessionKey; }
slotmap::new_key_type! { pub(crate) struct RequestHeadKey; }

// ── Handle encode / decode helpers ───────────────────────────────────────────

fn key_to_handle<K: slotmap::Key>(k: K) -> u64 {
    k.data().as_ffi()
}

macro_rules! handle_to_key {
    ($fn_name:ident, $key_type:ty) => {
        fn $fn_name(h: u64) -> $key_type {
            <$key_type>::from(KeyData::from_ffi(h))
        }
    };
}

handle_to_key!(handle_to_reader_key, ReaderKey);
handle_to_key!(handle_to_writer_key, WriterKey);
handle_to_key!(handle_to_session_key, SessionKey);
handle_to_key!(handle_to_request_head_key, RequestHeadKey);
handle_to_key!(handle_to_fetch_cancel_key, FetchCancelKey);

// ── Body channel primitives ───────────────────────────────────────────────────

/// Consumer end — stored in the reader registry.
/// Uses `tokio::sync::Mutex` so we can `.await` the receiver without holding
/// the registry's `std::sync::Mutex`.
pub struct BodyReader {
    pub(crate) rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Bytes>>>,
    /// ISS-010: cancellation signal — notified when `cancel_reader` is called
    /// so in-flight `next_chunk` awaits terminate promptly.
    pub(crate) cancel: Arc<tokio::sync::Notify>,
}

/// Producer end — stored in the writer registry.
/// `mpsc::Sender` is `Clone`, so we clone it out of the registry for each call.
pub struct BodyWriter {
    pub(crate) tx: mpsc::Sender<Bytes>,
    /// Drain timeout baked in at channel-creation time from the endpoint config.
    pub(crate) drain_timeout: Duration,
}

/// Create a matched (writer, reader) pair backed by a bounded mpsc channel.
///
/// Prefer [`HandleStore::make_body_channel`] when an endpoint is available so
/// the channel inherits the endpoint's backpressure config.  This free
/// function uses the compile-time defaults and exists for tests and pre-bind
/// code paths.
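///
/// A minimal usage sketch (illustrative only; assumes a tokio runtime):
///
/// ```ignore
/// let (writer, reader) = make_body_channel();
/// writer.send_chunk(Bytes::from("hello")).await.unwrap();
/// drop(writer); // sender gone, so the reader sees EOF
/// assert_eq!(reader.next_chunk().await, Some(Bytes::from("hello")));
/// assert!(reader.next_chunk().await.is_none());
/// ```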
pub fn make_body_channel() -> (BodyWriter, BodyReader) {
    make_body_channel_with(
        DEFAULT_CHANNEL_CAPACITY,
        Duration::from_millis(DEFAULT_DRAIN_TIMEOUT_MS),
    )
}

fn make_body_channel_with(capacity: usize, drain_timeout: Duration) -> (BodyWriter, BodyReader) {
    let (tx, rx) = mpsc::channel(capacity);
    (
        BodyWriter { tx, drain_timeout },
        BodyReader {
            rx: Arc::new(tokio::sync::Mutex::new(rx)),
            cancel: Arc::new(tokio::sync::Notify::new()),
        },
    )
}

// ── Cancellable receive ───────────────────────────────────────────────────────

/// Receive the next chunk from a body channel, aborting immediately if
/// `cancel` is notified.
///
/// Returns `None` on EOF (sender dropped) or on cancellation.  Both call
/// sites — [`BodyReader::next_chunk`] and [`HandleStore::next_chunk`] — share
/// this helper so the cancellation semantics are defined and tested in one place.
async fn recv_with_cancel(
    rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Bytes>>>,
    cancel: Arc<tokio::sync::Notify>,
) -> Option<Bytes> {
    tokio::select! {
        biased;
        _ = cancel.notified() => None,
        chunk = async { rx.lock().await.recv().await } => chunk,
    }
}

impl BodyReader {
    /// Receive the next chunk.  Returns `None` when the writer is gone (EOF)
    /// or when the reader has been cancelled.
    pub async fn next_chunk(&self) -> Option<Bytes> {
        recv_with_cancel(self.rx.clone(), self.cancel.clone()).await
    }
}

impl BodyWriter {
    /// Send one chunk.  Returns `Err` if the reader has been dropped or if
    /// the drain timeout expires (JS not reading fast enough).
    pub async fn send_chunk(&self, chunk: Bytes) -> Result<(), String> {
        tokio::time::timeout(self.drain_timeout, self.tx.send(chunk))
            .await
            .map_err(|_| "drain timeout: body reader is too slow".to_string())?
            .map_err(|_| "body reader dropped".to_string())
    }
}

// ── StoreConfig ───────────────────────────────────────────────────────────────

/// Configuration for a [`HandleStore`].  Set once at endpoint bind time.
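///
/// Override sketch (illustrative; the field values here are arbitrary):
///
/// ```ignore
/// let store = HandleStore::new(StoreConfig {
///     channel_capacity: 8,
///     max_handles: 1_024,
///     ..StoreConfig::default()
/// });
/// ```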
#[derive(Debug, Clone)]
pub struct StoreConfig {
    /// Body-channel capacity (in chunks).  Minimum 1.
    pub channel_capacity: usize,
    /// Maximum byte length of a single chunk in `send_chunk`.  Minimum 1.
    pub max_chunk_size: usize,
    /// How long to wait on a slow body reader before failing the send.
    pub drain_timeout: Duration,
    /// Maximum handle slots per registry.  Prevents unbounded growth.
    pub max_handles: usize,
    /// TTL for handle entries; expired entries are swept periodically.
    /// Zero disables sweeping.
    pub ttl: Duration,
}

impl Default for StoreConfig {
    fn default() -> Self {
        Self {
            channel_capacity: DEFAULT_CHANNEL_CAPACITY,
            max_chunk_size: DEFAULT_MAX_CHUNK_SIZE,
            drain_timeout: Duration::from_millis(DEFAULT_DRAIN_TIMEOUT_MS),
            max_handles: DEFAULT_MAX_HANDLES,
            ttl: Duration::from_millis(DEFAULT_SLAB_TTL_MS),
        }
    }
}

// ── Timed wrapper ─────────────────────────────────────────────────────────────

struct Timed<T> {
    value: T,
    /// Updated on every access so that actively-used handles are not TTL-swept
    /// mid-transfer (fix for iroh-http#119 Bug 3).
    last_accessed: Instant,
}

impl<T> Timed<T> {
    fn new(value: T) -> Self {
        Self {
            value,
            last_accessed: Instant::now(),
        }
    }

    /// Refresh the last-access timestamp.  Call inside the registry lock.
    fn touch(&mut self) {
        self.last_accessed = Instant::now();
    }

    fn is_expired(&self, ttl: Duration) -> bool {
        self.last_accessed.elapsed() > ttl
    }
}

/// Pending reader tracked with insertion time for TTL sweep.
struct PendingReaderEntry {
    reader: BodyReader,
    created: Instant,
}

// ── HandleStore ───────────────────────────────────────────────────────────────

/// Tracks handles inserted during a multi-handle allocation sequence.
/// On drop, removes all tracked handles unless [`commit`](InsertGuard::commit)
/// has been called. This prevents orphaned handles when a later insert fails.
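///
/// Sketch of the intended call pattern (simplified; error handling elided):
///
/// ```ignore
/// let mut guard = store.insert_guard();
/// let reader_h = guard.insert_reader(reader)?; // rolled back if a later step fails
/// let writer_h = guard.insert_writer(writer)?;
/// guard.commit(); // all inserts succeeded; keep the handles
/// ```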
pub(crate) struct InsertGuard<'a> {
    store: &'a HandleStore,
    tracked: Vec<TrackedHandle>,
    committed: bool,
}

/// A handle tracked by [`InsertGuard`] for rollback on drop.
///
/// # Intentionally omitted variants
///
/// `Session` and `FetchCancel` are not tracked here because their lifecycles
/// are managed outside of multi-handle allocation sequences:
/// - Sessions are created and closed by `session_connect` / `session_close`
///   independently and are never allocated inside a guard.
/// - Fetch cancel tokens are allocated before a guard is opened and are
///   always cleaned up by `remove_fetch_token` after the fetch resolves.
///
/// If either type is ever added to a guarded allocation path in the
/// future, add `Session(u64)` or `FetchCancel(u64)` variants here with the
/// corresponding rollback arms in [`InsertGuard::drop`].
enum TrackedHandle {
    Reader(u64),
    Writer(u64),
    ReqHead(u64),
}

impl<'a> InsertGuard<'a> {
    fn new(store: &'a HandleStore) -> Self {
        Self {
            store,
            tracked: Vec::new(),
            committed: false,
        }
    }

    pub fn insert_reader(&mut self, reader: BodyReader) -> Result<u64, CoreError> {
        let h = self.store.insert_reader(reader)?;
        self.tracked.push(TrackedHandle::Reader(h));
        Ok(h)
    }

    pub fn insert_writer(&mut self, writer: BodyWriter) -> Result<u64, CoreError> {
        let h = self.store.insert_writer(writer)?;
        self.tracked.push(TrackedHandle::Writer(h));
        Ok(h)
    }

    pub fn allocate_req_handle(
        &mut self,
        sender: tokio::sync::oneshot::Sender<ResponseHeadEntry>,
    ) -> Result<u64, CoreError> {
        let h = self.store.allocate_req_handle(sender)?;
        self.tracked.push(TrackedHandle::ReqHead(h));
        Ok(h)
    }

    /// Consume the guard without rolling back. Call after all inserts succeed.
    pub fn commit(mut self) {
        self.committed = true;
    }
}

impl Drop for InsertGuard<'_> {
    fn drop(&mut self) {
        if self.committed {
            return;
        }
        for handle in &self.tracked {
            match handle {
                TrackedHandle::Reader(h) => self.store.cancel_reader(*h),
                TrackedHandle::Writer(h) => {
                    let _ = self.store.finish_body(*h);
                }
                TrackedHandle::ReqHead(h) => {
                    self.store
                        .request_heads
                        .lock()
                        .unwrap_or_else(|e| e.into_inner())
                        .remove(handle_to_request_head_key(*h));
                }
            }
        }
    }
}

/// Per-endpoint handle registry.  Owns all body readers, writers,
/// sessions, request-head rendezvous channels, and fetch-cancel tokens for
/// a single `IrohEndpoint`.
///
/// When the endpoint is dropped, this store is dropped with it — all
/// slot-maps are freed and any remaining handles become invalid.
pub struct HandleStore {
    readers: Mutex<SlotMap<ReaderKey, Timed<BodyReader>>>,
    writers: Mutex<SlotMap<WriterKey, Timed<BodyWriter>>>,
    sessions: Mutex<SlotMap<SessionKey, Timed<Arc<SessionEntry>>>>,
    request_heads:
        Mutex<SlotMap<RequestHeadKey, Timed<tokio::sync::oneshot::Sender<ResponseHeadEntry>>>>,
    fetch_cancels: Mutex<SlotMap<FetchCancelKey, Timed<Arc<tokio::sync::Notify>>>>,
    pending_readers: Mutex<HashMap<u64, PendingReaderEntry>>,
    pub(crate) config: StoreConfig,
}

impl HandleStore {
    /// Create a new handle store with the given configuration.
    pub fn new(config: StoreConfig) -> Self {
        Self {
            readers: Mutex::new(SlotMap::with_key()),
            writers: Mutex::new(SlotMap::with_key()),
            sessions: Mutex::new(SlotMap::with_key()),
            request_heads: Mutex::new(SlotMap::with_key()),
            fetch_cancels: Mutex::new(SlotMap::with_key()),
            pending_readers: Mutex::new(HashMap::new()),
            config,
        }
    }

    // ── Guards ───────────────────────────────────────────────────────────

    /// Create a guard for multi-handle allocation with automatic rollback.
    pub(crate) fn insert_guard(&self) -> InsertGuard<'_> {
        InsertGuard::new(self)
    }

    // ── Config accessors ─────────────────────────────────────────────────

    /// The configured drain timeout.
    pub fn drain_timeout(&self) -> Duration {
        self.config.drain_timeout
    }

    /// The configured maximum chunk size.
    pub fn max_chunk_size(&self) -> usize {
        self.config.max_chunk_size
    }

    /// Snapshot of handle counts for observability.
    ///
    /// Returns `(active_readers, active_writers, active_sessions, total_handles)`.
    pub fn count_handles(&self) -> (usize, usize, usize, usize) {
        let readers = self.readers.lock().unwrap_or_else(|e| e.into_inner()).len();
        let writers = self.writers.lock().unwrap_or_else(|e| e.into_inner()).len();
        let sessions = self
            .sessions
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .len();
        let total = readers
            .saturating_add(writers)
            .saturating_add(sessions)
            .saturating_add(
                self.request_heads
                    .lock()
                    .unwrap_or_else(|e| e.into_inner())
                    .len(),
            )
            .saturating_add(
                self.fetch_cancels
                    .lock()
                    .unwrap_or_else(|e| e.into_inner())
                    .len(),
            );
        (readers, writers, sessions, total)
    }

    // ── Body channels ────────────────────────────────────────────────────

    /// Create a matched (writer, reader) pair using this store's config.
    pub fn make_body_channel(&self) -> (BodyWriter, BodyReader) {
        make_body_channel_with(self.config.channel_capacity, self.config.drain_timeout)
    }

    // ── Capacity-checked insert ──────────────────────────────────────────

    fn insert_checked<K: slotmap::Key, T>(
        registry: &Mutex<SlotMap<K, Timed<T>>>,
        value: T,
        max: usize,
    ) -> Result<u64, CoreError> {
        let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
        if reg.len() >= max {
            return Err(CoreError::internal("handle registry at capacity"));
        }
        let key = reg.insert(Timed::new(value));
        Ok(key_to_handle(key))
    }

    // ── Body reader / writer ─────────────────────────────────────────────

    /// Insert a `BodyReader` and return a handle.
    pub fn insert_reader(&self, reader: BodyReader) -> Result<u64, CoreError> {
        Self::insert_checked(&self.readers, reader, self.config.max_handles)
    }

    /// Insert a `BodyWriter` and return a handle.
    pub fn insert_writer(&self, writer: BodyWriter) -> Result<u64, CoreError> {
        Self::insert_checked(&self.writers, writer, self.config.max_handles)
    }

    /// Allocate a `(writer_handle, reader)` pair for streaming request bodies.
    ///
    /// The writer handle is returned to JS.  The reader must be stashed via
    /// [`store_pending_reader`](Self::store_pending_reader) so the fetch path
    /// can claim it.
    pub fn alloc_body_writer(&self) -> Result<(u64, BodyReader), CoreError> {
        let (writer, reader) = self.make_body_channel();
        let handle = self.insert_writer(writer)?;
        Ok((handle, reader))
    }

    /// Store the reader side of a newly allocated writer channel so that the
    /// fetch path can claim it with [`claim_pending_reader`](Self::claim_pending_reader).
    pub fn store_pending_reader(&self, writer_handle: u64, reader: BodyReader) {
        self.pending_readers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .insert(
                writer_handle,
                PendingReaderEntry {
                    reader,
                    created: Instant::now(),
                },
            );
    }

    /// Claim the reader that was paired with `writer_handle`.
    /// Returns `None` if already claimed or never stored.
    pub fn claim_pending_reader(&self, writer_handle: u64) -> Option<BodyReader> {
        self.pending_readers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(&writer_handle)
            .map(|e| e.reader)
    }

    // ── Bridge methods (nextChunk / sendChunk / finishBody) ──────────────

    /// Pull the next chunk from a reader handle.
    ///
    /// Returns `Ok(None)` at EOF.  After returning `None` the handle is
    /// cleaned up from the registry automatically.
    pub async fn next_chunk(&self, handle: u64) -> Result<Option<Bytes>, CoreError> {
        // Clone the Arc — allows awaiting without holding the registry mutex.
        let (rx_arc, cancel) = {
            let mut reg = self.readers.lock().unwrap_or_else(|e| e.into_inner());
            let entry = reg
                .get_mut(handle_to_reader_key(handle))
                .ok_or_else(|| CoreError::invalid_handle(handle))?;
            entry.touch();
            (entry.value.rx.clone(), entry.value.cancel.clone())
        };

        let chunk = recv_with_cancel(rx_arc, cancel).await;

        // Clean up on EOF so the slot is released promptly.
        if chunk.is_none() {
            self.readers
                .lock()
                .unwrap_or_else(|e| e.into_inner())
                .remove(handle_to_reader_key(handle));
        }

        Ok(chunk)
    }

    /// Non-blocking variant of [`next_chunk`](Self::next_chunk).
    ///
    /// Returns:
    /// - `Ok(Some(bytes))` — a chunk was immediately available.
    /// - `Ok(None)` — EOF; the reader is cleaned up.
    /// - `Err(_)` — no data available yet (channel empty or lock contended),
    ///   or invalid handle. Caller should retry after yielding.
    ///
    /// #126: Used by the Deno adapter to avoid `spawn_blocking` overhead on
    /// the body-read hot path.  When data is already buffered in the channel,
    /// this returns it synchronously on the JS thread.
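    ///
    /// Caller-side sketch of that retry contract (illustrative; `consume` is a
    /// hypothetical callback, not part of this crate):
    ///
    /// ```ignore
    /// loop {
    ///     match store.try_next_chunk(handle) {
    ///         Ok(Some(bytes)) => consume(bytes), // fast path, no executor hop
    ///         Ok(None) => break,                 // EOF
    ///         Err(_) => tokio::task::yield_now().await, // empty/contended; retry
    ///     }
    /// }
    /// ```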
    pub fn try_next_chunk(&self, handle: u64) -> Result<Option<Bytes>, CoreError> {
        let rx_arc = {
            let mut reg = self.readers.lock().unwrap_or_else(|e| e.into_inner());
            let entry = reg
                .get_mut(handle_to_reader_key(handle))
                .ok_or_else(|| CoreError::invalid_handle(handle))?;
            entry.touch();
            entry.value.rx.clone()
        };

        // Try to acquire the tokio mutex without blocking.
        let mut rx_guard = match rx_arc.try_lock() {
            Ok(g) => g,
            Err(_) => return Err(CoreError::internal("try_next_chunk: lock contended")),
        };

        match rx_guard.try_recv() {
            Ok(chunk) => Ok(Some(chunk)),
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
                Err(CoreError::internal("try_next_chunk: channel empty"))
            }
            Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
                // EOF — clean up the reader.
                drop(rx_guard);
                self.readers
                    .lock()
                    .unwrap_or_else(|e| e.into_inner())
                    .remove(handle_to_reader_key(handle));
                Ok(None)
            }
        }
    }

    /// Push a chunk into a writer handle.
    ///
    /// Chunks larger than the configured `max_chunk_size` are split
    /// automatically so individual messages stay within the backpressure budget.
    pub async fn send_chunk(&self, handle: u64, chunk: Bytes) -> Result<(), CoreError> {
        // Clone the Sender (cheap) and release the lock before awaiting.
        let (tx, timeout) = {
            let mut reg = self.writers.lock().unwrap_or_else(|e| e.into_inner());
            let entry = reg
                .get_mut(handle_to_writer_key(handle))
                .ok_or_else(|| CoreError::invalid_handle(handle))?;
            entry.touch();
            (entry.value.tx.clone(), entry.value.drain_timeout)
        };
        let max = self.config.max_chunk_size;
        if chunk.len() <= max {
            tokio::time::timeout(timeout, tx.send(chunk))
                .await
                .map_err(|_| CoreError::timeout("drain timeout: body reader is too slow"))?
                .map_err(|_| CoreError::internal("body reader dropped"))
        } else {
            // Split into max-size pieces.
            let mut offset = 0;
            while offset < chunk.len() {
                let end = offset.saturating_add(max).min(chunk.len());
                tokio::time::timeout(timeout, tx.send(chunk.slice(offset..end)))
                    .await
                    .map_err(|_| CoreError::timeout("drain timeout: body reader is too slow"))?
                    .map_err(|_| CoreError::internal("body reader dropped"))?;
                offset = end;
            }
            Ok(())
        }
    }

    /// Signal end-of-body by dropping the writer from the registry.
    pub fn finish_body(&self, handle: u64) -> Result<(), CoreError> {
        self.writers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_writer_key(handle))
            .ok_or_else(|| CoreError::invalid_handle(handle))?;
        Ok(())
    }

    /// Drop a body reader, signalling cancellation of any in-flight read.
    pub fn cancel_reader(&self, handle: u64) {
        let entry = self
            .readers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_reader_key(handle));
        if let Some(e) = entry {
            e.value.cancel.notify_waiters();
        }
    }

    // ── Session ──────────────────────────────────────────────────────────

    /// Insert a `SessionEntry` and return a handle.
    pub fn insert_session(&self, entry: SessionEntry) -> Result<u64, CoreError> {
        Self::insert_checked(&self.sessions, Arc::new(entry), self.config.max_handles)
    }

    /// Look up a session by handle without consuming it.
    pub fn lookup_session(&self, handle: u64) -> Option<Arc<SessionEntry>> {
        self.sessions
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(handle_to_session_key(handle))
            .map(|e| e.value.clone())
    }

    /// Remove a session entry by handle and return it.
    pub fn remove_session(&self, handle: u64) -> Option<Arc<SessionEntry>> {
        self.sessions
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_session_key(handle))
            .map(|e| e.value)
    }

    // ── Request head (for server respond path) ───────────────────────────

    /// Insert a response-head oneshot sender and return a handle.
    pub fn allocate_req_handle(
        &self,
        sender: tokio::sync::oneshot::Sender<ResponseHeadEntry>,
    ) -> Result<u64, CoreError> {
        Self::insert_checked(&self.request_heads, sender, self.config.max_handles)
    }

    /// Remove and return the response-head sender for the given handle.
    pub fn take_req_sender(
        &self,
        handle: u64,
    ) -> Option<tokio::sync::oneshot::Sender<ResponseHeadEntry>> {
        self.request_heads
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_request_head_key(handle))
            .map(|e| e.value)
    }

    // ── Fetch cancel ─────────────────────────────────────────────────────

    /// Allocate a cancellation token for an upcoming `fetch` call.
    pub fn alloc_fetch_token(&self) -> Result<u64, CoreError> {
        let notify = Arc::new(tokio::sync::Notify::new());
        Self::insert_checked(&self.fetch_cancels, notify, self.config.max_handles)
    }

    /// Signal an in-flight fetch to abort.
    pub fn cancel_in_flight(&self, token: u64) {
        if let Some(entry) = self
            .fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(handle_to_fetch_cancel_key(token))
        {
            entry.value.notify_one();
        }
    }

    /// Retrieve the `Notify` for a fetch token (clones the Arc for use in select!).
    pub fn get_fetch_cancel_notify(&self, token: u64) -> Option<Arc<tokio::sync::Notify>> {
        self.fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(handle_to_fetch_cancel_key(token))
            .map(|e| e.value.clone())
    }

    /// Remove a fetch cancel token after the fetch completes.
    pub fn remove_fetch_token(&self, token: u64) {
        self.fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_fetch_cancel_key(token));
    }
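
    // Caller-side token lifecycle sketch (illustrative; `do_fetch` stands in
    // for a hypothetical fetch future, not an API in this crate):
    //
    //     let token = store.alloc_fetch_token()?;
    //     let cancel = store.get_fetch_cancel_notify(token).unwrap();
    //     tokio::select! {
    //         _ = cancel.notified() => { /* aborted via cancel_in_flight */ }
    //         _res = do_fetch() => { /* normal completion */ }
    //     }
    //     store.remove_fetch_token(token);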

    // ── TTL sweep ────────────────────────────────────────────────────────

    /// Sweep all registries, removing entries older than `ttl`.
    /// Also compacts any registry that is empty after sweeping to reclaim
    /// the backing memory from traffic bursts.
    pub fn sweep(&self, ttl: Duration) {
        Self::sweep_readers(&self.readers, ttl);
        Self::sweep_registry(&self.writers, ttl);
        Self::sweep_registry(&self.request_heads, ttl);
        Self::sweep_registry(&self.sessions, ttl);
        Self::sweep_registry(&self.fetch_cancels, ttl);
        self.sweep_pending_readers(ttl);
    }

    /// Sweep expired readers, firing the cancel signal so any in-flight
    /// `next_chunk` awaits terminate promptly instead of hanging.
    fn sweep_readers(registry: &Mutex<SlotMap<ReaderKey, Timed<BodyReader>>>, ttl: Duration) {
        let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
        let expired: Vec<ReaderKey> = reg
            .iter()
            .filter(|(_, e)| e.is_expired(ttl))
            .map(|(k, _)| k)
            .collect();

        if expired.is_empty() {
            return;
        }

        for key in &expired {
            if let Some(entry) = reg.remove(*key) {
                entry.value.cancel.notify_waiters();
            }
        }
        tracing::debug!(
            "[iroh-http] swept {} expired reader entries (ttl={ttl:?})",
            expired.len()
        );
        if reg.is_empty() && reg.capacity() > 128 {
            *reg = SlotMap::with_key();
        }
    }

    fn sweep_registry<K: slotmap::Key, T>(registry: &Mutex<SlotMap<K, Timed<T>>>, ttl: Duration) {
        let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
        let expired: Vec<K> = reg
            .iter()
            .filter(|(_, e)| e.is_expired(ttl))
            .map(|(k, _)| k)
            .collect();

        if expired.is_empty() {
            return;
        }

        for key in &expired {
            reg.remove(*key);
        }
        tracing::debug!(
            "[iroh-http] swept {} expired registry entries (ttl={ttl:?})",
            expired.len()
        );
        // Compact when empty to reclaim backing memory after traffic bursts.
        if reg.is_empty() && reg.capacity() > 128 {
            *reg = SlotMap::with_key();
        }
    }

    fn sweep_pending_readers(&self, ttl: Duration) {
        let mut map = self
            .pending_readers
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        let before = map.len();
        map.retain(|_, e| e.created.elapsed() < ttl);
        let removed = before.saturating_sub(map.len());
        if removed > 0 {
            tracing::debug!("[iroh-http] swept {removed} stale pending readers (ttl={ttl:?})");
        }
    }
}
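
// Illustrative sweep driver (a sketch, not an API of this module): the
// endpoint owner is expected to call `sweep` periodically, e.g. from a task
// spawned at bind time, honouring the "zero TTL disables sweeping" contract
// on `StoreConfig`:
//
//     let store = Arc::new(HandleStore::new(StoreConfig::default()));
//     if store.config.ttl > Duration::ZERO {
//         let store = Arc::clone(&store);
//         tokio::spawn(async move {
//             let mut tick =
//                 tokio::time::interval(Duration::from_millis(DEFAULT_SWEEP_INTERVAL_MS));
//             loop {
//                 tick.tick().await;
//                 store.sweep(store.config.ttl);
//             }
//         });
//     }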

// ── Shared pump helpers ───────────────────────────────────────────────────────

/// Default read buffer size for QUIC stream reads.
pub(crate) const PUMP_READ_BUF: usize = 64 * 1024;

/// Pump raw bytes from a QUIC `RecvStream` into a `BodyWriter`.
///
/// Reads `PUMP_READ_BUF`-sized chunks and forwards them through the body
/// channel.  Stops when the stream ends or the writer is dropped.
pub(crate) async fn pump_quic_recv_to_body(
    mut recv: iroh::endpoint::RecvStream,
    writer: BodyWriter,
) {
    // `true` requests an ordered read; quinn's `read_chunk` takes
    // `(max_length, ordered)`.
    while let Ok(Some(chunk)) = recv.read_chunk(PUMP_READ_BUF, true).await {
        if writer.send_chunk(chunk.bytes).await.is_err() {
            break;
        }
    }
    // writer drops → BodyReader sees EOF.
}

/// Pump raw bytes from a `BodyReader` into a QUIC `SendStream`.
///
/// Reads chunks from the body channel and writes them to the stream.
/// Finishes the stream when the reader reaches EOF.
pub(crate) async fn pump_body_to_quic_send(
    reader: BodyReader,
    mut send: iroh::endpoint::SendStream,
) {
    loop {
        match reader.next_chunk().await {
            None => break,
            Some(data) => {
                if send.write_all(&data).await.is_err() {
                    break;
                }
            }
        }
    }
    let _ = send.finish();
}

/// Bidirectional pump between a byte-level I/O object and a pair of body channels.
///
/// Reads from `io` → sends to `writer` (incoming data).
/// Reads from `reader` → writes to `io` (outgoing data).
///
/// Used for both client-side and server-side duplex upgrade pumps.
pub(crate) async fn pump_duplex<IO>(io: IO, writer: BodyWriter, reader: BodyReader)
where
    IO: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
{
    let (mut recv, mut send) = tokio::io::split(io);

    tokio::join!(
        async {
            use bytes::BytesMut;
            use tokio::io::AsyncReadExt;
            let mut buf = BytesMut::with_capacity(PUMP_READ_BUF);
            loop {
                buf.clear();
                match recv.read_buf(&mut buf).await {
                    Ok(0) | Err(_) => break,
                    Ok(_) => {
                        if writer.send_chunk(buf.split().freeze()).await.is_err() {
                            break;
                        }
                    }
                }
            }
        },
        async {
            use tokio::io::AsyncWriteExt;
            loop {
                match reader.next_chunk().await {
                    None => break,
                    Some(data) => {
                        if send.write_all(&data).await.is_err() {
                            break;
                        }
                    }
                }
            }
            let _ = send.shutdown().await;
        },
    );
}

#[cfg(test)]
mod tests {
    use super::*;

    fn test_store() -> HandleStore {
        HandleStore::new(StoreConfig::default())
    }

    // ── Body channel basics ─────────────────────────────────────────────

    #[tokio::test]
    async fn body_channel_send_recv() {
        let (writer, reader) = make_body_channel();
        writer.send_chunk(Bytes::from("hello")).await.unwrap();
        drop(writer); // signal EOF
        let chunk = reader.next_chunk().await;
        assert_eq!(chunk, Some(Bytes::from("hello")));
        let eof = reader.next_chunk().await;
        assert!(eof.is_none());
    }

    #[tokio::test]
    async fn body_channel_multiple_chunks() {
        let (writer, reader) = make_body_channel();
        writer.send_chunk(Bytes::from("a")).await.unwrap();
        writer.send_chunk(Bytes::from("b")).await.unwrap();
        writer.send_chunk(Bytes::from("c")).await.unwrap();
        drop(writer);

        let mut collected = Vec::new();
        while let Some(chunk) = reader.next_chunk().await {
            collected.push(chunk);
        }
        assert_eq!(
            collected,
            vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]
        );
    }

    #[tokio::test]
    async fn body_channel_reader_dropped_returns_error() {
        let (writer, reader) = make_body_channel();
        drop(reader);
        let result = writer.send_chunk(Bytes::from("data")).await;
        assert!(result.is_err());
    }

    // ── HandleStore operations ──────────────────────────────────────────

    #[tokio::test]
    async fn insert_reader_and_next_chunk() {
        let store = test_store();
        let (writer, reader) = store.make_body_channel();
        let handle = store.insert_reader(reader).unwrap();

        writer.send_chunk(Bytes::from("slab-data")).await.unwrap();
        drop(writer);

        let chunk = store.next_chunk(handle).await.unwrap();
        assert_eq!(chunk, Some(Bytes::from("slab-data")));

        // EOF cleans up the registry entry
        let eof = store.next_chunk(handle).await.unwrap();
        assert!(eof.is_none());
    }

    #[tokio::test]
    async fn next_chunk_invalid_handle() {
        let store = test_store();
        let result = store.next_chunk(999999).await;
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().code, crate::ErrorCode::InvalidInput);
    }

    #[tokio::test]
    async fn send_chunk_via_handle() {
        let store = test_store();
        let (writer, reader) = store.make_body_channel();
        let handle = store.insert_writer(writer).unwrap();

        store
            .send_chunk(handle, Bytes::from("via-slab"))
            .await
            .unwrap();
        store.finish_body(handle).unwrap();

        let chunk = reader.next_chunk().await;
        assert_eq!(chunk, Some(Bytes::from("via-slab")));
        let eof = reader.next_chunk().await;
        assert!(eof.is_none());
    }
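
    // Illustrative sketch: exercises the splitting loop documented on
    // `send_chunk`. A 5-byte chunk with `max_chunk_size = 2` should arrive
    // as three pieces of 2, 2, and 1 bytes.
    #[tokio::test]
    async fn send_chunk_splits_oversized() {
        let store = HandleStore::new(StoreConfig {
            max_chunk_size: 2,
            ..StoreConfig::default()
        });
        let (writer, reader) = store.make_body_channel();
        let handle = store.insert_writer(writer).unwrap();

        store
            .send_chunk(handle, Bytes::from("abcde"))
            .await
            .unwrap();
        store.finish_body(handle).unwrap();

        let mut collected = Vec::new();
        while let Some(chunk) = reader.next_chunk().await {
            collected.push(chunk);
        }
        assert_eq!(
            collected,
            vec![Bytes::from("ab"), Bytes::from("cd"), Bytes::from("e")]
        );
    }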

    #[tokio::test]
    async fn capacity_cap_rejects_overflow() {
        let store = HandleStore::new(StoreConfig {
            max_handles: 2,
            ..StoreConfig::default()
        });
        let (_, r1) = store.make_body_channel();
        let (_, r2) = store.make_body_channel();
        let (_, r3) = store.make_body_channel();
        store.insert_reader(r1).unwrap();
        store.insert_reader(r2).unwrap();
        let err = store.insert_reader(r3).unwrap_err();
        assert!(err.message.contains("capacity"));
    }
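
    // Illustrative sketch: a handle is the slot-map key's `as_ffi()` value,
    // so decoding it must address the same slot it came from.
    #[test]
    fn handle_roundtrip_reader_key() {
        let store = test_store();
        let (_writer, reader) = store.make_body_channel();
        let handle = store.insert_reader(reader).unwrap();
        let reg = store.readers.lock().unwrap();
        assert!(reg.contains_key(handle_to_reader_key(handle)));
    }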

    // ── #84 regression: recv_with_cancel cancellation ──────────────────

    #[tokio::test]
    async fn recv_with_cancel_returns_none_on_cancel() {
        let (_tx, rx) = mpsc::channel::<Bytes>(4);
        let rx = Arc::new(tokio::sync::Mutex::new(rx));
        let cancel = Arc::new(tokio::sync::Notify::new());
        // Notify before polling — biased select must return None immediately.
        cancel.notify_one();
        let result = recv_with_cancel(rx, cancel).await;
        assert!(result.is_none());
    }
}
953}