// iroh_http_core/stream.rs
1//! Per-endpoint handle store and body channel types.
2//!
3//! Rust owns all stream state; JS holds only opaque `u64` handles.
4//! Each `IrohEndpoint` has its own `HandleStore` — no process-global registries.
5//! Handles are `u64` values equal to `key.data().as_ffi()`, unique within the
6//! owning endpoint's slot-map.
7
8use std::{
9    collections::HashMap,
10    sync::{Arc, Mutex},
11    time::{Duration, Instant},
12};
13
14use bytes::Bytes;
15use slotmap::{KeyData, SlotMap};
16use tokio::sync::mpsc;
17
18use crate::CoreError;
19
20// ── Constants ─────────────────────────────────────────────────────────────────
21
/// Default body-channel capacity, measured in chunks (not bytes).
pub const DEFAULT_CHANNEL_CAPACITY: usize = 32;
/// Default upper bound on a single chunk forwarded through `send_chunk`.
pub const DEFAULT_MAX_CHUNK_SIZE: usize = 64 * 1024; // 64 KB
/// Default wait for a slow body reader before a send fails with a timeout.
pub const DEFAULT_DRAIN_TIMEOUT_MS: u64 = 30_000; // 30 s
/// Default TTL for registry entries before the periodic sweep reclaims them.
pub const DEFAULT_SLAB_TTL_MS: u64 = 300_000; // 5 min
/// Default cap on live handles per registry (prevents unbounded growth).
pub const DEFAULT_MAX_HANDLES: usize = 65_536;
27
28// ── Resource types ────────────────────────────────────────────────────────────
29
/// A live QUIC connection held per session handle; shared as
/// `Arc<SessionEntry>` so lookups can clone it out of the registry.
pub struct SessionEntry {
    pub conn: iroh::endpoint::Connection,
}
33
/// Response head delivered from JS back to the server respond path via a
/// oneshot channel (see `allocate_req_handle` / `take_req_sender`).
pub struct ResponseHeadEntry {
    // HTTP status code.
    pub status: u16,
    // Header name/value pairs, order-preserving.
    pub headers: Vec<(String, String)>,
}
38
39// ── SlotMap key types ─────────────────────────────────────────────────────────
40
// One distinct key type per registry so a handle for one resource kind can
// never be used (even accidentally) to index another kind's slot-map.
slotmap::new_key_type! { pub(crate) struct ReaderKey; }
slotmap::new_key_type! { pub(crate) struct WriterKey; }
slotmap::new_key_type! { pub(crate) struct TrailerTxKey; }
slotmap::new_key_type! { pub(crate) struct TrailerRxKey; }
slotmap::new_key_type! { pub(crate) struct FetchCancelKey; }
slotmap::new_key_type! { pub(crate) struct SessionKey; }
slotmap::new_key_type! { pub(crate) struct RequestHeadKey; }
48
49// ── Handle encode / decode helpers ───────────────────────────────────────────
50
51fn key_to_handle<K: slotmap::Key>(k: K) -> u64 {
52    k.data().as_ffi()
53}
54
/// Generate a `u64 → key` decoder for one registry's key type.
/// Decoding never fails; an unknown handle simply misses on lookup.
macro_rules! handle_to_key {
    ($fn_name:ident, $key_type:ty) => {
        fn $fn_name(h: u64) -> $key_type {
            <$key_type>::from(KeyData::from_ffi(h))
        }
    };
}
62
// One decoder per registry, mirroring the key types declared above.
handle_to_key!(handle_to_reader_key, ReaderKey);
handle_to_key!(handle_to_writer_key, WriterKey);
handle_to_key!(handle_to_trailer_tx_key, TrailerTxKey);
handle_to_key!(handle_to_trailer_rx_key, TrailerRxKey);
handle_to_key!(handle_to_session_key, SessionKey);
handle_to_key!(handle_to_request_head_key, RequestHeadKey);
handle_to_key!(handle_to_fetch_cancel_key, FetchCancelKey);
70
71// ── Body channel primitives ───────────────────────────────────────────────────
72
/// Consumer end — stored in the reader registry.
/// Uses `tokio::sync::Mutex` so we can `.await` the receiver without holding
/// the registry's `std::sync::Mutex`.
pub struct BodyReader {
    // Shared so `next_chunk` can clone the Arc and await outside any
    // registry lock.
    pub(crate) rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Bytes>>>,
    /// ISS-010: cancellation signal — notified when `cancel_reader` is called
    /// so in-flight `next_chunk` awaits terminate promptly.
    pub(crate) cancel: Arc<tokio::sync::Notify>,
    /// Drain timeout inherited from the endpoint config at channel-creation time.
    pub(crate) drain_timeout: Duration,
}
84
/// Producer end — stored in the writer registry.
/// `mpsc::Sender` is `Clone`, so we clone it out of the registry for each call.
pub struct BodyWriter {
    // Dropping the last sender is how EOF is signalled to the reader.
    pub(crate) tx: mpsc::Sender<Bytes>,
    /// Drain timeout baked in at channel-creation time from the endpoint config.
    pub(crate) drain_timeout: Duration,
}
92
93/// Create a matched (writer, reader) pair backed by a bounded mpsc channel.
94///
95/// Prefer [`HandleStore::make_body_channel`] when an endpoint is available so
96/// the channel inherits the endpoint's backpressure config.  This free
97/// function uses the compile-time defaults and exists for tests and pre-bind
98/// code paths.
99pub fn make_body_channel() -> (BodyWriter, BodyReader) {
100    make_body_channel_with(
101        DEFAULT_CHANNEL_CAPACITY,
102        Duration::from_millis(DEFAULT_DRAIN_TIMEOUT_MS),
103    )
104}
105
106fn make_body_channel_with(capacity: usize, drain_timeout: Duration) -> (BodyWriter, BodyReader) {
107    let (tx, rx) = mpsc::channel(capacity);
108    (
109        BodyWriter { tx, drain_timeout },
110        BodyReader {
111            rx: Arc::new(tokio::sync::Mutex::new(rx)),
112            cancel: Arc::new(tokio::sync::Notify::new()),
113            drain_timeout,
114        },
115    )
116}
117
118// ── Cancellable receive ───────────────────────────────────────────────────────
119
/// Receive the next chunk from a body channel, aborting immediately if
/// `cancel` is notified.
///
/// Returns `None` on EOF (sender dropped) or on cancellation.  Both call
/// sites — [`BodyReader::next_chunk`] and [`HandleStore::next_chunk`] — share
/// this helper so the cancellation semantics are defined and tested in one place.
///
/// `biased;` makes `select!` poll the cancel branch first, so a cancelled
/// reader never yields a stale chunk even when both futures are ready.
/// The tokio mutex is locked inside the second arm; if cancellation wins,
/// that arm's future (including any pending lock acquisition) is dropped.
async fn recv_with_cancel(
    rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Bytes>>>,
    cancel: Arc<tokio::sync::Notify>,
) -> Option<Bytes> {
    tokio::select! {
        biased;
        _ = cancel.notified() => None,
        chunk = async { rx.lock().await.recv().await } => chunk,
    }
}
136
137impl BodyReader {
138    /// Receive the next chunk.  Returns `None` when the writer is gone (EOF)
139    /// or when the reader has been cancelled.
140    pub async fn next_chunk(&self) -> Option<Bytes> {
141        recv_with_cancel(self.rx.clone(), self.cancel.clone()).await
142    }
143}
144
145impl BodyWriter {
146    /// Send one chunk.  Returns `Err` if the reader has been dropped or if
147    /// the drain timeout expires (JS not reading fast enough).
148    pub async fn send_chunk(&self, chunk: Bytes) -> Result<(), String> {
149        tokio::time::timeout(self.drain_timeout, self.tx.send(chunk))
150            .await
151            .map_err(|_| "drain timeout: body reader is too slow".to_string())?
152            .map_err(|_| "body reader dropped".to_string())
153    }
154}
155
156// ── Trailer type aliases ──────────────────────────────────────────────────────
157
/// Sending half of the oneshot trailer rendezvous (delivered via `send_trailers`).
type TrailerTx = tokio::sync::oneshot::Sender<Vec<(String, String)>>;
/// Receiving half — stored in the registry and consumed by `next_trailer`.
pub(crate) type TrailerRx = tokio::sync::oneshot::Receiver<Vec<(String, String)>>;
160
161// ── StoreConfig ───────────────────────────────────────────────────────────────
162
/// Configuration for a [`HandleStore`].  Set once at endpoint bind time.
#[derive(Debug, Clone)]
pub struct StoreConfig {
    /// Body-channel capacity (in chunks).  Minimum 1.
    pub channel_capacity: usize,
    /// Maximum byte length of a single chunk in `send_chunk`.  Minimum 1.
    /// Larger chunks are split, not rejected.
    pub max_chunk_size: usize,
    /// Milliseconds to wait for a slow body reader before dropping.
    pub drain_timeout: Duration,
    /// Maximum handle slots per registry.  Prevents unbounded growth.
    pub max_handles: usize,
    /// TTL for handle entries; expired entries are swept periodically.
    /// Zero disables sweeping.
    // NOTE(review): `sweep` itself does not special-case a zero TTL — the
    // "zero disables" contract must be enforced by the caller; confirm.
    pub ttl: Duration,
}
178
179impl Default for StoreConfig {
180    fn default() -> Self {
181        Self {
182            channel_capacity: DEFAULT_CHANNEL_CAPACITY,
183            max_chunk_size: DEFAULT_MAX_CHUNK_SIZE,
184            drain_timeout: Duration::from_millis(DEFAULT_DRAIN_TIMEOUT_MS),
185            max_handles: DEFAULT_MAX_HANDLES,
186            ttl: Duration::from_millis(DEFAULT_SLAB_TTL_MS),
187        }
188    }
189}
190
191// ── Timed wrapper ─────────────────────────────────────────────────────────────
192
/// Wraps a registry value with its insertion timestamp so the TTL sweep
/// can age entries out.
struct Timed<T> {
    value: T,
    created_at: Instant,
}

impl<T> Timed<T> {
    /// Wrap `value`, stamping it with the current instant.
    fn new(value: T) -> Self {
        let created_at = Instant::now();
        Self { value, created_at }
    }

    /// True once the entry has outlived `ttl`.
    fn is_expired(&self, ttl: Duration) -> bool {
        Instant::now().duration_since(self.created_at) > ttl
    }
}
210
/// Pending reader tracked with insertion time for TTL sweep.
/// Parked in `pending_readers` (keyed by writer handle) until `fetch()` claims it.
struct PendingReaderEntry {
    reader: BodyReader,
    // Insertion time; entries older than the TTL are dropped by the sweep.
    created: Instant,
}
216
/// Pending trailer receiver tracked with insertion time for TTL sweep.
/// Parked in `pending_trailer_rxs` (keyed by sender handle) until claimed.
struct PendingTrailerRxEntry {
    rx: TrailerRx,
    // Insertion time; entries older than the TTL are dropped by the sweep.
    created: Instant,
}
222
223// ── HandleStore ───────────────────────────────────────────────────────────────
224
/// Tracks handles inserted during a multi-handle allocation sequence.
/// On drop, removes all tracked handles unless [`commit`](InsertGuard::commit)
/// has been called. This prevents orphaned handles when a later insert fails.
pub(crate) struct InsertGuard<'a> {
    // The store the tracked handles live in.
    store: &'a HandleStore,
    // Handles inserted so far, in insertion order; rolled back on drop.
    tracked: Vec<TrackedHandle>,
    // Set by `commit` to disarm the rollback.
    committed: bool,
}
233
/// A handle tracked by [`InsertGuard`] for rollback on drop.
///
/// # Intentionally omitted variants
///
/// `Session` and `FetchCancel` are not tracked here because their lifecycles
/// are managed outside of multi-handle allocation sequences:
/// - Sessions are created and closed by `session_connect` / `session_close`
///   independently and are never allocated inside a guard.
/// - Fetch cancel tokens are allocated before a guard is opened and are
///   always cleaned up by `remove_fetch_token` after the fetch resolves.
///
/// If either type is ever added to a guard-guarded allocation path in the
/// future, add `Session(u64)` or `FetchCancel(u64)` variants here with the
/// corresponding rollback arms in [`InsertGuard::drop`].
enum TrackedHandle {
    // Body reader handle — rolled back via `cancel_reader`.
    Reader(u64),
    // Body writer handle — rolled back via `finish_body`.
    Writer(u64),
    // Trailer sender handle — rolled back via `remove_trailer_sender`.
    TrailerTx(u64),
    // Trailer receiver handle — removed straight from the registry.
    TrailerRx(u64),
    // Request-head sender handle — removed straight from the registry.
    ReqHead(u64),
}
255
256impl<'a> InsertGuard<'a> {
257    fn new(store: &'a HandleStore) -> Self {
258        Self {
259            store,
260            tracked: Vec::new(),
261            committed: false,
262        }
263    }
264
265    pub fn insert_reader(&mut self, reader: BodyReader) -> Result<u64, CoreError> {
266        let h = self.store.insert_reader(reader)?;
267        self.tracked.push(TrackedHandle::Reader(h));
268        Ok(h)
269    }
270
271    pub fn insert_writer(&mut self, writer: BodyWriter) -> Result<u64, CoreError> {
272        let h = self.store.insert_writer(writer)?;
273        self.tracked.push(TrackedHandle::Writer(h));
274        Ok(h)
275    }
276
277    pub fn insert_trailer_sender(&mut self, tx: TrailerTx) -> Result<u64, CoreError> {
278        let h = self.store.insert_trailer_sender(tx)?;
279        self.tracked.push(TrackedHandle::TrailerTx(h));
280        Ok(h)
281    }
282
283    pub fn insert_trailer_receiver(&mut self, rx: TrailerRx) -> Result<u64, CoreError> {
284        let h = self.store.insert_trailer_receiver(rx)?;
285        self.tracked.push(TrackedHandle::TrailerRx(h));
286        Ok(h)
287    }
288
289    pub fn allocate_req_handle(
290        &mut self,
291        sender: tokio::sync::oneshot::Sender<ResponseHeadEntry>,
292    ) -> Result<u64, CoreError> {
293        let h = self.store.allocate_req_handle(sender)?;
294        self.tracked.push(TrackedHandle::ReqHead(h));
295        Ok(h)
296    }
297
298    /// Consume the guard without rolling back. Call after all inserts succeed.
299    pub fn commit(mut self) {
300        self.committed = true;
301    }
302}
303
impl Drop for InsertGuard<'_> {
    /// Roll back every tracked insert unless `commit` ran.
    ///
    /// Each arm mirrors the public removal path for its handle type so the
    /// rollback has the same side effects (e.g. notifying reader cancellation).
    fn drop(&mut self) {
        if self.committed {
            return;
        }
        for handle in &self.tracked {
            match handle {
                // Remove + notify so any in-flight `next_chunk` terminates.
                TrackedHandle::Reader(h) => self.store.cancel_reader(*h),
                // Dropping the sender signals EOF to the paired reader; the
                // handle may already be gone, so the result is ignored.
                TrackedHandle::Writer(h) => {
                    let _ = self.store.finish_body(*h);
                }
                TrackedHandle::TrailerTx(h) => self.store.remove_trailer_sender(*h),
                // No public removal helper exists for these two; drop them
                // straight out of the registry.
                TrackedHandle::TrailerRx(h) => {
                    self.store
                        .trailer_rx
                        .lock()
                        .unwrap_or_else(|e| e.into_inner())
                        .remove(handle_to_trailer_rx_key(*h));
                }
                TrackedHandle::ReqHead(h) => {
                    self.store
                        .request_heads
                        .lock()
                        .unwrap_or_else(|e| e.into_inner())
                        .remove(handle_to_request_head_key(*h));
                }
            }
        }
    }
}
334
/// Per-endpoint handle registry.  Owns all body readers, writers, trailers,
/// sessions, request-head rendezvous channels, and fetch-cancel tokens for
/// a single `IrohEndpoint`.
///
/// When the endpoint is dropped, this store is dropped with it — all
/// slot-maps are freed and any remaining handles become invalid.
pub struct HandleStore {
    // Each registry has its own mutex so unrelated handle types never
    // contend; the bridge methods clone what they need and release the
    // lock before awaiting.
    readers: Mutex<SlotMap<ReaderKey, Timed<BodyReader>>>,
    writers: Mutex<SlotMap<WriterKey, Timed<BodyWriter>>>,
    trailer_tx: Mutex<SlotMap<TrailerTxKey, Timed<TrailerTx>>>,
    trailer_rx: Mutex<SlotMap<TrailerRxKey, Timed<TrailerRx>>>,
    sessions: Mutex<SlotMap<SessionKey, Timed<Arc<SessionEntry>>>>,
    request_heads:
        Mutex<SlotMap<RequestHeadKey, Timed<tokio::sync::oneshot::Sender<ResponseHeadEntry>>>>,
    fetch_cancels: Mutex<SlotMap<FetchCancelKey, Timed<Arc<tokio::sync::Notify>>>>,
    // Readers allocated by `alloc_body_writer`, parked until `fetch()`
    // claims them; keyed by the matching writer handle.
    pending_readers: Mutex<HashMap<u64, PendingReaderEntry>>,
    /// Pending trailer receivers matched to allocated sender handles.
    /// Keyed by the tx handle; claimed by `fetch()` via
    /// [`claim_pending_trailer_rx`](Self::claim_pending_trailer_rx).
    pending_trailer_rxs: Mutex<HashMap<u64, PendingTrailerRxEntry>>,
    pub(crate) config: StoreConfig,
}
357
impl HandleStore {
    /// Create a new handle store with the given configuration.
    pub fn new(config: StoreConfig) -> Self {
        Self {
            readers: Mutex::new(SlotMap::with_key()),
            writers: Mutex::new(SlotMap::with_key()),
            trailer_tx: Mutex::new(SlotMap::with_key()),
            trailer_rx: Mutex::new(SlotMap::with_key()),
            sessions: Mutex::new(SlotMap::with_key()),
            request_heads: Mutex::new(SlotMap::with_key()),
            fetch_cancels: Mutex::new(SlotMap::with_key()),
            pending_readers: Mutex::new(HashMap::new()),
            pending_trailer_rxs: Mutex::new(HashMap::new()),
            config,
        }
    }

    // ── Config accessors ─────────────────────────────────────────────────

    /// Create a guard for multi-handle allocation with automatic rollback.
    pub(crate) fn insert_guard(&self) -> InsertGuard<'_> {
        InsertGuard::new(self)
    }

    /// The configured drain timeout.
    pub fn drain_timeout(&self) -> Duration {
        self.config.drain_timeout
    }

    /// The configured maximum chunk size.
    pub fn max_chunk_size(&self) -> usize {
        self.config.max_chunk_size
    }

    /// Snapshot of handle counts for observability.
    ///
    /// Returns `(active_readers, active_writers, active_sessions, total_handles)`.
    ///
    /// The registries are locked one after another, so under concurrent
    /// mutation the individual counts reflect slightly different instants —
    /// treat the tuple as approximate.
    pub fn count_handles(&self) -> (usize, usize, usize, usize) {
        let readers = self.readers.lock().unwrap_or_else(|e| e.into_inner()).len();
        let writers = self.writers.lock().unwrap_or_else(|e| e.into_inner()).len();
        let sessions = self.sessions.lock().unwrap_or_else(|e| e.into_inner()).len();
        let total = readers
            + writers
            + sessions
            + self.trailer_tx.lock().unwrap_or_else(|e| e.into_inner()).len()
            + self.trailer_rx.lock().unwrap_or_else(|e| e.into_inner()).len()
            + self.request_heads.lock().unwrap_or_else(|e| e.into_inner()).len()
            + self.fetch_cancels.lock().unwrap_or_else(|e| e.into_inner()).len();
        (readers, writers, sessions, total)
    }

    // ── Body channels ────────────────────────────────────────────────────

    /// Create a matched (writer, reader) pair using this store's config.
    pub fn make_body_channel(&self) -> (BodyWriter, BodyReader) {
        make_body_channel_with(self.config.channel_capacity, self.config.drain_timeout)
    }

    // ── Capacity-checked insert ──────────────────────────────────────────

    /// Insert into `registry` unless it already holds `max` entries.
    ///
    /// `unwrap_or_else(|e| e.into_inner())` recovers the guard from a
    /// poisoned mutex instead of panicking — the same pattern is used on
    /// every lock in this store.
    fn insert_checked<K: slotmap::Key, T>(
        registry: &Mutex<SlotMap<K, Timed<T>>>,
        value: T,
        max: usize,
    ) -> Result<u64, CoreError> {
        let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
        if reg.len() >= max {
            return Err(CoreError::internal("handle registry at capacity"));
        }
        let key = reg.insert(Timed::new(value));
        Ok(key_to_handle(key))
    }

    // ── Body reader / writer ─────────────────────────────────────────────

    /// Insert a `BodyReader` and return a handle.
    pub fn insert_reader(&self, reader: BodyReader) -> Result<u64, CoreError> {
        Self::insert_checked(&self.readers, reader, self.config.max_handles)
    }

    /// Insert a `BodyWriter` and return a handle.
    pub fn insert_writer(&self, writer: BodyWriter) -> Result<u64, CoreError> {
        Self::insert_checked(&self.writers, writer, self.config.max_handles)
    }

    /// Allocate a `(writer_handle, reader)` pair for streaming request bodies.
    ///
    /// The writer handle is returned to JS.  The reader must be stashed via
    /// [`store_pending_reader`](Self::store_pending_reader) so the fetch path
    /// can claim it.
    pub fn alloc_body_writer(&self) -> Result<(u64, BodyReader), CoreError> {
        let (writer, reader) = self.make_body_channel();
        let handle = self.insert_writer(writer)?;
        Ok((handle, reader))
    }

    /// Store the reader side of a newly allocated writer channel so that the
    /// fetch path can claim it with [`claim_pending_reader`](Self::claim_pending_reader).
    pub fn store_pending_reader(&self, writer_handle: u64, reader: BodyReader) {
        self.pending_readers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .insert(
                writer_handle,
                PendingReaderEntry {
                    reader,
                    created: Instant::now(),
                },
            );
    }

    /// Claim the reader that was paired with `writer_handle`.
    /// Returns `None` if already claimed or never stored.
    pub fn claim_pending_reader(&self, writer_handle: u64) -> Option<BodyReader> {
        self.pending_readers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(&writer_handle)
            .map(|e| e.reader)
    }

    // ── Bridge methods (nextChunk / sendChunk / finishBody) ──────────────

    /// Pull the next chunk from a reader handle.
    ///
    /// Returns `Ok(None)` at EOF.  After returning `None` the handle is
    /// cleaned up from the registry automatically.  Cancellation also
    /// surfaces as `Ok(None)`; in that case `cancel_reader` has already
    /// removed the entry, so the remove below is a harmless no-op.
    pub async fn next_chunk(&self, handle: u64) -> Result<Option<Bytes>, CoreError> {
        // Clone the Arc — allows awaiting without holding the registry mutex.
        let (rx_arc, cancel) = {
            let reg = self.readers.lock().unwrap_or_else(|e| e.into_inner());
            let entry = reg
                .get(handle_to_reader_key(handle))
                .ok_or_else(|| CoreError::invalid_handle(handle))?;
            (entry.value.rx.clone(), entry.value.cancel.clone())
        };

        let chunk = recv_with_cancel(rx_arc, cancel).await;

        // Clean up on EOF so the slot is released promptly.
        if chunk.is_none() {
            self.readers
                .lock()
                .unwrap_or_else(|e| e.into_inner())
                .remove(handle_to_reader_key(handle));
        }

        Ok(chunk)
    }

    /// Push a chunk into a writer handle.
    ///
    /// Chunks larger than the configured `max_chunk_size` are split
    /// automatically so individual messages stay within the backpressure budget.
    /// An empty chunk takes the single-send path unchanged.
    pub async fn send_chunk(&self, handle: u64, chunk: Bytes) -> Result<(), CoreError> {
        // Clone the Sender (cheap) and release the lock before awaiting.
        let (tx, timeout) = {
            let reg = self.writers.lock().unwrap_or_else(|e| e.into_inner());
            let entry = reg
                .get(handle_to_writer_key(handle))
                .ok_or_else(|| CoreError::invalid_handle(handle))?;
            (entry.value.tx.clone(), entry.value.drain_timeout)
        };
        let max = self.config.max_chunk_size;
        if chunk.len() <= max {
            tokio::time::timeout(timeout, tx.send(chunk))
                .await
                .map_err(|_| CoreError::timeout("drain timeout: body reader is too slow"))?
                .map_err(|_| CoreError::internal("body reader dropped"))
        } else {
            // Split into max-size pieces.  `slice` is zero-copy on `Bytes`,
            // and each piece gets its own drain-timeout window.
            let mut offset = 0;
            while offset < chunk.len() {
                let end = (offset + max).min(chunk.len());
                tokio::time::timeout(timeout, tx.send(chunk.slice(offset..end)))
                    .await
                    .map_err(|_| CoreError::timeout("drain timeout: body reader is too slow"))?
                    .map_err(|_| CoreError::internal("body reader dropped"))?;
                offset = end;
            }
            Ok(())
        }
    }

    /// Signal end-of-body by dropping the writer from the registry.
    /// A second call for the same handle returns `invalid_handle`.
    pub fn finish_body(&self, handle: u64) -> Result<(), CoreError> {
        self.writers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_writer_key(handle))
            .ok_or_else(|| CoreError::invalid_handle(handle))?;
        Ok(())
    }

    /// Drop a body reader, signalling cancellation of any in-flight read.
    /// Unknown handles are ignored (no error) — cancellation is idempotent.
    pub fn cancel_reader(&self, handle: u64) {
        let entry = self
            .readers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_reader_key(handle));
        if let Some(e) = entry {
            // Wake every task currently parked in `recv_with_cancel`.
            e.value.cancel.notify_waiters();
        }
    }

    // ── Trailer operations ───────────────────────────────────────────────

    /// Insert a trailer oneshot **sender** and return a handle.
    pub fn insert_trailer_sender(&self, tx: TrailerTx) -> Result<u64, CoreError> {
        Self::insert_checked(&self.trailer_tx, tx, self.config.max_handles)
    }

    /// Insert a trailer oneshot **receiver** and return a handle.
    pub fn insert_trailer_receiver(&self, rx: TrailerRx) -> Result<u64, CoreError> {
        Self::insert_checked(&self.trailer_rx, rx, self.config.max_handles)
    }

    /// Remove (drop) a trailer sender without sending.
    pub fn remove_trailer_sender(&self, handle: u64) {
        self.trailer_tx
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_trailer_tx_key(handle));
    }

    /// Allocate a `(tx, rx)` trailer oneshot pair for sending request trailers
    /// from JavaScript to the Rust body encoder.
    ///
    /// Returns the sender handle — JS holds this and calls `send_trailers` when
    /// it is done writing the request body.  The matching `rx` is stored in
    /// `pending_trailer_rxs` and claimed by `fetch()` via
    /// [`claim_pending_trailer_rx`](Self::claim_pending_trailer_rx).
    pub fn alloc_trailer_sender(&self) -> Result<u64, CoreError> {
        let (tx, rx) = tokio::sync::oneshot::channel::<Vec<(String, String)>>();
        let handle = self.insert_trailer_sender(tx)?;
        self.pending_trailer_rxs
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .insert(handle, PendingTrailerRxEntry { rx, created: Instant::now() });
        Ok(handle)
    }

    /// Claim the trailer receiver that was paired with the given sender handle.
    ///
    /// Returns `None` if the handle was never allocated via
    /// [`alloc_trailer_sender`](Self::alloc_trailer_sender) or has already been claimed.
    pub fn claim_pending_trailer_rx(&self, sender_handle: u64) -> Option<TrailerRx> {
        self.pending_trailer_rxs
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(&sender_handle)
            .map(|e| e.rx)
    }

    /// Deliver trailers from the JS side to the waiting Rust pump task.
    /// The sender is removed first, so the handle is single-use.
    pub fn send_trailers(
        &self,
        handle: u64,
        trailers: Vec<(String, String)>,
    ) -> Result<(), CoreError> {
        let tx = self
            .trailer_tx
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_trailer_tx_key(handle))
            .ok_or_else(|| CoreError::invalid_handle(handle))?
            .value;
        tx.send(trailers)
            .map_err(|_| CoreError::internal("trailer receiver dropped"))
    }

    /// Await and retrieve trailers produced by the Rust pump task.
    /// The receiver is removed from the registry *before* the await, so no
    /// registry lock is held across it and the handle is single-use.
    pub async fn next_trailer(
        &self,
        handle: u64,
    ) -> Result<Option<Vec<(String, String)>>, CoreError> {
        let rx = self
            .trailer_rx
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_trailer_rx_key(handle))
            .ok_or_else(|| CoreError::invalid_handle(handle))?
            .value;
        match rx.await {
            Ok(trailers) => Ok(Some(trailers)),
            Err(_) => Ok(None), // sender dropped = no trailers
        }
    }

    // ── Session ──────────────────────────────────────────────────────────

    /// Insert a `SessionEntry` and return a handle.
    pub fn insert_session(&self, entry: SessionEntry) -> Result<u64, CoreError> {
        Self::insert_checked(&self.sessions, Arc::new(entry), self.config.max_handles)
    }

    /// Look up a session by handle without consuming it.
    pub fn lookup_session(&self, handle: u64) -> Option<Arc<SessionEntry>> {
        self.sessions
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(handle_to_session_key(handle))
            .map(|e| e.value.clone())
    }

    /// Remove a session entry by handle and return it.
    pub fn remove_session(&self, handle: u64) -> Option<Arc<SessionEntry>> {
        self.sessions
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_session_key(handle))
            .map(|e| e.value)
    }

    // ── Request head (for server respond path) ───────────────────────────

    /// Insert a response-head oneshot sender and return a handle.
    pub fn allocate_req_handle(
        &self,
        sender: tokio::sync::oneshot::Sender<ResponseHeadEntry>,
    ) -> Result<u64, CoreError> {
        Self::insert_checked(&self.request_heads, sender, self.config.max_handles)
    }

    /// Remove and return the response-head sender for the given handle.
    pub fn take_req_sender(
        &self,
        handle: u64,
    ) -> Option<tokio::sync::oneshot::Sender<ResponseHeadEntry>> {
        self.request_heads
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_request_head_key(handle))
            .map(|e| e.value)
    }

    // ── Fetch cancel ─────────────────────────────────────────────────────

    /// Allocate a cancellation token for an upcoming `fetch` call.
    pub fn alloc_fetch_token(&self) -> Result<u64, CoreError> {
        let notify = Arc::new(tokio::sync::Notify::new());
        Self::insert_checked(&self.fetch_cancels, notify, self.config.max_handles)
    }

    /// Signal an in-flight fetch to abort.  Unknown tokens are ignored.
    pub fn cancel_in_flight(&self, token: u64) {
        if let Some(entry) = self
            .fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(handle_to_fetch_cancel_key(token))
        {
            entry.value.notify_one();
        }
    }

    /// Retrieve the `Notify` for a fetch token (clones the Arc for use in select!).
    pub fn get_fetch_cancel_notify(&self, token: u64) -> Option<Arc<tokio::sync::Notify>> {
        self.fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(handle_to_fetch_cancel_key(token))
            .map(|e| e.value.clone())
    }

    /// Remove a fetch cancel token after the fetch completes.
    pub fn remove_fetch_token(&self, token: u64) {
        self.fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_fetch_cancel_key(token));
    }

    // ── TTL sweep ────────────────────────────────────────────────────────

    /// Sweep all registries, removing entries older than `ttl`.
    /// Also compacts any registry that is empty after sweeping to reclaim
    /// the backing memory from traffic bursts.
    ///
    /// NOTE(review): `StoreConfig::ttl` is documented as "zero disables
    /// sweeping", but a zero `ttl` passed here expires everything at once —
    /// the caller is expected to skip the call when the TTL is zero; confirm
    /// at the call site.
    pub fn sweep(&self, ttl: Duration) {
        Self::sweep_registry(&self.readers, ttl);
        Self::sweep_registry(&self.writers, ttl);
        Self::sweep_registry(&self.trailer_tx, ttl);
        Self::sweep_registry(&self.trailer_rx, ttl);
        Self::sweep_registry(&self.request_heads, ttl);
        Self::sweep_registry(&self.sessions, ttl);
        Self::sweep_registry(&self.fetch_cancels, ttl);
        self.sweep_pending_readers(ttl);
        self.sweep_pending_trailer_rxs(ttl);
    }

    /// Remove expired entries from one registry; collect-then-remove because
    /// the slot-map cannot be mutated while iterating it.
    fn sweep_registry<K: slotmap::Key, T>(registry: &Mutex<SlotMap<K, Timed<T>>>, ttl: Duration) {
        let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
        let expired: Vec<K> = reg
            .iter()
            .filter(|(_, e)| e.is_expired(ttl))
            .map(|(k, _)| k)
            .collect();

        if expired.is_empty() {
            return;
        }

        for key in &expired {
            reg.remove(*key);
        }
        tracing::debug!(
            "[iroh-http] swept {} expired registry entries (ttl={ttl:?})",
            expired.len()
        );
        // Compact when empty to reclaim backing memory after traffic bursts.
        // The 128-slot threshold skips the reallocation for small maps.
        if reg.is_empty() && reg.capacity() > 128 {
            *reg = SlotMap::with_key();
        }
    }

    /// Drop pending (unclaimed) readers older than `ttl`.
    fn sweep_pending_readers(&self, ttl: Duration) {
        let mut map = self
            .pending_readers
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        let before = map.len();
        map.retain(|_, e| e.created.elapsed() < ttl);
        let removed = before - map.len();
        if removed > 0 {
            tracing::debug!("[iroh-http] swept {removed} stale pending readers (ttl={ttl:?})");
        }
    }

    /// Drop pending (unclaimed) trailer receivers older than `ttl`.
    fn sweep_pending_trailer_rxs(&self, ttl: Duration) {
        let mut map = self
            .pending_trailer_rxs
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        let before = map.len();
        map.retain(|_, e| e.created.elapsed() < ttl);
        let removed = before - map.len();
        if removed > 0 {
            tracing::debug!(
                "[iroh-http] swept {removed} stale pending trailer receivers (ttl={ttl:?})"
            );
        }
    }
}
802
803// ── Shared pump helpers ───────────────────────────────────────────────────────
804
/// Default read buffer size for QUIC stream reads.
///
/// Equal to [`DEFAULT_MAX_CHUNK_SIZE`] (64 KB).
pub(crate) const PUMP_READ_BUF: usize = 64 * 1024;
807
808/// Pump raw bytes from a QUIC `RecvStream` into a `BodyWriter`.
809///
810/// Reads `PUMP_READ_BUF`-sized chunks and forwards them through the body
811/// channel.  Stops when the stream ends or the writer is dropped.
812pub(crate) async fn pump_quic_recv_to_body(
813    mut recv: iroh::endpoint::RecvStream,
814    writer: BodyWriter,
815) {
816    while let Ok(Some(chunk)) = recv.read_chunk(PUMP_READ_BUF).await {
817        if writer.send_chunk(chunk.bytes).await.is_err() {
818            break;
819        }
820    }
821    // writer drops → BodyReader sees EOF.
822}
823
824/// Pump raw bytes from a `BodyReader` into a QUIC `SendStream`.
825///
826/// Reads chunks from the body channel and writes them to the stream.
827/// Finishes the stream when the reader reaches EOF.
828pub(crate) async fn pump_body_to_quic_send(
829    reader: BodyReader,
830    mut send: iroh::endpoint::SendStream,
831) {
832    loop {
833        match reader.next_chunk().await {
834            None => break,
835            Some(data) => {
836                if send.write_all(&data).await.is_err() {
837                    break;
838                }
839            }
840        }
841    }
842    let _ = send.finish();
843}
844
845/// Bidirectional pump between a byte-level I/O object and a pair of body channels.
846///
847/// Reads from `io` → sends to `writer` (incoming data).
848/// Reads from `reader` → writes to `io` (outgoing data).
849///
850/// Used for both client-side and server-side duplex upgrade pumps.
851pub(crate) async fn pump_duplex<IO>(io: IO, writer: BodyWriter, reader: BodyReader)
852where
853    IO: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
854{
855    let (mut recv, mut send) = tokio::io::split(io);
856
857    tokio::join!(
858        async {
859            use bytes::BytesMut;
860            use tokio::io::AsyncReadExt;
861            let mut buf = BytesMut::with_capacity(PUMP_READ_BUF);
862            loop {
863                buf.clear();
864                match recv.read_buf(&mut buf).await {
865                    Ok(0) | Err(_) => break,
866                    Ok(_) => {
867                        if writer
868                            .send_chunk(buf.split().freeze())
869                            .await
870                            .is_err()
871                        {
872                            break;
873                        }
874                    }
875                }
876            }
877        },
878        async {
879            use tokio::io::AsyncWriteExt;
880            loop {
881                match reader.next_chunk().await {
882                    None => break,
883                    Some(data) => {
884                        if send.write_all(&data).await.is_err() {
885                            break;
886                        }
887                    }
888                }
889            }
890            let _ = send.shutdown().await;
891        },
892    );
893}
894
895#[cfg(test)]
896mod tests {
897    use super::*;
898
899    fn test_store() -> HandleStore {
900        HandleStore::new(StoreConfig::default())
901    }
902
903    // ── Body channel basics ─────────────────────────────────────────────
904
905    #[tokio::test]
906    async fn body_channel_send_recv() {
907        let (writer, reader) = make_body_channel();
908        writer.send_chunk(Bytes::from("hello")).await.unwrap();
909        drop(writer); // signal EOF
910        let chunk = reader.next_chunk().await;
911        assert_eq!(chunk, Some(Bytes::from("hello")));
912        let eof = reader.next_chunk().await;
913        assert!(eof.is_none());
914    }
915
916    #[tokio::test]
917    async fn body_channel_multiple_chunks() {
918        let (writer, reader) = make_body_channel();
919        writer.send_chunk(Bytes::from("a")).await.unwrap();
920        writer.send_chunk(Bytes::from("b")).await.unwrap();
921        writer.send_chunk(Bytes::from("c")).await.unwrap();
922        drop(writer);
923
924        let mut collected = Vec::new();
925        while let Some(chunk) = reader.next_chunk().await {
926            collected.push(chunk);
927        }
928        assert_eq!(
929            collected,
930            vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c"),]
931        );
932    }
933
934    #[tokio::test]
935    async fn body_channel_reader_dropped_returns_error() {
936        let (writer, reader) = make_body_channel();
937        drop(reader);
938        let result = writer.send_chunk(Bytes::from("data")).await;
939        assert!(result.is_err());
940    }
941
942    // ── HandleStore operations ──────────────────────────────────────────
943
944    #[tokio::test]
945    async fn insert_reader_and_next_chunk() {
946        let store = test_store();
947        let (writer, reader) = store.make_body_channel();
948        let handle = store.insert_reader(reader).unwrap();
949
950        writer.send_chunk(Bytes::from("slab-data")).await.unwrap();
951        drop(writer);
952
953        let chunk = store.next_chunk(handle).await.unwrap();
954        assert_eq!(chunk, Some(Bytes::from("slab-data")));
955
956        // EOF cleans up the registry entry
957        let eof = store.next_chunk(handle).await.unwrap();
958        assert!(eof.is_none());
959    }
960
961    #[tokio::test]
962    async fn next_chunk_invalid_handle() {
963        let store = test_store();
964        let result = store.next_chunk(999999).await;
965        assert!(result.is_err());
966        assert_eq!(result.unwrap_err().code, crate::ErrorCode::InvalidInput);
967    }
968
969    #[tokio::test]
970    async fn send_chunk_via_handle() {
971        let store = test_store();
972        let (writer, reader) = store.make_body_channel();
973        let handle = store.insert_writer(writer).unwrap();
974
975        store
976            .send_chunk(handle, Bytes::from("via-slab"))
977            .await
978            .unwrap();
979        store.finish_body(handle).unwrap();
980
981        let chunk = reader.next_chunk().await;
982        assert_eq!(chunk, Some(Bytes::from("via-slab")));
983        let eof = reader.next_chunk().await;
984        assert!(eof.is_none());
985    }
986
987    #[tokio::test]
988    async fn capacity_cap_rejects_overflow() {
989        let store = HandleStore::new(StoreConfig {
990            max_handles: 2,
991            ..StoreConfig::default()
992        });
993        let (_, r1) = store.make_body_channel();
994        let (_, r2) = store.make_body_channel();
995        let (_, r3) = store.make_body_channel();
996        store.insert_reader(r1).unwrap();
997        store.insert_reader(r2).unwrap();
998        let err = store.insert_reader(r3).unwrap_err();
999        assert!(err.message.contains("capacity"));
1000    }
1001
1002    // ── #82 regression: pending_trailer_rxs TTL sweep ──────────────────
1003
1004    #[test]
1005    fn sweep_removes_unclaimed_trailer_receivers() {
1006        let store = test_store();
1007        // Allocate a sender (which stores the rx in pending_trailer_rxs).
1008        let _handle = store.alloc_trailer_sender().unwrap();
1009        // Confirm an entry is present.
1010        assert_eq!(
1011            store.pending_trailer_rxs.lock().unwrap().len(),
1012            1
1013        );
1014        // Sweep with zero TTL — every entry is immediately expired.
1015        store.sweep(Duration::ZERO);
1016        assert_eq!(
1017            store.pending_trailer_rxs.lock().unwrap().len(),
1018            0,
1019            "sweep() must remove unclaimed pending trailer receivers"
1020        );
1021    }
1022
1023    // ── #84 regression: recv_with_cancel cancellation ──────────────────
1024
1025    #[tokio::test]
1026    async fn recv_with_cancel_returns_none_on_cancel() {
1027        let (_tx, rx) = mpsc::channel::<Bytes>(4);
1028        let rx = Arc::new(tokio::sync::Mutex::new(rx));
1029        let cancel = Arc::new(tokio::sync::Notify::new());
1030        // Notify before polling — biased select must return None immediately.
1031        cancel.notify_one();
1032        let result = recv_with_cancel(rx, cancel).await;
1033        assert!(result.is_none());
1034    }
1035}