// iroh_http_core/stream.rs
//! Per-endpoint handle store and body channel types.
//!
//! Rust owns all stream state; JS holds only opaque `u64` handles.
//! Each `IrohEndpoint` has its own `HandleStore` — no process-global registries.
//! Handles are `u64` values equal to `key.data().as_ffi()`, unique within the
//! owning endpoint's slot-map.

8use std::{
9    collections::HashMap,
10    sync::{Arc, Mutex},
11    time::{Duration, Instant},
12};
13
14use bytes::Bytes;
15use slotmap::{KeyData, SlotMap};
16use tokio::sync::mpsc;
17
18use crate::CoreError;
19
20// ── Constants ─────────────────────────────────────────────────────────────────
21
/// Default body-channel capacity, measured in chunks (not bytes).
pub const DEFAULT_CHANNEL_CAPACITY: usize = 32;
/// Default upper bound on a single chunk; larger chunks are split by
/// `HandleStore::send_chunk`.
pub const DEFAULT_MAX_CHUNK_SIZE: usize = 64 * 1024; // 64 KB
/// Default time a writer waits for a slow body reader before erroring.
pub const DEFAULT_DRAIN_TIMEOUT_MS: u64 = 30_000; // 30 s
/// Default TTL for registry entries removed by `HandleStore::sweep`.
pub const DEFAULT_SLAB_TTL_MS: u64 = 300_000; // 5 min
/// Suggested sweep cadence.  Not referenced in this module — presumably
/// drives the endpoint's periodic sweep timer; confirm at the caller.
pub const DEFAULT_SWEEP_INTERVAL_MS: u64 = 60_000; // 60 s
/// Default cap on entries per registry (see `StoreConfig::max_handles`).
pub const DEFAULT_MAX_HANDLES: usize = 65_536;
28
29// ── Resource types ────────────────────────────────────────────────────────────
30
/// Registry value for an open session: wraps the underlying iroh QUIC
/// connection.  Stored behind `Arc` (see `HandleStore::insert_session`) so
/// lookups can hand out clones without holding the registry lock.
pub struct SessionEntry {
    pub conn: iroh::endpoint::Connection,
}
34
/// The head of an HTTP response: status code plus ordered header pairs.
/// Delivered to a waiter through the oneshot sender allocated by
/// `HandleStore::allocate_req_handle`.
pub struct ResponseHeadEntry {
    pub status: u16,
    pub headers: Vec<(String, String)>,
}
39
40// ── SlotMap key types ─────────────────────────────────────────────────────────
41
// One distinct key type per registry so a handle for one registry cannot be
// used against another on the Rust side.  Over FFI all keys collapse to
// `u64`, so a JS-side mixup is only caught by the lookup returning `None`.
slotmap::new_key_type! { pub(crate) struct ReaderKey; }
slotmap::new_key_type! { pub(crate) struct WriterKey; }
slotmap::new_key_type! { pub(crate) struct FetchCancelKey; }
slotmap::new_key_type! { pub(crate) struct SessionKey; }
slotmap::new_key_type! { pub(crate) struct RequestHeadKey; }
47
48// ── Handle encode / decode helpers ───────────────────────────────────────────
49
/// Encode a slot-map key as the opaque `u64` handle handed to JS.
/// Uses `KeyData::as_ffi`, so the value round-trips losslessly through the
/// `handle_to_*` decoders generated below.
fn key_to_handle<K: slotmap::Key>(k: K) -> u64 {
    k.data().as_ffi()
}
53
/// Generate a `u64 → key` decoder for one slot-map key type — the inverse
/// of [`key_to_handle`].  Decoding itself never fails; a handle that did not
/// come from the matching registry simply misses on lookup.
macro_rules! handle_to_key {
    ($fn_name:ident, $key_type:ty) => {
        fn $fn_name(h: u64) -> $key_type {
            <$key_type>::from(KeyData::from_ffi(h))
        }
    };
}
61
// Concrete decoders, one per registry key type.
handle_to_key!(handle_to_reader_key, ReaderKey);
handle_to_key!(handle_to_writer_key, WriterKey);
handle_to_key!(handle_to_session_key, SessionKey);
handle_to_key!(handle_to_request_head_key, RequestHeadKey);
handle_to_key!(handle_to_fetch_cancel_key, FetchCancelKey);
67
68// ── Body channel primitives ───────────────────────────────────────────────────
69
/// Consumer end — stored in the reader registry.
/// Uses `tokio::sync::Mutex` so we can `.await` the receiver without holding
/// the registry's `std::sync::Mutex`.
pub struct BodyReader {
    // Arc so the receiver can be cloned out of the registry and awaited
    // after the registry lock is released (see `HandleStore::next_chunk`).
    pub(crate) rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Bytes>>>,
    /// ISS-010: cancellation signal — notified when `cancel_reader` is called
    /// so in-flight `next_chunk` awaits terminate promptly.
    pub(crate) cancel: Arc<tokio::sync::Notify>,
}
79
/// Producer end — stored in the writer registry.
/// `mpsc::Sender` is `Clone`, so we clone it out of the registry for each call.
pub struct BodyWriter {
    pub(crate) tx: mpsc::Sender<Bytes>,
    /// Drain timeout baked in at channel-creation time from the endpoint config.
    pub(crate) drain_timeout: Duration,
}
87
88/// Create a matched (writer, reader) pair backed by a bounded mpsc channel.
89///
90/// Prefer [`HandleStore::make_body_channel`] when an endpoint is available so
91/// the channel inherits the endpoint's backpressure config.  This free
92/// function uses the compile-time defaults and exists for tests and pre-bind
93/// code paths.
94pub fn make_body_channel() -> (BodyWriter, BodyReader) {
95    make_body_channel_with(
96        DEFAULT_CHANNEL_CAPACITY,
97        Duration::from_millis(DEFAULT_DRAIN_TIMEOUT_MS),
98    )
99}
100
101fn make_body_channel_with(capacity: usize, drain_timeout: Duration) -> (BodyWriter, BodyReader) {
102    let (tx, rx) = mpsc::channel(capacity);
103    (
104        BodyWriter { tx, drain_timeout },
105        BodyReader {
106            rx: Arc::new(tokio::sync::Mutex::new(rx)),
107            cancel: Arc::new(tokio::sync::Notify::new()),
108        },
109    )
110}
111
112// ── Cancellable receive ───────────────────────────────────────────────────────
113
114/// Receive the next chunk from a body channel, aborting immediately if
115/// `cancel` is notified.
116///
117/// Returns `None` on EOF (sender dropped) or on cancellation.  Both call
118/// sites — [`BodyReader::next_chunk`] and [`HandleStore::next_chunk`] — share
119/// this helper so the cancellation semantics are defined and tested in one place.
120async fn recv_with_cancel(
121    rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Bytes>>>,
122    cancel: Arc<tokio::sync::Notify>,
123) -> Option<Bytes> {
124    tokio::select! {
125        biased;
126        _ = cancel.notified() => None,
127        chunk = async { rx.lock().await.recv().await } => chunk,
128    }
129}
130
131impl BodyReader {
132    /// Receive the next chunk.  Returns `None` when the writer is gone (EOF)
133    /// or when the reader has been cancelled.
134    pub async fn next_chunk(&self) -> Option<Bytes> {
135        recv_with_cancel(self.rx.clone(), self.cancel.clone()).await
136    }
137}
138
139impl BodyWriter {
140    /// Send one chunk.  Returns `Err` if the reader has been dropped or if
141    /// the drain timeout expires (JS not reading fast enough).
142    pub async fn send_chunk(&self, chunk: Bytes) -> Result<(), String> {
143        tokio::time::timeout(self.drain_timeout, self.tx.send(chunk))
144            .await
145            .map_err(|_| "drain timeout: body reader is too slow".to_string())?
146            .map_err(|_| "body reader dropped".to_string())
147    }
148}
149
150// ── StoreConfig ───────────────────────────────────────────────────────────────
151
/// Configuration for a [`HandleStore`].  Set once at endpoint bind time.
///
/// NOTE(review): the "Minimum 1" constraints below are documented but not
/// enforced in this module — confirm validation happens at bind time.
#[derive(Debug, Clone)]
pub struct StoreConfig {
    /// Body-channel capacity (in chunks).  Minimum 1.
    pub channel_capacity: usize,
    /// Maximum byte length of a single chunk in `send_chunk`.  Minimum 1.
    pub max_chunk_size: usize,
    /// Milliseconds to wait for a slow body reader before dropping.
    pub drain_timeout: Duration,
    /// Maximum handle slots per registry.  Prevents unbounded growth.
    pub max_handles: usize,
    /// TTL for handle entries; expired entries are swept periodically.
    /// Zero disables sweeping.
    pub ttl: Duration,
}
167
168impl Default for StoreConfig {
169    fn default() -> Self {
170        Self {
171            channel_capacity: DEFAULT_CHANNEL_CAPACITY,
172            max_chunk_size: DEFAULT_MAX_CHUNK_SIZE,
173            drain_timeout: Duration::from_millis(DEFAULT_DRAIN_TIMEOUT_MS),
174            max_handles: DEFAULT_MAX_HANDLES,
175            ttl: Duration::from_millis(DEFAULT_SLAB_TTL_MS),
176        }
177    }
178}
179
180// ── Timed wrapper ─────────────────────────────────────────────────────────────
181
/// Wrapper that stamps a registry value with its creation time so the TTL
/// sweep can evict stale entries.
struct Timed<T> {
    value: T,
    // Set once in `Timed::new`; compared against the TTL in `is_expired`.
    created_at: Instant,
}
186
187impl<T> Timed<T> {
188    fn new(value: T) -> Self {
189        Self {
190            value,
191            created_at: Instant::now(),
192        }
193    }
194
195    fn is_expired(&self, ttl: Duration) -> bool {
196        self.created_at.elapsed() > ttl
197    }
198}
199
/// Pending reader tracked with insertion time for TTL sweep.
struct PendingReaderEntry {
    // Reader half waiting to be claimed by the fetch path.
    reader: BodyReader,
    // Insertion time; entries past the TTL are dropped by
    // `HandleStore::sweep_pending_readers`.
    created: Instant,
}
205
206// ── HandleStore ───────────────────────────────────────────────────────────────
207
/// Tracks handles inserted during a multi-handle allocation sequence.
/// On drop, removes all tracked handles unless [`commit`](InsertGuard::commit)
/// has been called. This prevents orphaned handles when a later insert fails.
pub(crate) struct InsertGuard<'a> {
    // Store the handles were inserted into; rollback targets this store.
    store: &'a HandleStore,
    // Handles inserted so far, in insertion order.
    tracked: Vec<TrackedHandle>,
    // Set by `commit`; when true, `Drop` is a no-op.
    committed: bool,
}
216
/// A handle tracked by [`InsertGuard`] for rollback on drop.
///
/// # Intentionally omitted variants
///
/// `Session` and `FetchCancel` are not tracked here because their lifecycles
/// are managed outside of multi-handle allocation sequences:
/// - Sessions are created and closed by `session_connect` / `session_close`
///   independently and are never allocated inside a guard.
/// - Fetch cancel tokens are allocated before a guard is opened and are
///   always cleaned up by `remove_fetch_token` after the fetch resolves.
///
/// If either type is ever added to a guard-guarded allocation path in the
/// future, add `Session(u64)` or `FetchCancel(u64)` variants here with the
/// corresponding rollback arms in [`InsertGuard::drop`].
enum TrackedHandle {
    /// Handle into `HandleStore::readers`.
    Reader(u64),
    /// Handle into `HandleStore::writers`.
    Writer(u64),
    /// Handle into `HandleStore::request_heads`.
    ReqHead(u64),
}
236
237impl<'a> InsertGuard<'a> {
238    fn new(store: &'a HandleStore) -> Self {
239        Self {
240            store,
241            tracked: Vec::new(),
242            committed: false,
243        }
244    }
245
246    pub fn insert_reader(&mut self, reader: BodyReader) -> Result<u64, CoreError> {
247        let h = self.store.insert_reader(reader)?;
248        self.tracked.push(TrackedHandle::Reader(h));
249        Ok(h)
250    }
251
252    pub fn insert_writer(&mut self, writer: BodyWriter) -> Result<u64, CoreError> {
253        let h = self.store.insert_writer(writer)?;
254        self.tracked.push(TrackedHandle::Writer(h));
255        Ok(h)
256    }
257
258    pub fn allocate_req_handle(
259        &mut self,
260        sender: tokio::sync::oneshot::Sender<ResponseHeadEntry>,
261    ) -> Result<u64, CoreError> {
262        let h = self.store.allocate_req_handle(sender)?;
263        self.tracked.push(TrackedHandle::ReqHead(h));
264        Ok(h)
265    }
266
267    /// Consume the guard without rolling back. Call after all inserts succeed.
268    pub fn commit(mut self) {
269        self.committed = true;
270    }
271}
272
273impl Drop for InsertGuard<'_> {
274    fn drop(&mut self) {
275        if self.committed {
276            return;
277        }
278        for handle in &self.tracked {
279            match handle {
280                TrackedHandle::Reader(h) => self.store.cancel_reader(*h),
281                TrackedHandle::Writer(h) => {
282                    let _ = self.store.finish_body(*h);
283                }
284                TrackedHandle::ReqHead(h) => {
285                    self.store
286                        .request_heads
287                        .lock()
288                        .unwrap_or_else(|e| e.into_inner())
289                        .remove(handle_to_request_head_key(*h));
290                }
291            }
292        }
293    }
294}
295
/// Per-endpoint handle registry.  Owns all body readers, writers,
/// sessions, request-head rendezvous channels, and fetch-cancel tokens for
/// a single `IrohEndpoint`.
///
/// When the endpoint is dropped, this store is dropped with it — all
/// slot-maps are freed and any remaining handles become invalid.
///
/// Each registry has its own `std::sync::Mutex`; within this module those
/// locks are held only for map operations, never across an `.await`.
pub struct HandleStore {
    // Consumer ends of body channels, keyed by reader handle.
    readers: Mutex<SlotMap<ReaderKey, Timed<BodyReader>>>,
    // Producer ends of body channels, keyed by writer handle.
    writers: Mutex<SlotMap<WriterKey, Timed<BodyWriter>>>,
    // Open connections, behind `Arc` so lookups return clones lock-free.
    sessions: Mutex<SlotMap<SessionKey, Timed<Arc<SessionEntry>>>>,
    // Oneshot senders that deliver a response head back to a waiter.
    request_heads:
        Mutex<SlotMap<RequestHeadKey, Timed<tokio::sync::oneshot::Sender<ResponseHeadEntry>>>>,
    // Cancellation notifiers for in-flight fetches.
    fetch_cancels: Mutex<SlotMap<FetchCancelKey, Timed<Arc<tokio::sync::Notify>>>>,
    // Readers awaiting claim by the fetch path, keyed by writer handle.
    pending_readers: Mutex<HashMap<u64, PendingReaderEntry>>,
    // Immutable after construction.
    pub(crate) config: StoreConfig,
}
312
impl HandleStore {
    /// Create a new handle store with the given configuration.
    ///
    /// All registries start empty; `config.max_handles` is enforced lazily
    /// on each insert rather than by pre-allocation.
    pub fn new(config: StoreConfig) -> Self {
        Self {
            readers: Mutex::new(SlotMap::with_key()),
            writers: Mutex::new(SlotMap::with_key()),
            sessions: Mutex::new(SlotMap::with_key()),
            request_heads: Mutex::new(SlotMap::with_key()),
            fetch_cancels: Mutex::new(SlotMap::with_key()),
            pending_readers: Mutex::new(HashMap::new()),
            config,
        }
    }

    // ── Config accessors ─────────────────────────────────────────────────

    /// Create a guard for multi-handle allocation with automatic rollback.
    pub(crate) fn insert_guard(&self) -> InsertGuard<'_> {
        InsertGuard::new(self)
    }

    /// The configured drain timeout.
    pub fn drain_timeout(&self) -> Duration {
        self.config.drain_timeout
    }

    /// The configured maximum chunk size.
    pub fn max_chunk_size(&self) -> usize {
        self.config.max_chunk_size
    }

    /// Snapshot of handle counts for observability.
    ///
    /// Returns `(active_readers, active_writers, active_sessions, total_handles)`.
    /// `total` also includes request-head and fetch-cancel entries, but not
    /// `pending_readers` (those are keyed by an already-counted writer handle).
    /// Each registry is locked and read separately, so the tuple is not one
    /// atomic cross-registry snapshot.
    pub fn count_handles(&self) -> (usize, usize, usize, usize) {
        // `unwrap_or_else(|e| e.into_inner())` recovers a poisoned lock
        // instead of propagating the panic — the pattern used for every
        // registry lock in this type.
        let readers = self.readers.lock().unwrap_or_else(|e| e.into_inner()).len();
        let writers = self.writers.lock().unwrap_or_else(|e| e.into_inner()).len();
        let sessions = self
            .sessions
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .len();
        let total = readers
            .saturating_add(writers)
            .saturating_add(sessions)
            .saturating_add(
                self.request_heads
                    .lock()
                    .unwrap_or_else(|e| e.into_inner())
                    .len(),
            )
            .saturating_add(
                self.fetch_cancels
                    .lock()
                    .unwrap_or_else(|e| e.into_inner())
                    .len(),
            );
        (readers, writers, sessions, total)
    }

    // ── Body channels ────────────────────────────────────────────────────

    /// Create a matched (writer, reader) pair using this store's config.
    pub fn make_body_channel(&self) -> (BodyWriter, BodyReader) {
        make_body_channel_with(self.config.channel_capacity, self.config.drain_timeout)
    }

    // ── Capacity-checked insert ──────────────────────────────────────────

    /// Insert `value` into `registry`, rejecting the insert once the
    /// registry holds `max` entries.  Returns the encoded `u64` handle.
    fn insert_checked<K: slotmap::Key, T>(
        registry: &Mutex<SlotMap<K, Timed<T>>>,
        value: T,
        max: usize,
    ) -> Result<u64, CoreError> {
        let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
        if reg.len() >= max {
            return Err(CoreError::internal("handle registry at capacity"));
        }
        let key = reg.insert(Timed::new(value));
        Ok(key_to_handle(key))
    }

    // ── Body reader / writer ─────────────────────────────────────────────

    /// Insert a `BodyReader` and return a handle.
    pub fn insert_reader(&self, reader: BodyReader) -> Result<u64, CoreError> {
        Self::insert_checked(&self.readers, reader, self.config.max_handles)
    }

    /// Insert a `BodyWriter` and return a handle.
    pub fn insert_writer(&self, writer: BodyWriter) -> Result<u64, CoreError> {
        Self::insert_checked(&self.writers, writer, self.config.max_handles)
    }

    /// Allocate a `(writer_handle, reader)` pair for streaming request bodies.
    ///
    /// The writer handle is returned to JS.  The reader must be stashed via
    /// [`store_pending_reader`](Self::store_pending_reader) so the fetch path
    /// can claim it.
    pub fn alloc_body_writer(&self) -> Result<(u64, BodyReader), CoreError> {
        let (writer, reader) = self.make_body_channel();
        // If the insert fails the reader is dropped here, closing the pair.
        let handle = self.insert_writer(writer)?;
        Ok((handle, reader))
    }

    /// Store the reader side of a newly allocated writer channel so that the
    /// fetch path can claim it with [`claim_pending_reader`](Self::claim_pending_reader).
    ///
    /// Unclaimed entries are eventually dropped by `sweep_pending_readers`.
    pub fn store_pending_reader(&self, writer_handle: u64, reader: BodyReader) {
        self.pending_readers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .insert(
                writer_handle,
                PendingReaderEntry {
                    reader,
                    created: Instant::now(),
                },
            );
    }

    /// Claim the reader that was paired with `writer_handle`.
    /// Returns `None` if already claimed or never stored.
    pub fn claim_pending_reader(&self, writer_handle: u64) -> Option<BodyReader> {
        self.pending_readers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(&writer_handle)
            .map(|e| e.reader)
    }

    // ── Bridge methods (nextChunk / sendChunk / finishBody) ──────────────

    /// Pull the next chunk from a reader handle.
    ///
    /// Returns `Ok(None)` at EOF.  After returning `None` the handle is
    /// cleaned up from the registry automatically.  Cancellation also
    /// surfaces as `Ok(None)` (see [`recv_with_cancel`]).
    pub async fn next_chunk(&self, handle: u64) -> Result<Option<Bytes>, CoreError> {
        // Clone the Arc — allows awaiting without holding the registry mutex.
        let (rx_arc, cancel) = {
            let reg = self.readers.lock().unwrap_or_else(|e| e.into_inner());
            let entry = reg
                .get(handle_to_reader_key(handle))
                .ok_or_else(|| CoreError::invalid_handle(handle))?;
            (entry.value.rx.clone(), entry.value.cancel.clone())
        };

        let chunk = recv_with_cancel(rx_arc, cancel).await;

        // Clean up on EOF so the slot is released promptly.
        // (Harmless no-op if `cancel_reader` already removed the entry.)
        if chunk.is_none() {
            self.readers
                .lock()
                .unwrap_or_else(|e| e.into_inner())
                .remove(handle_to_reader_key(handle));
        }

        Ok(chunk)
    }

    /// Push a chunk into a writer handle.
    ///
    /// Chunks larger than the configured `max_chunk_size` are split
    /// automatically so individual messages stay within the backpressure budget.
    /// An empty chunk takes the first branch and is forwarded as-is.
    pub async fn send_chunk(&self, handle: u64, chunk: Bytes) -> Result<(), CoreError> {
        // Clone the Sender (cheap) and release the lock before awaiting.
        let (tx, timeout) = {
            let reg = self.writers.lock().unwrap_or_else(|e| e.into_inner());
            let entry = reg
                .get(handle_to_writer_key(handle))
                .ok_or_else(|| CoreError::invalid_handle(handle))?;
            (entry.value.tx.clone(), entry.value.drain_timeout)
        };
        let max = self.config.max_chunk_size;
        if chunk.len() <= max {
            tokio::time::timeout(timeout, tx.send(chunk))
                .await
                .map_err(|_| CoreError::timeout("drain timeout: body reader is too slow"))?
                .map_err(|_| CoreError::internal("body reader dropped"))
        } else {
            // Split into max-size pieces.  `Bytes::slice` is a cheap
            // refcounted view — no payload copy per piece.
            let mut offset = 0;
            while offset < chunk.len() {
                let end = offset.saturating_add(max).min(chunk.len());
                // Each piece gets the full drain timeout, not a shared budget.
                tokio::time::timeout(timeout, tx.send(chunk.slice(offset..end)))
                    .await
                    .map_err(|_| CoreError::timeout("drain timeout: body reader is too slow"))?
                    .map_err(|_| CoreError::internal("body reader dropped"))?;
                offset = end;
            }
            Ok(())
        }
    }

    /// Signal end-of-body by dropping the writer from the registry.
    /// Dropping the last `Sender` makes the paired reader observe EOF.
    pub fn finish_body(&self, handle: u64) -> Result<(), CoreError> {
        self.writers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_writer_key(handle))
            .ok_or_else(|| CoreError::invalid_handle(handle))?;
        Ok(())
    }

    /// Drop a body reader, signalling cancellation of any in-flight read.
    /// Idempotent: a second call with the same handle finds nothing to do.
    pub fn cancel_reader(&self, handle: u64) {
        let entry = self
            .readers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_reader_key(handle));
        if let Some(e) = entry {
            // Wake every blocked `next_chunk` so they return `None` promptly.
            e.value.cancel.notify_waiters();
        }
    }

    // ── Session ──────────────────────────────────────────────────────────

    /// Insert a `SessionEntry` and return a handle.
    pub fn insert_session(&self, entry: SessionEntry) -> Result<u64, CoreError> {
        Self::insert_checked(&self.sessions, Arc::new(entry), self.config.max_handles)
    }

    /// Look up a session by handle without consuming it.
    pub fn lookup_session(&self, handle: u64) -> Option<Arc<SessionEntry>> {
        self.sessions
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(handle_to_session_key(handle))
            .map(|e| e.value.clone())
    }

    /// Remove a session entry by handle and return it.
    /// Other `Arc` clones handed out by `lookup_session` stay valid.
    pub fn remove_session(&self, handle: u64) -> Option<Arc<SessionEntry>> {
        self.sessions
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_session_key(handle))
            .map(|e| e.value)
    }

    // ── Request head (for server respond path) ───────────────────────────

    /// Insert a response-head oneshot sender and return a handle.
    pub fn allocate_req_handle(
        &self,
        sender: tokio::sync::oneshot::Sender<ResponseHeadEntry>,
    ) -> Result<u64, CoreError> {
        Self::insert_checked(&self.request_heads, sender, self.config.max_handles)
    }

    /// Remove and return the response-head sender for the given handle.
    /// One-shot by construction: a second call for the same handle yields `None`.
    pub fn take_req_sender(
        &self,
        handle: u64,
    ) -> Option<tokio::sync::oneshot::Sender<ResponseHeadEntry>> {
        self.request_heads
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_request_head_key(handle))
            .map(|e| e.value)
    }

    // ── Fetch cancel ─────────────────────────────────────────────────────

    /// Allocate a cancellation token for an upcoming `fetch` call.
    pub fn alloc_fetch_token(&self) -> Result<u64, CoreError> {
        let notify = Arc::new(tokio::sync::Notify::new());
        Self::insert_checked(&self.fetch_cancels, notify, self.config.max_handles)
    }

    /// Signal an in-flight fetch to abort.  No-op for unknown tokens.
    pub fn cancel_in_flight(&self, token: u64) {
        if let Some(entry) = self
            .fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(handle_to_fetch_cancel_key(token))
        {
            // `notify_one` stores a permit even with no waiter yet, so a
            // cancel that races ahead of the select! is not lost.
            entry.value.notify_one();
        }
    }

    /// Retrieve the `Notify` for a fetch token (clones the Arc for use in select!).
    pub fn get_fetch_cancel_notify(&self, token: u64) -> Option<Arc<tokio::sync::Notify>> {
        self.fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(handle_to_fetch_cancel_key(token))
            .map(|e| e.value.clone())
    }

    /// Remove a fetch cancel token after the fetch completes.
    pub fn remove_fetch_token(&self, token: u64) {
        self.fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_fetch_cancel_key(token));
    }

    // ── TTL sweep ────────────────────────────────────────────────────────

    /// Sweep all registries, removing entries older than `ttl`.
    /// Also compacts any registry that is empty after sweeping to reclaim
    /// the backing memory from traffic bursts.
    ///
    /// NOTE(review): a zero `ttl` is not special-cased here; per
    /// `StoreConfig::ttl` the caller is expected to skip sweeping entirely
    /// when the TTL is zero — confirm at the call site.
    pub fn sweep(&self, ttl: Duration) {
        Self::sweep_registry(&self.readers, ttl);
        Self::sweep_registry(&self.writers, ttl);
        Self::sweep_registry(&self.request_heads, ttl);
        Self::sweep_registry(&self.sessions, ttl);
        Self::sweep_registry(&self.fetch_cancels, ttl);
        self.sweep_pending_readers(ttl);
    }

    /// Remove expired entries from one registry, holding its lock for the
    /// whole collect-then-remove pass.
    fn sweep_registry<K: slotmap::Key, T>(registry: &Mutex<SlotMap<K, Timed<T>>>, ttl: Duration) {
        let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
        // Two phases: collect keys first, since removal while iterating a
        // slot-map is not possible through the shared iterator.
        let expired: Vec<K> = reg
            .iter()
            .filter(|(_, e)| e.is_expired(ttl))
            .map(|(k, _)| k)
            .collect();

        if expired.is_empty() {
            return;
        }

        for key in &expired {
            reg.remove(*key);
        }
        tracing::debug!(
            "[iroh-http] swept {} expired registry entries (ttl={ttl:?})",
            expired.len()
        );
        // Compact when empty to reclaim backing memory after traffic bursts.
        if reg.is_empty() && reg.capacity() > 128 {
            *reg = SlotMap::with_key();
        }
    }

    /// Drop pending readers that were never claimed within the TTL.
    fn sweep_pending_readers(&self, ttl: Duration) {
        let mut map = self
            .pending_readers
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        let before = map.len();
        map.retain(|_, e| e.created.elapsed() < ttl);
        let removed = before.saturating_sub(map.len());
        if removed > 0 {
            tracing::debug!("[iroh-http] swept {removed} stale pending readers (ttl={ttl:?})");
        }
    }
}
664
665// ── Shared pump helpers ───────────────────────────────────────────────────────
666
/// Default read buffer size for QUIC stream reads (64 KB), shared by the
/// pump helpers below.
pub(crate) const PUMP_READ_BUF: usize = 64 * 1024;
669
670/// Pump raw bytes from a QUIC `RecvStream` into a `BodyWriter`.
671///
672/// Reads `PUMP_READ_BUF`-sized chunks and forwards them through the body
673/// channel.  Stops when the stream ends or the writer is dropped.
674pub(crate) async fn pump_quic_recv_to_body(
675    mut recv: iroh::endpoint::RecvStream,
676    writer: BodyWriter,
677) {
678    while let Ok(Some(chunk)) = recv.read_chunk(PUMP_READ_BUF).await {
679        if writer.send_chunk(chunk.bytes).await.is_err() {
680            break;
681        }
682    }
683    // writer drops → BodyReader sees EOF.
684}
685
686/// Pump raw bytes from a `BodyReader` into a QUIC `SendStream`.
687///
688/// Reads chunks from the body channel and writes them to the stream.
689/// Finishes the stream when the reader reaches EOF.
690pub(crate) async fn pump_body_to_quic_send(
691    reader: BodyReader,
692    mut send: iroh::endpoint::SendStream,
693) {
694    loop {
695        match reader.next_chunk().await {
696            None => break,
697            Some(data) => {
698                if send.write_all(&data).await.is_err() {
699                    break;
700                }
701            }
702        }
703    }
704    let _ = send.finish();
705}
706
707/// Bidirectional pump between a byte-level I/O object and a pair of body channels.
708///
709/// Reads from `io` → sends to `writer` (incoming data).
710/// Reads from `reader` → writes to `io` (outgoing data).
711///
712/// Used for both client-side and server-side duplex upgrade pumps.
713pub(crate) async fn pump_duplex<IO>(io: IO, writer: BodyWriter, reader: BodyReader)
714where
715    IO: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
716{
717    let (mut recv, mut send) = tokio::io::split(io);
718
719    tokio::join!(
720        async {
721            use bytes::BytesMut;
722            use tokio::io::AsyncReadExt;
723            let mut buf = BytesMut::with_capacity(PUMP_READ_BUF);
724            loop {
725                buf.clear();
726                match recv.read_buf(&mut buf).await {
727                    Ok(0) | Err(_) => break,
728                    Ok(_) => {
729                        if writer.send_chunk(buf.split().freeze()).await.is_err() {
730                            break;
731                        }
732                    }
733                }
734            }
735        },
736        async {
737            use tokio::io::AsyncWriteExt;
738            loop {
739                match reader.next_chunk().await {
740                    None => break,
741                    Some(data) => {
742                        if send.write_all(&data).await.is_err() {
743                            break;
744                        }
745                    }
746                }
747            }
748            let _ = send.shutdown().await;
749        },
750    );
751}
752
#[cfg(test)]
mod tests {
    use super::*;

    /// A store with default configuration for use across tests.
    fn test_store() -> HandleStore {
        HandleStore::new(StoreConfig::default())
    }

    // ── Body channel basics ─────────────────────────────────────────────

    #[tokio::test]
    async fn body_channel_send_recv() {
        let (tx, rx) = make_body_channel();
        tx.send_chunk(Bytes::from("hello")).await.unwrap();
        drop(tx); // closing the writer signals EOF
        assert_eq!(rx.next_chunk().await, Some(Bytes::from("hello")));
        assert!(rx.next_chunk().await.is_none());
    }

    #[tokio::test]
    async fn body_channel_multiple_chunks() {
        let (tx, rx) = make_body_channel();
        for piece in ["a", "b", "c"] {
            tx.send_chunk(Bytes::from(piece)).await.unwrap();
        }
        drop(tx);

        let mut received = Vec::new();
        while let Some(chunk) = rx.next_chunk().await {
            received.push(chunk);
        }
        assert_eq!(
            received,
            vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]
        );
    }

    #[tokio::test]
    async fn body_channel_reader_dropped_returns_error() {
        let (tx, rx) = make_body_channel();
        drop(rx);
        assert!(tx.send_chunk(Bytes::from("data")).await.is_err());
    }

    // ── HandleStore operations ──────────────────────────────────────────

    #[tokio::test]
    async fn insert_reader_and_next_chunk() {
        let store = test_store();
        let (tx, rx) = store.make_body_channel();
        let handle = store.insert_reader(rx).unwrap();

        tx.send_chunk(Bytes::from("slab-data")).await.unwrap();
        drop(tx);

        assert_eq!(
            store.next_chunk(handle).await.unwrap(),
            Some(Bytes::from("slab-data"))
        );
        // The EOF read also removes the handle from the registry.
        assert!(store.next_chunk(handle).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn next_chunk_invalid_handle() {
        let store = test_store();
        let err = store.next_chunk(999999).await.unwrap_err();
        assert_eq!(err.code, crate::ErrorCode::InvalidInput);
    }

    #[tokio::test]
    async fn send_chunk_via_handle() {
        let store = test_store();
        let (tx, rx) = store.make_body_channel();
        let handle = store.insert_writer(tx).unwrap();

        store
            .send_chunk(handle, Bytes::from("via-slab"))
            .await
            .unwrap();
        store.finish_body(handle).unwrap();

        assert_eq!(rx.next_chunk().await, Some(Bytes::from("via-slab")));
        assert!(rx.next_chunk().await.is_none());
    }

    #[tokio::test]
    async fn capacity_cap_rejects_overflow() {
        let store = HandleStore::new(StoreConfig {
            max_handles: 2,
            ..StoreConfig::default()
        });
        for _ in 0..2 {
            let (_, reader) = store.make_body_channel();
            store.insert_reader(reader).unwrap();
        }
        let (_, overflow) = store.make_body_channel();
        let err = store.insert_reader(overflow).unwrap_err();
        assert!(err.message.contains("capacity"));
    }

    // ── #84 regression: recv_with_cancel cancellation ──────────────────

    #[tokio::test]
    async fn recv_with_cancel_returns_none_on_cancel() {
        let (_tx, rx) = mpsc::channel::<Bytes>(4);
        let rx = Arc::new(tokio::sync::Mutex::new(rx));
        let cancel = Arc::new(tokio::sync::Notify::new());
        // Pre-arm the notifier: the biased select must observe it and yield
        // None without ever touching the receiver.
        cancel.notify_one();
        assert!(recv_with_cancel(rx, cancel).await.is_none());
    }
}