Skip to main content

iroh_http_core/
stream.rs

1//! Per-endpoint handle store and body channel types.
2//!
3//! Rust owns all stream state; JS holds only opaque `u64` handles.
4//! Each `IrohEndpoint` has its own `HandleStore` — no process-global registries.
5//! Handles are `u64` values equal to `key.data().as_ffi()`, unique within the
6//! owning endpoint's slot-map.
7
8use std::{
9    collections::HashMap,
10    sync::{Arc, Mutex},
11    time::{Duration, Instant},
12};
13
14use bytes::Bytes;
15use slotmap::{KeyData, SlotMap};
16use tokio::sync::mpsc;
17
18use crate::CoreError;
19
20// ── Constants ─────────────────────────────────────────────────────────────────
21
/// Default bounded body-channel capacity, in chunks (see [`StoreConfig::channel_capacity`]).
pub const DEFAULT_CHANNEL_CAPACITY: usize = 32;
/// Default upper bound on the byte length of a single body chunk.
pub const DEFAULT_MAX_CHUNK_SIZE: usize = 64 * 1024; // 64 KB
/// Default wait for a slow body reader before a send gives up.
pub const DEFAULT_DRAIN_TIMEOUT_MS: u64 = 30_000; // 30 s
/// Default TTL for registry entries swept by [`HandleStore::sweep`].
pub const DEFAULT_SLAB_TTL_MS: u64 = 300_000; // 5 min
/// Default cap on live entries per registry (see [`StoreConfig::max_handles`]).
pub const DEFAULT_MAX_HANDLES: usize = 65_536;
27
28// ── Resource types ────────────────────────────────────────────────────────────
29
/// A live connection entry, kept in the session registry.
///
/// Stored behind an `Arc` so [`HandleStore::lookup_session`] can hand out
/// cheap clones without consuming the entry.
pub struct SessionEntry {
    /// The underlying iroh connection.
    pub conn: iroh::endpoint::Connection,
}
33
/// Response head delivered through the request-head oneshot channel
/// (see [`HandleStore::allocate_req_handle`] / [`HandleStore::take_req_sender`]).
pub struct ResponseHeadEntry {
    /// HTTP status code.
    pub status: u16,
    /// Header name/value pairs.
    pub headers: Vec<(String, String)>,
}
38
39// ── SlotMap key types ─────────────────────────────────────────────────────────
40
// One distinct slot-map key type per registry, so a key for one resource
// kind cannot be used as a key into another registry by accident.
slotmap::new_key_type! { pub(crate) struct ReaderKey; }
slotmap::new_key_type! { pub(crate) struct WriterKey; }
slotmap::new_key_type! { pub(crate) struct TrailerTxKey; }
slotmap::new_key_type! { pub(crate) struct TrailerRxKey; }
slotmap::new_key_type! { pub(crate) struct FetchCancelKey; }
slotmap::new_key_type! { pub(crate) struct SessionKey; }
slotmap::new_key_type! { pub(crate) struct RequestHeadKey; }
48
49// ── Handle encode / decode helpers ───────────────────────────────────────────
50
/// Encode a slot-map key as the opaque `u64` handle handed to JS.
/// Inverse of the `handle_to_*` decoders generated below.
fn key_to_handle<K: slotmap::Key>(k: K) -> u64 {
    k.data().as_ffi()
}
54
/// Generate a decoder that turns an opaque `u64` handle back into a typed
/// slot-map key.  Decoding itself never fails; a stale or forged handle
/// simply produces a key that misses in the registry lookup.
macro_rules! handle_to_key {
    ($fn_name:ident, $key_type:ty) => {
        fn $fn_name(h: u64) -> $key_type {
            <$key_type>::from(KeyData::from_ffi(h))
        }
    };
}

// One decoder per registry key type.
handle_to_key!(handle_to_reader_key, ReaderKey);
handle_to_key!(handle_to_writer_key, WriterKey);
handle_to_key!(handle_to_trailer_tx_key, TrailerTxKey);
handle_to_key!(handle_to_trailer_rx_key, TrailerRxKey);
handle_to_key!(handle_to_session_key, SessionKey);
handle_to_key!(handle_to_request_head_key, RequestHeadKey);
handle_to_key!(handle_to_fetch_cancel_key, FetchCancelKey);
70
71// ── Body channel primitives ───────────────────────────────────────────────────
72
/// Consumer end of a body channel — stored in the reader registry.
/// Uses `tokio::sync::Mutex` so we can `.await` the receiver without holding
/// the registry's `std::sync::Mutex`.
pub struct BodyReader {
    /// Receiver half of the bounded chunk channel, shared via `Arc` so it
    /// can be cloned out of the registry before awaiting.
    pub(crate) rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Bytes>>>,
    /// ISS-010: cancellation signal — notified when `cancel_reader` is called
    /// so in-flight `next_chunk` awaits terminate promptly.
    pub(crate) cancel: Arc<tokio::sync::Notify>,
    /// Drain timeout inherited from the endpoint config at channel-creation time.
    pub(crate) drain_timeout: Duration,
}
84
/// Producer end of a body channel — stored in the writer registry.
/// `mpsc::Sender` is `Clone`, so we clone it out of the registry for each call.
pub struct BodyWriter {
    /// Sender half of the bounded chunk channel; dropping it makes the
    /// paired [`BodyReader`] observe EOF.
    pub(crate) tx: mpsc::Sender<Bytes>,
    /// Drain timeout baked in at channel-creation time from the endpoint config.
    pub(crate) drain_timeout: Duration,
}
92
93/// Create a matched (writer, reader) pair backed by a bounded mpsc channel.
94///
95/// Prefer [`HandleStore::make_body_channel`] when an endpoint is available so
96/// the channel inherits the endpoint's backpressure config.  This free
97/// function uses the compile-time defaults and exists for tests and pre-bind
98/// code paths.
99pub fn make_body_channel() -> (BodyWriter, BodyReader) {
100    make_body_channel_with(
101        DEFAULT_CHANNEL_CAPACITY,
102        Duration::from_millis(DEFAULT_DRAIN_TIMEOUT_MS),
103    )
104}
105
106fn make_body_channel_with(capacity: usize, drain_timeout: Duration) -> (BodyWriter, BodyReader) {
107    let (tx, rx) = mpsc::channel(capacity);
108    (
109        BodyWriter { tx, drain_timeout },
110        BodyReader {
111            rx: Arc::new(tokio::sync::Mutex::new(rx)),
112            cancel: Arc::new(tokio::sync::Notify::new()),
113            drain_timeout,
114        },
115    )
116}
117
118// ── Cancellable receive ───────────────────────────────────────────────────────
119
/// Receive the next chunk from a body channel, aborting immediately if
/// `cancel` is notified.
///
/// Returns `None` on EOF (sender dropped) or on cancellation.  Both call
/// sites — [`BodyReader::next_chunk`] and [`HandleStore::next_chunk`] — share
/// this helper so the cancellation semantics are defined and tested in one place.
async fn recv_with_cancel(
    rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Bytes>>>,
    cancel: Arc<tokio::sync::Notify>,
) -> Option<Bytes> {
    tokio::select! {
        // `biased` polls the cancellation branch first, so a pending cancel
        // wins even when a chunk is also ready.
        biased;
        _ = cancel.notified() => None,
        // The async-mutex lock is taken inside the arm; if this arm loses,
        // the future (and the lock guard) is dropped.
        chunk = async { rx.lock().await.recv().await } => chunk,
    }
}
136
137impl BodyReader {
138    /// Receive the next chunk.  Returns `None` when the writer is gone (EOF)
139    /// or when the reader has been cancelled.
140    pub async fn next_chunk(&self) -> Option<Bytes> {
141        recv_with_cancel(self.rx.clone(), self.cancel.clone()).await
142    }
143}
144
145impl BodyWriter {
146    /// Send one chunk.  Returns `Err` if the reader has been dropped or if
147    /// the drain timeout expires (JS not reading fast enough).
148    pub async fn send_chunk(&self, chunk: Bytes) -> Result<(), String> {
149        tokio::time::timeout(self.drain_timeout, self.tx.send(chunk))
150            .await
151            .map_err(|_| "drain timeout: body reader is too slow".to_string())?
152            .map_err(|_| "body reader dropped".to_string())
153    }
154}
155
156// ── Trailer type aliases ──────────────────────────────────────────────────────
157
/// Sending half of the one-shot trailer rendezvous (JS → Rust pump task).
type TrailerTx = tokio::sync::oneshot::Sender<Vec<(String, String)>>;
/// Receiving half of the trailer rendezvous, claimed by the fetch path.
pub(crate) type TrailerRx = tokio::sync::oneshot::Receiver<Vec<(String, String)>>;
160
161// ── StoreConfig ───────────────────────────────────────────────────────────────
162
/// Configuration for a [`HandleStore`].  Set once at endpoint bind time.
///
/// Defaults mirror the module-level `DEFAULT_*` constants; see
/// [`StoreConfig::default`].
#[derive(Debug, Clone)]
pub struct StoreConfig {
    /// Body-channel capacity (in chunks).  Minimum 1.
    pub channel_capacity: usize,
    /// Maximum byte length of a single chunk in `send_chunk`.  Minimum 1.
    pub max_chunk_size: usize,
    /// Milliseconds to wait for a slow body reader before dropping.
    pub drain_timeout: Duration,
    /// Maximum handle slots per registry.  Prevents unbounded growth.
    pub max_handles: usize,
    /// TTL for handle entries; expired entries are swept periodically.
    /// Zero disables sweeping.
    pub ttl: Duration,
}
178
179impl Default for StoreConfig {
180    fn default() -> Self {
181        Self {
182            channel_capacity: DEFAULT_CHANNEL_CAPACITY,
183            max_chunk_size: DEFAULT_MAX_CHUNK_SIZE,
184            drain_timeout: Duration::from_millis(DEFAULT_DRAIN_TIMEOUT_MS),
185            max_handles: DEFAULT_MAX_HANDLES,
186            ttl: Duration::from_millis(DEFAULT_SLAB_TTL_MS),
187        }
188    }
189}
190
191// ── Timed wrapper ─────────────────────────────────────────────────────────────
192
/// Wraps a registry value with its insertion time so the TTL sweep can
/// evict stale entries.
struct Timed<T> {
    value: T,
    created_at: Instant,
}
197
198impl<T> Timed<T> {
199    fn new(value: T) -> Self {
200        Self {
201            value,
202            created_at: Instant::now(),
203        }
204    }
205
206    fn is_expired(&self, ttl: Duration) -> bool {
207        self.created_at.elapsed() > ttl
208    }
209}
210
/// Pending reader tracked with insertion time for TTL sweep.
struct PendingReaderEntry {
    // Reader half waiting to be claimed by the fetch path.
    reader: BodyReader,
    // Insertion timestamp checked by `sweep_pending_readers`.
    created: Instant,
}

/// Pending trailer receiver tracked with insertion time for TTL sweep.
struct PendingTrailerRxEntry {
    // Receiver half waiting to be claimed via `claim_pending_trailer_rx`.
    rx: TrailerRx,
    // Insertion timestamp checked by `sweep_pending_trailer_rxs`.
    created: Instant,
}
222
223// ── HandleStore ───────────────────────────────────────────────────────────────
224
/// Tracks handles inserted during a multi-handle allocation sequence.
/// On drop, removes all tracked handles unless [`commit`](InsertGuard::commit)
/// has been called. This prevents orphaned handles when a later insert fails.
pub(crate) struct InsertGuard<'a> {
    // Store the tracked handles belong to; rollback goes through it on drop.
    store: &'a HandleStore,
    // Handles inserted so far, in insertion order.
    tracked: Vec<TrackedHandle>,
    // Set by `commit()`; suppresses rollback in `Drop`.
    committed: bool,
}
233
/// A handle tracked by [`InsertGuard`] for rollback on drop.
///
/// # Intentionally omitted variants
///
/// `Session` and `FetchCancel` are not tracked here because their lifecycles
/// are managed outside of multi-handle allocation sequences:
/// - Sessions are created and closed by `session_connect` / `session_close`
///   independently and are never allocated inside a guard.
/// - Fetch cancel tokens are allocated before a guard is opened and are
///   always cleaned up by `remove_fetch_token` after the fetch resolves.
///
/// If either type is ever added to a guard-guarded allocation path in the
/// future, add `Session(u64)` or `FetchCancel(u64)` variants here with the
/// corresponding rollback arms in [`InsertGuard::drop`].
enum TrackedHandle {
    /// Body-reader handle; rolled back via `cancel_reader`.
    Reader(u64),
    /// Body-writer handle; rolled back via `finish_body`.
    Writer(u64),
    /// Trailer-sender handle; rolled back via `remove_trailer_sender`.
    TrailerTx(u64),
    /// Trailer-receiver handle; removed directly from its registry.
    TrailerRx(u64),
    /// Request-head sender handle; removed directly from its registry.
    ReqHead(u64),
}
255
256impl<'a> InsertGuard<'a> {
257    fn new(store: &'a HandleStore) -> Self {
258        Self {
259            store,
260            tracked: Vec::new(),
261            committed: false,
262        }
263    }
264
265    pub fn insert_reader(&mut self, reader: BodyReader) -> Result<u64, CoreError> {
266        let h = self.store.insert_reader(reader)?;
267        self.tracked.push(TrackedHandle::Reader(h));
268        Ok(h)
269    }
270
271    pub fn insert_writer(&mut self, writer: BodyWriter) -> Result<u64, CoreError> {
272        let h = self.store.insert_writer(writer)?;
273        self.tracked.push(TrackedHandle::Writer(h));
274        Ok(h)
275    }
276
277    pub fn insert_trailer_sender(&mut self, tx: TrailerTx) -> Result<u64, CoreError> {
278        let h = self.store.insert_trailer_sender(tx)?;
279        self.tracked.push(TrackedHandle::TrailerTx(h));
280        Ok(h)
281    }
282
283    pub fn insert_trailer_receiver(&mut self, rx: TrailerRx) -> Result<u64, CoreError> {
284        let h = self.store.insert_trailer_receiver(rx)?;
285        self.tracked.push(TrackedHandle::TrailerRx(h));
286        Ok(h)
287    }
288
289    pub fn allocate_req_handle(
290        &mut self,
291        sender: tokio::sync::oneshot::Sender<ResponseHeadEntry>,
292    ) -> Result<u64, CoreError> {
293        let h = self.store.allocate_req_handle(sender)?;
294        self.tracked.push(TrackedHandle::ReqHead(h));
295        Ok(h)
296    }
297
298    /// Consume the guard without rolling back. Call after all inserts succeed.
299    pub fn commit(mut self) {
300        self.committed = true;
301    }
302}
303
impl Drop for InsertGuard<'_> {
    fn drop(&mut self) {
        // A committed guard rolls nothing back — the handles now belong to
        // the caller (ultimately JS).
        if self.committed {
            return;
        }
        // Best-effort rollback of every handle inserted through this guard,
        // in insertion order.  Poisoned registry locks are recovered with
        // `into_inner` so rollback still runs while unwinding from a panic.
        for handle in &self.tracked {
            match handle {
                // Cancelling also wakes any in-flight `next_chunk` await.
                TrackedHandle::Reader(h) => self.store.cancel_reader(*h),
                TrackedHandle::Writer(h) => {
                    // Ignore "invalid handle": the writer may already be gone.
                    let _ = self.store.finish_body(*h);
                }
                TrackedHandle::TrailerTx(h) => self.store.remove_trailer_sender(*h),
                TrackedHandle::TrailerRx(h) => {
                    self.store
                        .trailer_rx
                        .lock()
                        .unwrap_or_else(|e| e.into_inner())
                        .remove(handle_to_trailer_rx_key(*h));
                }
                TrackedHandle::ReqHead(h) => {
                    self.store
                        .request_heads
                        .lock()
                        .unwrap_or_else(|e| e.into_inner())
                        .remove(handle_to_request_head_key(*h));
                }
            }
        }
    }
}
334
/// Per-endpoint handle registry.  Owns all body readers, writers, trailers,
/// sessions, request-head rendezvous channels, and fetch-cancel tokens for
/// a single `IrohEndpoint`.
///
/// When the endpoint is dropped, this store is dropped with it — all
/// slot-maps are freed and any remaining handles become invalid.
pub struct HandleStore {
    // Body readers handed out for reading response bodies.
    readers: Mutex<SlotMap<ReaderKey, Timed<BodyReader>>>,
    // Body writers handed out for streaming request bodies.
    writers: Mutex<SlotMap<WriterKey, Timed<BodyWriter>>>,
    // Trailer oneshot senders (held via handle on the JS side).
    trailer_tx: Mutex<SlotMap<TrailerTxKey, Timed<TrailerTx>>>,
    // Trailer oneshot receivers awaited by `next_trailer`.
    trailer_rx: Mutex<SlotMap<TrailerRxKey, Timed<TrailerRx>>>,
    // Live sessions, `Arc`-wrapped so lookups don't consume them.
    sessions: Mutex<SlotMap<SessionKey, Timed<Arc<SessionEntry>>>>,
    // Response-head rendezvous senders for the server respond path.
    request_heads:
        Mutex<SlotMap<RequestHeadKey, Timed<tokio::sync::oneshot::Sender<ResponseHeadEntry>>>>,
    // Cancellation tokens for in-flight fetches.
    fetch_cancels: Mutex<SlotMap<FetchCancelKey, Timed<Arc<tokio::sync::Notify>>>>,
    // Readers paired with allocated writer handles, awaiting claim by fetch().
    pending_readers: Mutex<HashMap<u64, PendingReaderEntry>>,
    /// Pending trailer receivers matched to allocated sender handles.
    /// Keyed by the tx handle; claimed by `fetch()` via
    /// [`claim_pending_trailer_rx`](Self::claim_pending_trailer_rx).
    pending_trailer_rxs: Mutex<HashMap<u64, PendingTrailerRxEntry>>,
    // Backpressure / capacity / TTL settings fixed at bind time.
    pub(crate) config: StoreConfig,
}
357
358impl HandleStore {
359    /// Create a new handle store with the given configuration.
360    pub fn new(config: StoreConfig) -> Self {
361        Self {
362            readers: Mutex::new(SlotMap::with_key()),
363            writers: Mutex::new(SlotMap::with_key()),
364            trailer_tx: Mutex::new(SlotMap::with_key()),
365            trailer_rx: Mutex::new(SlotMap::with_key()),
366            sessions: Mutex::new(SlotMap::with_key()),
367            request_heads: Mutex::new(SlotMap::with_key()),
368            fetch_cancels: Mutex::new(SlotMap::with_key()),
369            pending_readers: Mutex::new(HashMap::new()),
370            pending_trailer_rxs: Mutex::new(HashMap::new()),
371            config,
372        }
373    }
374
375    // ── Config accessors ─────────────────────────────────────────────────
376
    /// Create a guard for multi-handle allocation with automatic rollback.
    ///
    /// Handles inserted through the guard are removed again when it drops,
    /// unless [`InsertGuard::commit`] is called first.
    pub(crate) fn insert_guard(&self) -> InsertGuard<'_> {
        InsertGuard::new(self)
    }
381
    /// The configured drain timeout.  Fixed at bind time; see
    /// [`StoreConfig::drain_timeout`].
    pub fn drain_timeout(&self) -> Duration {
        self.config.drain_timeout
    }

    /// The configured maximum chunk size.  Fixed at bind time; see
    /// [`StoreConfig::max_chunk_size`].
    pub fn max_chunk_size(&self) -> usize {
        self.config.max_chunk_size
    }
391
392    /// Snapshot of handle counts for observability.
393    ///
394    /// Returns `(active_readers, active_writers, active_sessions, total_handles)`.
395    pub fn count_handles(&self) -> (usize, usize, usize, usize) {
396        let readers = self.readers.lock().unwrap_or_else(|e| e.into_inner()).len();
397        let writers = self.writers.lock().unwrap_or_else(|e| e.into_inner()).len();
398        let sessions = self
399            .sessions
400            .lock()
401            .unwrap_or_else(|e| e.into_inner())
402            .len();
403        let total = readers
404            .saturating_add(writers)
405            .saturating_add(sessions)
406            .saturating_add(
407                self.trailer_tx
408                    .lock()
409                    .unwrap_or_else(|e| e.into_inner())
410                    .len(),
411            )
412            .saturating_add(
413                self.trailer_rx
414                    .lock()
415                    .unwrap_or_else(|e| e.into_inner())
416                    .len(),
417            )
418            .saturating_add(
419                self.request_heads
420                    .lock()
421                    .unwrap_or_else(|e| e.into_inner())
422                    .len(),
423            )
424            .saturating_add(
425                self.fetch_cancels
426                    .lock()
427                    .unwrap_or_else(|e| e.into_inner())
428                    .len(),
429            );
430        (readers, writers, sessions, total)
431    }
432
433    // ── Body channels ────────────────────────────────────────────────────
434
435    /// Create a matched (writer, reader) pair using this store's config.
436    pub fn make_body_channel(&self) -> (BodyWriter, BodyReader) {
437        make_body_channel_with(self.config.channel_capacity, self.config.drain_timeout)
438    }
439
440    // ── Capacity-checked insert ──────────────────────────────────────────
441
442    fn insert_checked<K: slotmap::Key, T>(
443        registry: &Mutex<SlotMap<K, Timed<T>>>,
444        value: T,
445        max: usize,
446    ) -> Result<u64, CoreError> {
447        let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
448        if reg.len() >= max {
449            return Err(CoreError::internal("handle registry at capacity"));
450        }
451        let key = reg.insert(Timed::new(value));
452        Ok(key_to_handle(key))
453    }
454
455    // ── Body reader / writer ─────────────────────────────────────────────
456
    /// Insert a `BodyReader` and return a handle.
    ///
    /// Fails with an internal error when the reader registry is already at
    /// `config.max_handles` capacity.
    pub fn insert_reader(&self, reader: BodyReader) -> Result<u64, CoreError> {
        Self::insert_checked(&self.readers, reader, self.config.max_handles)
    }

    /// Insert a `BodyWriter` and return a handle.
    ///
    /// Fails with an internal error when the writer registry is already at
    /// `config.max_handles` capacity.
    pub fn insert_writer(&self, writer: BodyWriter) -> Result<u64, CoreError> {
        Self::insert_checked(&self.writers, writer, self.config.max_handles)
    }
466
467    /// Allocate a `(writer_handle, reader)` pair for streaming request bodies.
468    ///
469    /// The writer handle is returned to JS.  The reader must be stashed via
470    /// [`store_pending_reader`](Self::store_pending_reader) so the fetch path
471    /// can claim it.
472    pub fn alloc_body_writer(&self) -> Result<(u64, BodyReader), CoreError> {
473        let (writer, reader) = self.make_body_channel();
474        let handle = self.insert_writer(writer)?;
475        Ok((handle, reader))
476    }
477
478    /// Store the reader side of a newly allocated writer channel so that the
479    /// fetch path can claim it with [`claim_pending_reader`](Self::claim_pending_reader).
480    pub fn store_pending_reader(&self, writer_handle: u64, reader: BodyReader) {
481        self.pending_readers
482            .lock()
483            .unwrap_or_else(|e| e.into_inner())
484            .insert(
485                writer_handle,
486                PendingReaderEntry {
487                    reader,
488                    created: Instant::now(),
489                },
490            );
491    }
492
493    /// Claim the reader that was paired with `writer_handle`.
494    /// Returns `None` if already claimed or never stored.
495    pub fn claim_pending_reader(&self, writer_handle: u64) -> Option<BodyReader> {
496        self.pending_readers
497            .lock()
498            .unwrap_or_else(|e| e.into_inner())
499            .remove(&writer_handle)
500            .map(|e| e.reader)
501    }
502
503    // ── Bridge methods (nextChunk / sendChunk / finishBody) ──────────────
504
    /// Pull the next chunk from a reader handle.
    ///
    /// Returns `Ok(None)` at EOF.  After returning `None` the handle is
    /// cleaned up from the registry automatically.
    ///
    /// # Errors
    /// `invalid_handle` if the handle is unknown (stale, already at EOF, or
    /// cancelled).
    pub async fn next_chunk(&self, handle: u64) -> Result<Option<Bytes>, CoreError> {
        // Clone the Arc — allows awaiting without holding the registry mutex.
        let (rx_arc, cancel) = {
            let reg = self.readers.lock().unwrap_or_else(|e| e.into_inner());
            let entry = reg
                .get(handle_to_reader_key(handle))
                .ok_or_else(|| CoreError::invalid_handle(handle))?;
            (entry.value.rx.clone(), entry.value.cancel.clone())
        };

        // `None` means either the writer was dropped (EOF) or `cancel_reader`
        // fired while we were awaiting.
        let chunk = recv_with_cancel(rx_arc, cancel).await;

        // Clean up on EOF so the slot is released promptly.
        if chunk.is_none() {
            self.readers
                .lock()
                .unwrap_or_else(|e| e.into_inner())
                .remove(handle_to_reader_key(handle));
        }

        Ok(chunk)
    }
531
532    /// Push a chunk into a writer handle.
533    ///
534    /// Chunks larger than the configured `max_chunk_size` are split
535    /// automatically so individual messages stay within the backpressure budget.
536    pub async fn send_chunk(&self, handle: u64, chunk: Bytes) -> Result<(), CoreError> {
537        // Clone the Sender (cheap) and release the lock before awaiting.
538        let (tx, timeout) = {
539            let reg = self.writers.lock().unwrap_or_else(|e| e.into_inner());
540            let entry = reg
541                .get(handle_to_writer_key(handle))
542                .ok_or_else(|| CoreError::invalid_handle(handle))?;
543            (entry.value.tx.clone(), entry.value.drain_timeout)
544        };
545        let max = self.config.max_chunk_size;
546        if chunk.len() <= max {
547            tokio::time::timeout(timeout, tx.send(chunk))
548                .await
549                .map_err(|_| CoreError::timeout("drain timeout: body reader is too slow"))?
550                .map_err(|_| CoreError::internal("body reader dropped"))
551        } else {
552            // Split into max-size pieces.
553            let mut offset = 0;
554            while offset < chunk.len() {
555                let end = offset.saturating_add(max).min(chunk.len());
556                tokio::time::timeout(timeout, tx.send(chunk.slice(offset..end)))
557                    .await
558                    .map_err(|_| CoreError::timeout("drain timeout: body reader is too slow"))?
559                    .map_err(|_| CoreError::internal("body reader dropped"))?;
560                offset = end;
561            }
562            Ok(())
563        }
564    }
565
566    /// Signal end-of-body by dropping the writer from the registry.
567    pub fn finish_body(&self, handle: u64) -> Result<(), CoreError> {
568        self.writers
569            .lock()
570            .unwrap_or_else(|e| e.into_inner())
571            .remove(handle_to_writer_key(handle))
572            .ok_or_else(|| CoreError::invalid_handle(handle))?;
573        Ok(())
574    }
575
576    /// Drop a body reader, signalling cancellation of any in-flight read.
577    pub fn cancel_reader(&self, handle: u64) {
578        let entry = self
579            .readers
580            .lock()
581            .unwrap_or_else(|e| e.into_inner())
582            .remove(handle_to_reader_key(handle));
583        if let Some(e) = entry {
584            e.value.cancel.notify_waiters();
585        }
586    }
587
588    // ── Trailer operations ───────────────────────────────────────────────
589
    /// Insert a trailer oneshot **sender** and return a handle.
    ///
    /// Subject to the same `config.max_handles` capacity bound as every
    /// registry insert.
    pub fn insert_trailer_sender(&self, tx: TrailerTx) -> Result<u64, CoreError> {
        Self::insert_checked(&self.trailer_tx, tx, self.config.max_handles)
    }

    /// Insert a trailer oneshot **receiver** and return a handle.
    pub fn insert_trailer_receiver(&self, rx: TrailerRx) -> Result<u64, CoreError> {
        Self::insert_checked(&self.trailer_rx, rx, self.config.max_handles)
    }
599
600    /// Remove (drop) a trailer sender without sending.
601    pub fn remove_trailer_sender(&self, handle: u64) {
602        self.trailer_tx
603            .lock()
604            .unwrap_or_else(|e| e.into_inner())
605            .remove(handle_to_trailer_tx_key(handle));
606    }
607
608    /// Allocate a `(tx, rx)` trailer oneshot pair for sending request trailers
609    /// from JavaScript to the Rust body encoder.
610    ///
611    /// Returns the sender handle — JS holds this and calls `send_trailers` when
612    /// it is done writing the request body.  The matching `rx` is stored in
613    /// `pending_trailer_rxs` and claimed by `fetch()` via
614    /// [`claim_pending_trailer_rx`](Self::claim_pending_trailer_rx).
615    pub fn alloc_trailer_sender(&self) -> Result<u64, CoreError> {
616        let (tx, rx) = tokio::sync::oneshot::channel::<Vec<(String, String)>>();
617        let handle = self.insert_trailer_sender(tx)?;
618        self.pending_trailer_rxs
619            .lock()
620            .unwrap_or_else(|e| e.into_inner())
621            .insert(
622                handle,
623                PendingTrailerRxEntry {
624                    rx,
625                    created: Instant::now(),
626                },
627            );
628        Ok(handle)
629    }
630
631    /// Claim the trailer receiver that was paired with the given sender handle.
632    ///
633    /// Returns `None` if the handle was never allocated via
634    /// [`alloc_trailer_sender`](Self::alloc_trailer_sender) or has already been claimed.
635    pub fn claim_pending_trailer_rx(&self, sender_handle: u64) -> Option<TrailerRx> {
636        self.pending_trailer_rxs
637            .lock()
638            .unwrap_or_else(|e| e.into_inner())
639            .remove(&sender_handle)
640            .map(|e| e.rx)
641    }
642
643    /// Deliver trailers from the JS side to the waiting Rust pump task.
644    pub fn send_trailers(
645        &self,
646        handle: u64,
647        trailers: Vec<(String, String)>,
648    ) -> Result<(), CoreError> {
649        let tx = self
650            .trailer_tx
651            .lock()
652            .unwrap_or_else(|e| e.into_inner())
653            .remove(handle_to_trailer_tx_key(handle))
654            .ok_or_else(|| CoreError::invalid_handle(handle))?
655            .value;
656        tx.send(trailers)
657            .map_err(|_| CoreError::internal("trailer receiver dropped"))
658    }
659
660    /// Await and retrieve trailers produced by the Rust pump task.
661    pub async fn next_trailer(
662        &self,
663        handle: u64,
664    ) -> Result<Option<Vec<(String, String)>>, CoreError> {
665        let rx = self
666            .trailer_rx
667            .lock()
668            .unwrap_or_else(|e| e.into_inner())
669            .remove(handle_to_trailer_rx_key(handle))
670            .ok_or_else(|| CoreError::invalid_handle(handle))?
671            .value;
672        match rx.await {
673            Ok(trailers) => Ok(Some(trailers)),
674            Err(_) => Ok(None), // sender dropped = no trailers
675        }
676    }
677
678    // ── Session ──────────────────────────────────────────────────────────
679
680    /// Insert a `SessionEntry` and return a handle.
681    pub fn insert_session(&self, entry: SessionEntry) -> Result<u64, CoreError> {
682        Self::insert_checked(&self.sessions, Arc::new(entry), self.config.max_handles)
683    }
684
685    /// Look up a session by handle without consuming it.
686    pub fn lookup_session(&self, handle: u64) -> Option<Arc<SessionEntry>> {
687        self.sessions
688            .lock()
689            .unwrap_or_else(|e| e.into_inner())
690            .get(handle_to_session_key(handle))
691            .map(|e| e.value.clone())
692    }
693
694    /// Remove a session entry by handle and return it.
695    pub fn remove_session(&self, handle: u64) -> Option<Arc<SessionEntry>> {
696        self.sessions
697            .lock()
698            .unwrap_or_else(|e| e.into_inner())
699            .remove(handle_to_session_key(handle))
700            .map(|e| e.value)
701    }
702
703    // ── Request head (for server respond path) ───────────────────────────
704
705    /// Insert a response-head oneshot sender and return a handle.
706    pub fn allocate_req_handle(
707        &self,
708        sender: tokio::sync::oneshot::Sender<ResponseHeadEntry>,
709    ) -> Result<u64, CoreError> {
710        Self::insert_checked(&self.request_heads, sender, self.config.max_handles)
711    }
712
713    /// Remove and return the response-head sender for the given handle.
714    pub fn take_req_sender(
715        &self,
716        handle: u64,
717    ) -> Option<tokio::sync::oneshot::Sender<ResponseHeadEntry>> {
718        self.request_heads
719            .lock()
720            .unwrap_or_else(|e| e.into_inner())
721            .remove(handle_to_request_head_key(handle))
722            .map(|e| e.value)
723    }
724
725    // ── Fetch cancel ─────────────────────────────────────────────────────
726
727    /// Allocate a cancellation token for an upcoming `fetch` call.
728    pub fn alloc_fetch_token(&self) -> Result<u64, CoreError> {
729        let notify = Arc::new(tokio::sync::Notify::new());
730        Self::insert_checked(&self.fetch_cancels, notify, self.config.max_handles)
731    }
732
733    /// Signal an in-flight fetch to abort.
734    pub fn cancel_in_flight(&self, token: u64) {
735        if let Some(entry) = self
736            .fetch_cancels
737            .lock()
738            .unwrap_or_else(|e| e.into_inner())
739            .get(handle_to_fetch_cancel_key(token))
740        {
741            entry.value.notify_one();
742        }
743    }
744
745    /// Retrieve the `Notify` for a fetch token (clones the Arc for use in select!).
746    pub fn get_fetch_cancel_notify(&self, token: u64) -> Option<Arc<tokio::sync::Notify>> {
747        self.fetch_cancels
748            .lock()
749            .unwrap_or_else(|e| e.into_inner())
750            .get(handle_to_fetch_cancel_key(token))
751            .map(|e| e.value.clone())
752    }
753
754    /// Remove a fetch cancel token after the fetch completes.
755    pub fn remove_fetch_token(&self, token: u64) {
756        self.fetch_cancels
757            .lock()
758            .unwrap_or_else(|e| e.into_inner())
759            .remove(handle_to_fetch_cancel_key(token));
760    }
761
762    // ── TTL sweep ────────────────────────────────────────────────────────
763
    /// Sweep all registries, removing entries older than `ttl`.
    /// Also compacts any registry that is empty after sweeping to reclaim
    /// the backing memory from traffic bursts.
    ///
    /// NOTE(review): a zero `ttl` would expire every entry immediately here,
    /// yet [`StoreConfig::ttl`] documents zero as "disables sweeping" — the
    /// caller is presumably expected not to invoke `sweep` in that case;
    /// confirm at the call site.
    pub fn sweep(&self, ttl: Duration) {
        Self::sweep_registry(&self.readers, ttl);
        Self::sweep_registry(&self.writers, ttl);
        Self::sweep_registry(&self.trailer_tx, ttl);
        Self::sweep_registry(&self.trailer_rx, ttl);
        Self::sweep_registry(&self.request_heads, ttl);
        Self::sweep_registry(&self.sessions, ttl);
        Self::sweep_registry(&self.fetch_cancels, ttl);
        self.sweep_pending_readers(ttl);
        self.sweep_pending_trailer_rxs(ttl);
    }
778
779    fn sweep_registry<K: slotmap::Key, T>(registry: &Mutex<SlotMap<K, Timed<T>>>, ttl: Duration) {
780        let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
781        let expired: Vec<K> = reg
782            .iter()
783            .filter(|(_, e)| e.is_expired(ttl))
784            .map(|(k, _)| k)
785            .collect();
786
787        if expired.is_empty() {
788            return;
789        }
790
791        for key in &expired {
792            reg.remove(*key);
793        }
794        tracing::debug!(
795            "[iroh-http] swept {} expired registry entries (ttl={ttl:?})",
796            expired.len()
797        );
798        // Compact when empty to reclaim backing memory after traffic bursts.
799        if reg.is_empty() && reg.capacity() > 128 {
800            *reg = SlotMap::with_key();
801        }
802    }
803
    /// Drop pending readers that were stored but never claimed within `ttl`.
    fn sweep_pending_readers(&self, ttl: Duration) {
        let mut map = self
            .pending_readers
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        let before = map.len();
        // Keep entries younger than the TTL; the rest are dropped in place.
        map.retain(|_, e| e.created.elapsed() < ttl);
        let removed = before.saturating_sub(map.len());
        // Log only when something was actually removed.
        if removed > 0 {
            tracing::debug!("[iroh-http] swept {removed} stale pending readers (ttl={ttl:?})");
        }
    }
816
817    fn sweep_pending_trailer_rxs(&self, ttl: Duration) {
818        let mut map = self
819            .pending_trailer_rxs
820            .lock()
821            .unwrap_or_else(|e| e.into_inner());
822        let before = map.len();
823        map.retain(|_, e| e.created.elapsed() < ttl);
824        let removed = before.saturating_sub(map.len());
825        if removed > 0 {
826            tracing::debug!(
827                "[iroh-http] swept {removed} stale pending trailer receivers (ttl={ttl:?})"
828            );
829        }
830    }
831}
832
833// ── Shared pump helpers ───────────────────────────────────────────────────────
834
/// Default read buffer size for QUIC stream reads (64 KiB per chunk).
pub(crate) const PUMP_READ_BUF: usize = 64 * 1024;
837
838/// Pump raw bytes from a QUIC `RecvStream` into a `BodyWriter`.
839///
840/// Reads `PUMP_READ_BUF`-sized chunks and forwards them through the body
841/// channel.  Stops when the stream ends or the writer is dropped.
842pub(crate) async fn pump_quic_recv_to_body(
843    mut recv: iroh::endpoint::RecvStream,
844    writer: BodyWriter,
845) {
846    while let Ok(Some(chunk)) = recv.read_chunk(PUMP_READ_BUF).await {
847        if writer.send_chunk(chunk.bytes).await.is_err() {
848            break;
849        }
850    }
851    // writer drops → BodyReader sees EOF.
852}
853
854/// Pump raw bytes from a `BodyReader` into a QUIC `SendStream`.
855///
856/// Reads chunks from the body channel and writes them to the stream.
857/// Finishes the stream when the reader reaches EOF.
858pub(crate) async fn pump_body_to_quic_send(
859    reader: BodyReader,
860    mut send: iroh::endpoint::SendStream,
861) {
862    loop {
863        match reader.next_chunk().await {
864            None => break,
865            Some(data) => {
866                if send.write_all(&data).await.is_err() {
867                    break;
868                }
869            }
870        }
871    }
872    let _ = send.finish();
873}
874
875/// Bidirectional pump between a byte-level I/O object and a pair of body channels.
876///
877/// Reads from `io` → sends to `writer` (incoming data).
878/// Reads from `reader` → writes to `io` (outgoing data).
879///
880/// Used for both client-side and server-side duplex upgrade pumps.
881pub(crate) async fn pump_duplex<IO>(io: IO, writer: BodyWriter, reader: BodyReader)
882where
883    IO: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
884{
885    let (mut recv, mut send) = tokio::io::split(io);
886
887    tokio::join!(
888        async {
889            use bytes::BytesMut;
890            use tokio::io::AsyncReadExt;
891            let mut buf = BytesMut::with_capacity(PUMP_READ_BUF);
892            loop {
893                buf.clear();
894                match recv.read_buf(&mut buf).await {
895                    Ok(0) | Err(_) => break,
896                    Ok(_) => {
897                        if writer.send_chunk(buf.split().freeze()).await.is_err() {
898                            break;
899                        }
900                    }
901                }
902            }
903        },
904        async {
905            use tokio::io::AsyncWriteExt;
906            loop {
907                match reader.next_chunk().await {
908                    None => break,
909                    Some(data) => {
910                        if send.write_all(&data).await.is_err() {
911                            break;
912                        }
913                    }
914                }
915            }
916            let _ = send.shutdown().await;
917        },
918    );
919}
920
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh store with default limits for each test.
    fn test_store() -> HandleStore {
        HandleStore::new(StoreConfig::default())
    }

    // ── Body channel basics ─────────────────────────────────────────────

    #[tokio::test]
    async fn body_channel_send_recv() {
        let (tx, rx) = make_body_channel();
        tx.send_chunk(Bytes::from("hello")).await.unwrap();
        drop(tx); // closing the writer signals EOF
        assert_eq!(rx.next_chunk().await, Some(Bytes::from("hello")));
        assert!(rx.next_chunk().await.is_none());
    }

    #[tokio::test]
    async fn body_channel_multiple_chunks() {
        let (tx, rx) = make_body_channel();
        for s in ["a", "b", "c"] {
            tx.send_chunk(Bytes::from(s)).await.unwrap();
        }
        drop(tx);

        let mut got = Vec::new();
        while let Some(chunk) = rx.next_chunk().await {
            got.push(chunk);
        }
        assert_eq!(
            got,
            vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]
        );
    }

    #[tokio::test]
    async fn body_channel_reader_dropped_returns_error() {
        let (tx, rx) = make_body_channel();
        drop(rx);
        assert!(tx.send_chunk(Bytes::from("data")).await.is_err());
    }

    // ── HandleStore operations ──────────────────────────────────────────

    #[tokio::test]
    async fn insert_reader_and_next_chunk() {
        let store = test_store();
        let (tx, rx) = store.make_body_channel();
        let handle = store.insert_reader(rx).unwrap();

        tx.send_chunk(Bytes::from("slab-data")).await.unwrap();
        drop(tx);

        assert_eq!(
            store.next_chunk(handle).await.unwrap(),
            Some(Bytes::from("slab-data"))
        );

        // The EOF read also cleans up the registry entry.
        assert!(store.next_chunk(handle).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn next_chunk_invalid_handle() {
        let store = test_store();
        let err = store.next_chunk(999999).await.unwrap_err();
        assert_eq!(err.code, crate::ErrorCode::InvalidInput);
    }

    #[tokio::test]
    async fn send_chunk_via_handle() {
        let store = test_store();
        let (tx, rx) = store.make_body_channel();
        let handle = store.insert_writer(tx).unwrap();

        store
            .send_chunk(handle, Bytes::from("via-slab"))
            .await
            .unwrap();
        store.finish_body(handle).unwrap();

        assert_eq!(rx.next_chunk().await, Some(Bytes::from("via-slab")));
        assert!(rx.next_chunk().await.is_none());
    }

    #[tokio::test]
    async fn capacity_cap_rejects_overflow() {
        let store = HandleStore::new(StoreConfig {
            max_handles: 2,
            ..StoreConfig::default()
        });
        let (_, first) = store.make_body_channel();
        let (_, second) = store.make_body_channel();
        let (_, third) = store.make_body_channel();
        store.insert_reader(first).unwrap();
        store.insert_reader(second).unwrap();
        let err = store.insert_reader(third).unwrap_err();
        assert!(err.message.contains("capacity"));
    }

    // ── #82 regression: pending_trailer_rxs TTL sweep ──────────────────

    #[test]
    fn sweep_removes_unclaimed_trailer_receivers() {
        let store = test_store();
        // alloc_trailer_sender parks the receiver in `pending_trailer_rxs`.
        let _handle = store.alloc_trailer_sender().unwrap();
        assert_eq!(store.pending_trailer_rxs.lock().unwrap().len(), 1);
        // A zero TTL expires every entry immediately.
        store.sweep(Duration::ZERO);
        assert_eq!(
            store.pending_trailer_rxs.lock().unwrap().len(),
            0,
            "sweep() must remove unclaimed pending trailer receivers"
        );
    }

    // ── #84 regression: recv_with_cancel cancellation ──────────────────

    #[tokio::test]
    async fn recv_with_cancel_returns_none_on_cancel() {
        let (_tx, rx) = mpsc::channel::<Bytes>(4);
        let rx = Arc::new(tokio::sync::Mutex::new(rx));
        let cancel = Arc::new(tokio::sync::Notify::new());
        // Pre-notify: the biased select must observe the cancel first.
        cancel.notify_one();
        assert!(recv_with_cancel(rx, cancel).await.is_none());
    }
}