// iroh_http_core/ffi/handles.rs

//! Per-endpoint handle store and body channel types.
//!
//! Rust owns all stream state; JS holds only opaque `u64` handles.
//! Each `IrohEndpoint` has its own `HandleStore` — no process-global registries.
//! Handles are `u64` values equal to `key.data().as_ffi()`, unique within the
//! owning endpoint's slot-map.
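//!
//! A hypothetical round-trip, to illustrate the encoding (the key types are
//! crate-private, so real callers only ever see the `u64`):
//!
//! ```ignore
//! let key: ReaderKey = readers.insert(entry); // slotmap key
//! let handle: u64 = key.data().as_ffi(); // value handed across the FFI boundary
//! let back = ReaderKey::from(KeyData::from_ffi(handle)); // decoded on the way back in
//! assert_eq!(key, back);
//! ```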
// This module IS the definition site of the disallowed types — allow is correct here.
#![allow(clippy::disallowed_types)]

use std::{
    collections::HashMap,
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll},
    time::{Duration, Instant},
};

use bytes::Bytes;
use futures::sink::Sink;
use http_body::Frame;
use slotmap::{KeyData, SlotMap};
use tokio::sync::mpsc;

use crate::{http::body::BoxError, CoreError};

// ── Constants ─────────────────────────────────────────────────────────────────

pub const DEFAULT_CHANNEL_CAPACITY: usize = 32;
pub const DEFAULT_MAX_CHUNK_SIZE: usize = 64 * 1024; // 64 KB
pub const DEFAULT_DRAIN_TIMEOUT_MS: u64 = 30_000; // 30 s
pub const DEFAULT_SLAB_TTL_MS: u64 = 300_000; // 5 min
pub const DEFAULT_SWEEP_INTERVAL_MS: u64 = 60_000; // 60 s
pub const DEFAULT_MAX_HANDLES: usize = 65_536;

// ── Resource types ────────────────────────────────────────────────────────────

pub struct SessionEntry {
    pub conn: iroh::endpoint::Connection,
}

pub struct ResponseHeadEntry {
    pub status: u16,
    pub headers: Vec<(String, String)>,
}

// ── SlotMap key types ─────────────────────────────────────────────────────────

slotmap::new_key_type! { pub(crate) struct ReaderKey; }
slotmap::new_key_type! { pub(crate) struct WriterKey; }
slotmap::new_key_type! { pub(crate) struct FetchCancelKey; }
slotmap::new_key_type! { pub(crate) struct SessionKey; }
slotmap::new_key_type! { pub(crate) struct RequestHeadKey; }

// ── Handle encode / decode helpers ───────────────────────────────────────────

fn key_to_handle<K: slotmap::Key>(k: K) -> u64 {
    k.data().as_ffi()
}

macro_rules! handle_to_key {
    ($fn_name:ident, $key_type:ty) => {
        fn $fn_name(h: u64) -> $key_type {
            <$key_type>::from(KeyData::from_ffi(h))
        }
    };
}

handle_to_key!(handle_to_reader_key, ReaderKey);
handle_to_key!(handle_to_writer_key, WriterKey);
handle_to_key!(handle_to_session_key, SessionKey);
handle_to_key!(handle_to_request_head_key, RequestHeadKey);
handle_to_key!(handle_to_fetch_cancel_key, FetchCancelKey);

// ── Body channel primitives ───────────────────────────────────────────────────

/// Consumer end — stored in the reader registry.
/// Uses `tokio::sync::Mutex` so we can `.await` the receiver without holding
/// the registry's `std::sync::Mutex`.
///
/// Per ADR-014 D4 this type implements [`http_body::Body`] directly so it can
/// be wrapped into [`crate::Body`] without an intermediate `StreamBody`
/// adapter. The two consumer paths are disjoint:
///
/// - **Internal hyper path** — the `Body` impl drives `poll_frame`. The
///   [`BodyReader`] is moved into [`crate::Body::new`] and never registered
///   in the FFI handle store.
/// - **FFI path** — JS calls `next_chunk(handle)` via [`HandleStore`]; the
///   [`Body`](http_body::Body) impl is never polled.
pub struct BodyReader {
    pub(crate) rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Bytes>>>,
    /// ISS-010: cancellation signal — notified when `cancel_reader` is called
    /// so in-flight `next_chunk` awaits terminate promptly.
    pub(crate) cancel: Arc<tokio::sync::Notify>,
    /// In-flight recv future for the [`http_body::Body`] poll path. `None`
    /// when no poll is outstanding. mpsc::recv is cancellation-safe so it is
    /// safe to recreate this future after a `Pending` drop. `Send + Sync`
    /// preserves `BodyReader: Sync` (required by the channel-based pump
    /// helpers that take `&BodyReader` across `.await`).
    pending: Option<Pin<Box<dyn Future<Output = Option<Bytes>> + Send + Sync>>>,
}

/// Producer end — stored in the writer registry.
/// `mpsc::Sender` is `Clone`, so we clone it out of the registry for each call.
///
/// Per ADR-014 D4 this type implements [`futures::Sink<Bytes>`] so producers
/// can pipe a `Stream<Item = Result<Bytes, _>>` into it via
/// [`futures::StreamExt::forward`] without a hand-rolled pump loop. The two
/// surfaces are disjoint:
///
/// - **Direct path** — call [`BodyWriter::send_chunk`] (`&self`, awaitable).
///   Used by the FFI handle store and the limited hyper-body pump where we
///   need byte-limit / frame-timeout policy that no stock adapter encodes.
/// - **Sink path** — `.forward(writer)` from a `Stream`. The Sink owns a
///   single in-flight send future at a time, cloned from `tx` so the
///   underlying mpsc semantics (backpressure, drop-on-reader-gone, drain
///   timeout) match `send_chunk` byte-for-byte.
pub struct BodyWriter {
    pub(crate) tx: mpsc::Sender<Bytes>,
    /// Drain timeout baked in at channel-creation time from the endpoint config.
    pub(crate) drain_timeout: Duration,
    /// In-flight `send_chunk` future driven by the [`Sink`] impl. `None`
    /// when `start_send` has not been called since the last completion.
    /// Owns a clone of `tx` so the future is `'static`.
    sending: Option<BodyWriterSendFuture>,
}

/// Boxed in-flight `send_chunk` future used by the [`Sink<Bytes>`] impl on
/// [`BodyWriter`]. Aliased to satisfy `clippy::type_complexity`.
type BodyWriterSendFuture = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + Sync>>;

/// Create a matched (writer, reader) pair backed by a bounded mpsc channel.
///
/// Prefer [`HandleStore::make_body_channel`] when an endpoint is available so
/// the channel inherits the endpoint's backpressure config.  This free
/// function uses the compile-time defaults and exists for tests and pre-bind
/// code paths.
pub fn make_body_channel() -> (BodyWriter, BodyReader) {
    make_body_channel_with(
        DEFAULT_CHANNEL_CAPACITY,
        Duration::from_millis(DEFAULT_DRAIN_TIMEOUT_MS),
    )
}

fn make_body_channel_with(capacity: usize, drain_timeout: Duration) -> (BodyWriter, BodyReader) {
    let (tx, rx) = mpsc::channel(capacity);
    (
        BodyWriter {
            tx,
            drain_timeout,
            sending: None,
        },
        BodyReader {
            rx: Arc::new(tokio::sync::Mutex::new(rx)),
            cancel: Arc::new(tokio::sync::Notify::new()),
            pending: None,
        },
    )
}

// ── Cancellable receive ───────────────────────────────────────────────────────

/// Receive the next chunk from a body channel, aborting immediately if
/// `cancel` is notified.
///
/// Returns `None` on EOF (sender dropped) or on cancellation.  Both call
/// sites — [`BodyReader::next_chunk`] and [`HandleStore::next_chunk`] — share
/// this helper so the cancellation semantics are defined and tested in one place.
async fn recv_with_cancel(
    rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Bytes>>>,
    cancel: Arc<tokio::sync::Notify>,
) -> Option<Bytes> {
    tokio::select! {
        biased;
        _ = cancel.notified() => None,
        chunk = async { rx.lock().await.recv().await } => chunk,
    }
}

impl BodyReader {
    /// Receive the next chunk.  Returns `None` when the writer is gone (EOF)
    /// or when the reader has been cancelled.
    pub async fn next_chunk(&self) -> Option<Bytes> {
        recv_with_cancel(self.rx.clone(), self.cancel.clone()).await
    }
}

/// ADR-014 D4: `BodyReader` is itself an [`http_body::Body`] so callers can
/// wrap it in [`crate::Body::new`] without a `StreamBody`/`unfold` adapter.
impl http_body::Body for BodyReader {
    type Data = Bytes;
    type Error = std::convert::Infallible;

    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Bytes>, Self::Error>>> {
        let this = self.get_mut();
        let fut = this.pending.get_or_insert_with(|| {
            Box::pin(recv_with_cancel(this.rx.clone(), this.cancel.clone()))
        });
        match fut.as_mut().poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(opt) => {
                this.pending = None;
                Poll::Ready(opt.map(|data| Ok(Frame::data(data))))
            }
        }
    }
}

impl BodyWriter {
    /// Send one chunk.  Returns `Err` if the reader has been dropped or if
    /// the drain timeout expires (JS not reading fast enough).
    pub async fn send_chunk(&self, chunk: Bytes) -> Result<(), String> {
        tokio::time::timeout(self.drain_timeout, self.tx.send(chunk))
            .await
            .map_err(|_| "drain timeout: body reader is too slow".to_string())?
            .map_err(|_| "body reader dropped".to_string())
    }
}

/// ADR-014 D4: `BodyWriter` is a [`Sink<Bytes>`] so producers can pipe a
/// stream of chunks through `forward` instead of hand-rolling a pump loop.
/// The Sink shares the underlying mpsc channel with [`BodyWriter::send_chunk`]
/// and applies the same drain-timeout / reader-dropped semantics.
impl Sink<Bytes> for BodyWriter {
    type Error = BoxError;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.get_mut();
        match this.sending.as_mut() {
            None => Poll::Ready(Ok(())),
            Some(fut) => match fut.as_mut().poll(cx) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(res) => {
                    this.sending = None;
                    Poll::Ready(res)
                }
            },
        }
    }

    fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
        // Sink contract requires poll_ready -> Ready(Ok) before start_send,
        // so `sending` is None here. Build a fresh future that owns its own
        // tx clone so it is 'static + Send + Sync.
        debug_assert!(
            self.sending.is_none(),
            "Sink contract: poll_ready must return Ready(Ok) before start_send"
        );
        let tx = self.tx.clone();
        let drain_timeout = self.drain_timeout;
        self.get_mut().sending = Some(Box::pin(async move {
            tokio::time::timeout(drain_timeout, tx.send(item))
                .await
                .map_err(|_| -> BoxError { "drain timeout: body reader is too slow".into() })?
                .map_err(|_| -> BoxError { "body reader dropped".into() })
        }));
        Ok(())
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Same as poll_ready: drain any in-flight send, then return Ready.
        self.poll_ready(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Flush any in-flight send. EOF is signalled by dropping the
        // BodyWriter (which drops `tx`); callers using `forward(writer)` own
        // the writer and drop it on completion.
        self.poll_flush(cx)
    }
}

// ── StoreConfig ───────────────────────────────────────────────────────────────

/// Configuration for a [`HandleStore`].  Set once at endpoint bind time.
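///
/// A minimal construction sketch: override one knob with struct-update syntax
/// and keep the compiled-in defaults for the rest.
///
/// ```ignore
/// let config = StoreConfig {
///     drain_timeout: Duration::from_secs(5),
///     ..StoreConfig::default()
/// };
/// let store = HandleStore::new(config);
/// ```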
#[derive(Debug, Clone)]
pub struct StoreConfig {
    /// Body-channel capacity (in chunks).  Minimum 1.
    pub channel_capacity: usize,
    /// Maximum byte length of a single chunk in `send_chunk`.  Minimum 1.
    pub max_chunk_size: usize,
    /// How long to wait for a slow body reader before a send fails with a
    /// drain timeout.
    pub drain_timeout: Duration,
    /// Maximum handle slots per registry.  Prevents unbounded growth.
    pub max_handles: usize,
    /// TTL for handle entries; expired entries are swept periodically.
    /// Zero disables sweeping.
    pub ttl: Duration,
}

impl Default for StoreConfig {
    fn default() -> Self {
        Self {
            channel_capacity: DEFAULT_CHANNEL_CAPACITY,
            max_chunk_size: DEFAULT_MAX_CHUNK_SIZE,
            drain_timeout: Duration::from_millis(DEFAULT_DRAIN_TIMEOUT_MS),
            max_handles: DEFAULT_MAX_HANDLES,
            ttl: Duration::from_millis(DEFAULT_SLAB_TTL_MS),
        }
    }
}

// ── Timed wrapper ─────────────────────────────────────────────────────────────

struct Timed<T> {
    value: T,
    /// Updated on every access so that actively-used handles are not TTL-swept
    /// mid-transfer (fix for iroh-http#119 Bug 3).
    last_accessed: Instant,
}

impl<T> Timed<T> {
    fn new(value: T) -> Self {
        Self {
            value,
            last_accessed: Instant::now(),
        }
    }

    /// Refresh the last-access timestamp.  Call inside the registry lock.
    fn touch(&mut self) {
        self.last_accessed = Instant::now();
    }

    fn is_expired(&self, ttl: Duration) -> bool {
        self.last_accessed.elapsed() > ttl
    }
}

/// Pending reader tracked with insertion time for TTL sweep.
struct PendingReaderEntry {
    reader: BodyReader,
    created: Instant,
}

// ── HandleStore ───────────────────────────────────────────────────────────────

/// Tracks handles inserted during a multi-handle allocation sequence.
/// On drop, removes all tracked handles unless [`commit`](InsertGuard::commit)
/// has been called. This prevents orphaned handles when a later insert fails.
pub(crate) struct InsertGuard<'a> {
    store: &'a HandleStore,
    tracked: Vec<TrackedHandle>,
    committed: bool,
}

/// A handle tracked by [`InsertGuard`] for rollback on drop.
///
/// # Intentionally omitted variants
///
/// `Session` and `FetchCancel` are not tracked here because their lifecycles
/// are managed outside of multi-handle allocation sequences:
/// - Sessions are created and closed by `Session::connect` / `Session::close`
///   independently and are never allocated inside a guard.
/// - Fetch cancel tokens are allocated before a guard is opened and are
///   always cleaned up by `remove_fetch_token` after the fetch resolves.
///
/// If either type is ever added to a guard-managed allocation path in the
/// future, add `Session(u64)` or `FetchCancel(u64)` variants here with the
/// corresponding rollback arms in [`InsertGuard::drop`].
enum TrackedHandle {
    Reader(u64),
    Writer(u64),
    ReqHead(u64),
}

impl<'a> InsertGuard<'a> {
    fn new(store: &'a HandleStore) -> Self {
        Self {
            store,
            tracked: Vec::new(),
            committed: false,
        }
    }

    pub fn insert_reader(&mut self, reader: BodyReader) -> Result<u64, CoreError> {
        let h = self.store.insert_reader(reader)?;
        self.tracked.push(TrackedHandle::Reader(h));
        Ok(h)
    }

    pub fn insert_writer(&mut self, writer: BodyWriter) -> Result<u64, CoreError> {
        let h = self.store.insert_writer(writer)?;
        self.tracked.push(TrackedHandle::Writer(h));
        Ok(h)
    }

    pub fn allocate_req_handle(
        &mut self,
        sender: tokio::sync::oneshot::Sender<ResponseHeadEntry>,
    ) -> Result<u64, CoreError> {
        let h = self.store.allocate_req_handle(sender)?;
        self.tracked.push(TrackedHandle::ReqHead(h));
        Ok(h)
    }

    /// Consume the guard without rolling back. Call after all inserts succeed.
    pub fn commit(mut self) {
        self.committed = true;
    }
}

impl Drop for InsertGuard<'_> {
    fn drop(&mut self) {
        if self.committed {
            return;
        }
        for handle in &self.tracked {
            match handle {
                TrackedHandle::Reader(h) => self.store.cancel_reader(*h),
                TrackedHandle::Writer(h) => {
                    let _ = self.store.finish_body(*h);
                }
                TrackedHandle::ReqHead(h) => {
                    self.store
                        .request_heads
                        .lock()
                        .unwrap_or_else(|e| e.into_inner())
                        .remove(handle_to_request_head_key(*h));
                }
            }
        }
    }
}

/// Per-endpoint handle registry.  Owns all body readers, writers,
/// sessions, request-head rendezvous channels, and fetch-cancel tokens for
/// a single `IrohEndpoint`.
///
/// When the endpoint is dropped, this store is dropped with it — all
/// slot-maps are freed and any remaining handles become invalid.
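///
/// A sketch of the streaming-request-body flow these registries exist for,
/// using only the methods defined below (error handling elided):
///
/// ```ignore
/// // Hand a writer handle to JS; park the matching reader.
/// let (writer_handle, reader) = store.alloc_body_writer()?;
/// store.store_pending_reader(writer_handle, reader);
///
/// // JS pushes chunks through `send_chunk(writer_handle, ..)` and ends the
/// // body with `finish_body(writer_handle)`; the fetch path claims the
/// // reader exactly once and streams it out.
/// let body_reader = store.claim_pending_reader(writer_handle).expect("unclaimed");
/// ```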
pub struct HandleStore {
    readers: Mutex<SlotMap<ReaderKey, Timed<BodyReader>>>,
    writers: Mutex<SlotMap<WriterKey, Timed<BodyWriter>>>,
    sessions: Mutex<SlotMap<SessionKey, Timed<Arc<SessionEntry>>>>,
    request_heads:
        Mutex<SlotMap<RequestHeadKey, Timed<tokio::sync::oneshot::Sender<ResponseHeadEntry>>>>,
    fetch_cancels: Mutex<SlotMap<FetchCancelKey, Timed<Arc<tokio::sync::Notify>>>>,
    pending_readers: Mutex<HashMap<u64, PendingReaderEntry>>,
    pub(crate) config: StoreConfig,
}

impl HandleStore {
    /// Create a new handle store with the given configuration.
    pub fn new(config: StoreConfig) -> Self {
        Self {
            readers: Mutex::new(SlotMap::with_key()),
            writers: Mutex::new(SlotMap::with_key()),
            sessions: Mutex::new(SlotMap::with_key()),
            request_heads: Mutex::new(SlotMap::with_key()),
            fetch_cancels: Mutex::new(SlotMap::with_key()),
            pending_readers: Mutex::new(HashMap::new()),
            config,
        }
    }

    // ── Config accessors ─────────────────────────────────────────────────

    /// Create a guard for multi-handle allocation with automatic rollback.
    pub(crate) fn insert_guard(&self) -> InsertGuard<'_> {
        InsertGuard::new(self)
    }

    /// The configured drain timeout.
    pub fn drain_timeout(&self) -> Duration {
        self.config.drain_timeout
    }

    /// The configured maximum chunk size.
    pub fn max_chunk_size(&self) -> usize {
        self.config.max_chunk_size
    }

    /// Snapshot of handle counts for observability.
    ///
    /// Returns `(active_readers, active_writers, active_sessions, total_handles)`.
    pub fn count_handles(&self) -> (usize, usize, usize, usize) {
        let readers = self.readers.lock().unwrap_or_else(|e| e.into_inner()).len();
        let writers = self.writers.lock().unwrap_or_else(|e| e.into_inner()).len();
        let sessions = self
            .sessions
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .len();
        let total = readers
            .saturating_add(writers)
            .saturating_add(sessions)
            .saturating_add(
                self.request_heads
                    .lock()
                    .unwrap_or_else(|e| e.into_inner())
                    .len(),
            )
            .saturating_add(
                self.fetch_cancels
                    .lock()
                    .unwrap_or_else(|e| e.into_inner())
                    .len(),
            );
        (readers, writers, sessions, total)
    }

    // ── Body channels ────────────────────────────────────────────────────

    /// Create a matched (writer, reader) pair using this store's config.
    pub fn make_body_channel(&self) -> (BodyWriter, BodyReader) {
        make_body_channel_with(self.config.channel_capacity, self.config.drain_timeout)
    }

    // ── Capacity-checked insert ──────────────────────────────────────────

    fn insert_checked<K: slotmap::Key, T>(
        registry: &Mutex<SlotMap<K, Timed<T>>>,
        value: T,
        max: usize,
    ) -> Result<u64, CoreError> {
        let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
        if reg.len() >= max {
            return Err(CoreError::internal("handle registry at capacity"));
        }
        let key = reg.insert(Timed::new(value));
        Ok(key_to_handle(key))
    }

    // ── Body reader / writer ─────────────────────────────────────────────

    /// Insert a `BodyReader` and return a handle.
    pub fn insert_reader(&self, reader: BodyReader) -> Result<u64, CoreError> {
        Self::insert_checked(&self.readers, reader, self.config.max_handles)
    }

    /// Insert a `BodyWriter` and return a handle.
    pub fn insert_writer(&self, writer: BodyWriter) -> Result<u64, CoreError> {
        Self::insert_checked(&self.writers, writer, self.config.max_handles)
    }

    /// Allocate a `(writer_handle, reader)` pair for streaming request bodies.
    ///
    /// The writer handle is returned to JS.  The reader must be stashed via
    /// [`store_pending_reader`](Self::store_pending_reader) so the fetch path
    /// can claim it.
    pub fn alloc_body_writer(&self) -> Result<(u64, BodyReader), CoreError> {
        let (writer, reader) = self.make_body_channel();
        let handle = self.insert_writer(writer)?;
        Ok((handle, reader))
    }

    /// Store the reader side of a newly allocated writer channel so that the
    /// fetch path can claim it with [`claim_pending_reader`](Self::claim_pending_reader).
    pub fn store_pending_reader(&self, writer_handle: u64, reader: BodyReader) {
        self.pending_readers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .insert(
                writer_handle,
                PendingReaderEntry {
                    reader,
                    created: Instant::now(),
                },
            );
    }

    /// Claim the reader that was paired with `writer_handle`.
    /// Returns `None` if already claimed or never stored.
    pub fn claim_pending_reader(&self, writer_handle: u64) -> Option<BodyReader> {
        self.pending_readers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(&writer_handle)
            .map(|e| e.reader)
    }

    // ── Bridge methods (nextChunk / sendChunk / finishBody) ──────────────

    /// Pull the next chunk from a reader handle.
    ///
    /// Returns `Ok(None)` at EOF.  After returning `None` the handle is
    /// cleaned up from the registry automatically.
    pub async fn next_chunk(&self, handle: u64) -> Result<Option<Bytes>, CoreError> {
        // Clone the Arc — allows awaiting without holding the registry mutex.
        let (rx_arc, cancel) = {
            let mut reg = self.readers.lock().unwrap_or_else(|e| e.into_inner());
            let entry = reg
                .get_mut(handle_to_reader_key(handle))
                .ok_or_else(|| CoreError::invalid_handle(handle))?;
            entry.touch();
            (entry.value.rx.clone(), entry.value.cancel.clone())
        };

        let chunk = recv_with_cancel(rx_arc, cancel).await;

        // Clean up on EOF so the slot is released promptly.
        if chunk.is_none() {
            self.readers
                .lock()
                .unwrap_or_else(|e| e.into_inner())
                .remove(handle_to_reader_key(handle));
        }

        Ok(chunk)
    }

    /// Non-blocking variant of [`next_chunk`](Self::next_chunk).
    ///
    /// Returns:
    /// - `Ok(Some(bytes))` — a chunk was immediately available.
    /// - `Ok(None)` — EOF; the reader is cleaned up.
    /// - `Err(_)` — no data available yet (channel empty or lock contended),
    ///   or invalid handle. Caller should retry after yielding.
    ///
    /// #126: Used by the Deno adapter to avoid `spawn_blocking` overhead on
    /// the body-read hot path.  When data is already buffered in the channel,
    /// this returns it synchronously on the JS thread.
    pub fn try_next_chunk(&self, handle: u64) -> Result<Option<Bytes>, CoreError> {
        let rx_arc = {
            let mut reg = self.readers.lock().unwrap_or_else(|e| e.into_inner());
            let entry = reg
                .get_mut(handle_to_reader_key(handle))
                .ok_or_else(|| CoreError::invalid_handle(handle))?;
            entry.touch();
            entry.value.rx.clone()
        };

        // Try to acquire the tokio mutex without blocking.
        let mut rx_guard = match rx_arc.try_lock() {
            Ok(g) => g,
            Err(_) => return Err(CoreError::internal("try_next_chunk: lock contended")),
        };

        match rx_guard.try_recv() {
            Ok(chunk) => Ok(Some(chunk)),
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
                Err(CoreError::internal("try_next_chunk: channel empty"))
            }
            Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
                // EOF — clean up the reader.
                drop(rx_guard);
                self.readers
                    .lock()
                    .unwrap_or_else(|e| e.into_inner())
                    .remove(handle_to_reader_key(handle));
                Ok(None)
            }
        }
    }

    /// Push a chunk into a writer handle.
    ///
    /// Chunks larger than the configured `max_chunk_size` are split
    /// automatically so individual messages stay within the backpressure budget.
    pub async fn send_chunk(&self, handle: u64, chunk: Bytes) -> Result<(), CoreError> {
        // Clone the Sender (cheap) and release the lock before awaiting.
        let (tx, timeout) = {
            let mut reg = self.writers.lock().unwrap_or_else(|e| e.into_inner());
            let entry = reg
                .get_mut(handle_to_writer_key(handle))
                .ok_or_else(|| CoreError::invalid_handle(handle))?;
            entry.touch();
            (entry.value.tx.clone(), entry.value.drain_timeout)
        };
        let max = self.config.max_chunk_size;
        if chunk.len() <= max {
            tokio::time::timeout(timeout, tx.send(chunk))
                .await
                .map_err(|_| CoreError::timeout("drain timeout: body reader is too slow"))?
                .map_err(|_| CoreError::internal("body reader dropped"))
        } else {
            // Split into max-size pieces.
            let mut offset = 0;
            while offset < chunk.len() {
                let end = offset.saturating_add(max).min(chunk.len());
                tokio::time::timeout(timeout, tx.send(chunk.slice(offset..end)))
                    .await
                    .map_err(|_| CoreError::timeout("drain timeout: body reader is too slow"))?
                    .map_err(|_| CoreError::internal("body reader dropped"))?;
                offset = end;
            }
            Ok(())
        }
    }

    /// Signal end-of-body by dropping the writer from the registry.
    pub fn finish_body(&self, handle: u64) -> Result<(), CoreError> {
        self.writers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_writer_key(handle))
            .ok_or_else(|| CoreError::invalid_handle(handle))?;
        Ok(())
    }

    /// Drop a body reader, signalling cancellation of any in-flight read.
    pub fn cancel_reader(&self, handle: u64) {
        let entry = self
            .readers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_reader_key(handle));
        if let Some(e) = entry {
            e.value.cancel.notify_waiters();
        }
    }

    // ── Session ──────────────────────────────────────────────────────────

    /// Insert a `SessionEntry` and return a handle.
    pub fn insert_session(&self, entry: SessionEntry) -> Result<u64, CoreError> {
        Self::insert_checked(&self.sessions, Arc::new(entry), self.config.max_handles)
    }

    /// Look up a session by handle without consuming it.
    pub fn lookup_session(&self, handle: u64) -> Option<Arc<SessionEntry>> {
        self.sessions
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(handle_to_session_key(handle))
            .map(|e| e.value.clone())
    }

    /// Remove a session entry by handle and return it.
    pub fn remove_session(&self, handle: u64) -> Option<Arc<SessionEntry>> {
        self.sessions
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_session_key(handle))
            .map(|e| e.value)
    }

    // ── Request head (for server respond path) ───────────────────────────

    /// Insert a response-head oneshot sender and return a handle.
    pub fn allocate_req_handle(
        &self,
        sender: tokio::sync::oneshot::Sender<ResponseHeadEntry>,
    ) -> Result<u64, CoreError> {
        Self::insert_checked(&self.request_heads, sender, self.config.max_handles)
    }

    /// Remove and return the response-head sender for the given handle.
    pub fn take_req_sender(
        &self,
        handle: u64,
    ) -> Option<tokio::sync::oneshot::Sender<ResponseHeadEntry>> {
        self.request_heads
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_request_head_key(handle))
            .map(|e| e.value)
    }

    // ── Fetch cancel ─────────────────────────────────────────────────────

    /// Allocate a cancellation token for an upcoming `fetch` call.
    pub fn alloc_fetch_token(&self) -> Result<u64, CoreError> {
        let notify = Arc::new(tokio::sync::Notify::new());
        Self::insert_checked(&self.fetch_cancels, notify, self.config.max_handles)
    }

    /// Signal an in-flight fetch to abort.
    pub fn cancel_in_flight(&self, token: u64) {
        if let Some(entry) = self
            .fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(handle_to_fetch_cancel_key(token))
        {
            entry.value.notify_one();
        }
    }

    /// Retrieve the `Notify` for a fetch token (clones the Arc for use in select!).
    pub fn get_fetch_cancel_notify(&self, token: u64) -> Option<Arc<tokio::sync::Notify>> {
        self.fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(handle_to_fetch_cancel_key(token))
            .map(|e| e.value.clone())
    }

    /// Remove a fetch cancel token after the fetch completes.
    pub fn remove_fetch_token(&self, token: u64) {
        self.fetch_cancels
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(handle_to_fetch_cancel_key(token));
    }

    // ── TTL sweep ────────────────────────────────────────────────────────

    /// Sweep all registries, removing entries older than `ttl`.
    /// Also compacts any registry that is empty after sweeping to reclaim
    /// the backing memory from traffic bursts.
    pub fn sweep(&self, ttl: Duration) {
        Self::sweep_readers(&self.readers, ttl);
        Self::sweep_registry(&self.writers, ttl);
        Self::sweep_registry(&self.request_heads, ttl);
        Self::sweep_registry(&self.sessions, ttl);
        Self::sweep_registry(&self.fetch_cancels, ttl);
        self.sweep_pending_readers(ttl);
    }

    /// Sweep expired readers, firing the cancel signal so any in-flight
    /// `next_chunk` awaits terminate promptly instead of hanging.
    fn sweep_readers(registry: &Mutex<SlotMap<ReaderKey, Timed<BodyReader>>>, ttl: Duration) {
        let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
        let expired: Vec<ReaderKey> = reg
            .iter()
            .filter(|(_, e)| e.is_expired(ttl))
            .map(|(k, _)| k)
            .collect();

        if expired.is_empty() {
            return;
        }

        for key in &expired {
            if let Some(entry) = reg.remove(*key) {
                entry.value.cancel.notify_waiters();
            }
        }
        tracing::debug!(
            "[iroh-http] swept {} expired reader entries (ttl={ttl:?})",
            expired.len()
        );
        if reg.is_empty() && reg.capacity() > 128 {
            *reg = SlotMap::with_key();
        }
    }

    fn sweep_registry<K: slotmap::Key, T>(registry: &Mutex<SlotMap<K, Timed<T>>>, ttl: Duration) {
        let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
        let expired: Vec<K> = reg
            .iter()
            .filter(|(_, e)| e.is_expired(ttl))
            .map(|(k, _)| k)
            .collect();

        if expired.is_empty() {
            return;
        }

        for key in &expired {
            reg.remove(*key);
        }
        tracing::debug!(
            "[iroh-http] swept {} expired registry entries (ttl={ttl:?})",
            expired.len()
        );
        // Compact when empty to reclaim backing memory after traffic bursts.
        if reg.is_empty() && reg.capacity() > 128 {
            *reg = SlotMap::with_key();
        }
    }

    fn sweep_pending_readers(&self, ttl: Duration) {
        let mut map = self
            .pending_readers
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        let before = map.len();
        map.retain(|_, e| e.created.elapsed() < ttl);
        let removed = before.saturating_sub(map.len());
        if removed > 0 {
            tracing::debug!("[iroh-http] swept {removed} stale pending readers (ttl={ttl:?})");
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn test_store() -> HandleStore {
        HandleStore::new(StoreConfig::default())
    }

    // ── Body channel basics ─────────────────────────────────────────────

    #[tokio::test]
    async fn body_channel_send_recv() {
        let (writer, reader) = make_body_channel();
        writer.send_chunk(Bytes::from("hello")).await.unwrap();
        drop(writer); // signal EOF
        let chunk = reader.next_chunk().await;
        assert_eq!(chunk, Some(Bytes::from("hello")));
        let eof = reader.next_chunk().await;
        assert!(eof.is_none());
    }

    #[tokio::test]
    async fn body_channel_multiple_chunks() {
        let (writer, reader) = make_body_channel();
        writer.send_chunk(Bytes::from("a")).await.unwrap();
        writer.send_chunk(Bytes::from("b")).await.unwrap();
        writer.send_chunk(Bytes::from("c")).await.unwrap();
        drop(writer);

        let mut collected = Vec::new();
        while let Some(chunk) = reader.next_chunk().await {
            collected.push(chunk);
        }
        assert_eq!(
            collected,
            vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]
        );
    }

    #[tokio::test]
    async fn body_channel_reader_dropped_returns_error() {
        let (writer, reader) = make_body_channel();
        drop(reader);
        let result = writer.send_chunk(Bytes::from("data")).await;
        assert!(result.is_err());
    }
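
    // Illustrative check of the ADR-014 D4 claim above: `BodyReader` can be
    // polled directly as an `http_body::Body`. A sketch that drives
    // `poll_frame` through `std::future::poll_fn` in place of hyper.
    #[tokio::test]
    async fn body_reader_polls_as_http_body() {
        use http_body::Body as _;
        let (writer, mut reader) = make_body_channel();
        writer.send_chunk(Bytes::from("frame")).await.unwrap();
        drop(writer); // signal EOF

        // First poll yields a data frame carrying the chunk.
        let frame = std::future::poll_fn(|cx| Pin::new(&mut reader).poll_frame(cx))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(frame.into_data().unwrap(), Bytes::from("frame"));

        // Writer dropped, so the body reports end-of-stream.
        let eof = std::future::poll_fn(|cx| Pin::new(&mut reader).poll_frame(cx)).await;
        assert!(eof.is_none());
    }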

    // ── BodyWriter Sink<Bytes> impl (ADR-014 D4 / #174) ─────────────────

    #[tokio::test]
    async fn body_writer_sink_forward_from_stream() {
        use futures::{stream, StreamExt};
        let (writer, reader) = make_body_channel();

        let chunks = vec![
            Ok::<_, BoxError>(Bytes::from("one")),
            Ok(Bytes::from("two")),
            Ok(Bytes::from("three")),
        ];
        let producer = tokio::spawn(async move {
            stream::iter(chunks).forward(writer).await.unwrap();
            // Forward drops `writer` on completion → reader sees EOF.
        });

        let mut collected = Vec::new();
        while let Some(chunk) = reader.next_chunk().await {
            collected.push(chunk);
        }
        producer.await.unwrap();

        assert_eq!(
            collected,
            vec![Bytes::from("one"), Bytes::from("two"), Bytes::from("three")]
        );
    }

    #[tokio::test]
    async fn body_writer_sink_send_via_sinkext() {
        use futures::SinkExt;
        let (mut writer, reader) = make_body_channel();
        writer.send(Bytes::from("a")).await.unwrap();
        writer.send(Bytes::from("b")).await.unwrap();
        writer.close().await.unwrap();
        drop(writer);

        let mut collected = Vec::new();
        while let Some(chunk) = reader.next_chunk().await {
            collected.push(chunk);
        }
        assert_eq!(collected, vec![Bytes::from("a"), Bytes::from("b")]);
    }

    #[tokio::test]
    async fn body_writer_sink_propagates_reader_dropped() {
        use futures::SinkExt;
        let (mut writer, reader) = make_body_channel();
        drop(reader);
        // tokio's mpsc `send` fails as soon as the receiver is dropped, even
        // with buffer capacity to spare, so the first send should surface the
        // reader-dropped error; the loop is defensive against send paths that
        // buffer before noticing the closed channel.
        let mut err = None;
        for _ in 0..(DEFAULT_CHANNEL_CAPACITY + 1) {
            if let Err(e) = writer.send(Bytes::from("x")).await {
                err = Some(e);
                break;
            }
        }
        assert!(err.is_some(), "expected reader-dropped error from Sink");
    }
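
    // Drain-timeout behavior, sketched with the private `make_body_channel_with`
    // helper so the test does not wait out the 30 s default: capacity 1 fills
    // on the first send, and the second send must time out while the reader
    // idles.
    #[tokio::test]
    async fn send_chunk_times_out_when_reader_is_slow() {
        let (writer, reader) = make_body_channel_with(1, Duration::from_millis(50));
        writer.send_chunk(Bytes::from("fills-buffer")).await.unwrap();
        let err = writer.send_chunk(Bytes::from("stuck")).await.unwrap_err();
        assert!(err.contains("drain timeout"));
        drop(reader); // kept alive until here so the failure is the timeout
    }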

    // ── HandleStore operations ──────────────────────────────────────────

    #[tokio::test]
    async fn insert_reader_and_next_chunk() {
        let store = test_store();
        let (writer, reader) = store.make_body_channel();
        let handle = store.insert_reader(reader).unwrap();

        writer.send_chunk(Bytes::from("slab-data")).await.unwrap();
        drop(writer);

        let chunk = store.next_chunk(handle).await.unwrap();
        assert_eq!(chunk, Some(Bytes::from("slab-data")));

        // EOF cleans up the registry entry
        let eof = store.next_chunk(handle).await.unwrap();
        assert!(eof.is_none());
    }
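
    // ISS-010 behavior sketch: cancelling a reader wakes an in-flight
    // `next_chunk` await instead of leaving it parked. Assumes the default
    // current-thread test runtime, where `yield_now` lets the spawned task
    // reach its await point before we cancel.
    #[tokio::test]
    async fn cancel_reader_unblocks_in_flight_next_chunk() {
        let store = Arc::new(test_store());
        let (_writer, reader) = store.make_body_channel();
        let handle = store.insert_reader(reader).unwrap();

        let pending = {
            let store = store.clone();
            tokio::spawn(async move { store.next_chunk(handle).await })
        };
        tokio::task::yield_now().await; // let the read reach its await point
        store.cancel_reader(handle);

        let chunk = pending.await.unwrap().unwrap();
        assert!(chunk.is_none(), "cancelled read resolves as EOF");
    }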

    #[tokio::test]
    async fn next_chunk_invalid_handle() {
        let store = test_store();
        let result = store.next_chunk(999999).await;
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().code, crate::ErrorCode::InvalidInput);
    }
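
    // Rendezvous sketch for streaming request bodies: the caller stashes the
    // reader that `alloc_body_writer` returns, and the fetch path must claim
    // it exactly once.
    #[tokio::test]
    async fn pending_reader_claimed_exactly_once() {
        let store = test_store();
        let (handle, reader) = store.alloc_body_writer().unwrap();
        store.store_pending_reader(handle, reader);

        assert!(store.claim_pending_reader(handle).is_some());
        // A second claim (double fetch, retry bug) must come up empty.
        assert!(store.claim_pending_reader(handle).is_none());
    }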

    #[tokio::test]
    async fn send_chunk_via_handle() {
        let store = test_store();
        let (writer, reader) = store.make_body_channel();
        let handle = store.insert_writer(writer).unwrap();

        store
            .send_chunk(handle, Bytes::from("via-slab"))
            .await
            .unwrap();
        store.finish_body(handle).unwrap();

        let chunk = reader.next_chunk().await;
        assert_eq!(chunk, Some(Bytes::from("via-slab")));
        let eof = reader.next_chunk().await;
        assert!(eof.is_none());
    }
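
    // #126 behavior sketch for the non-blocking bridge: an empty buffer
    // reports a retryable `Err`, buffered data returns synchronously, and a
    // dropped writer maps to `Ok(None)` with the handle cleaned out of the
    // registry.
    #[tokio::test]
    async fn try_next_chunk_reports_empty_data_and_eof() {
        let store = test_store();
        let (writer, reader) = store.make_body_channel();
        let handle = store.insert_reader(reader).unwrap();

        // Nothing buffered yet: retryable error.
        assert!(store.try_next_chunk(handle).is_err());

        writer.send_chunk(Bytes::from("sync")).await.unwrap();
        assert_eq!(
            store.try_next_chunk(handle).unwrap(),
            Some(Bytes::from("sync"))
        );

        drop(writer);
        // Disconnected channel: EOF, and the handle is removed.
        assert_eq!(store.try_next_chunk(handle).unwrap(), None);
        assert!(store.try_next_chunk(handle).is_err(), "handle was cleaned up");
    }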

    #[tokio::test]
    async fn capacity_cap_rejects_overflow() {
        let store = HandleStore::new(StoreConfig {
            max_handles: 2,
            ..StoreConfig::default()
        });
        let (_, r1) = store.make_body_channel();
        let (_, r2) = store.make_body_channel();
        let (_, r3) = store.make_body_channel();
        store.insert_reader(r1).unwrap();
        store.insert_reader(r2).unwrap();
        let err = store.insert_reader(r3).unwrap_err();
        assert!(err.message.contains("capacity"));
    }
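
    // Rollback sketch for `InsertGuard`: dropping an uncommitted guard removes
    // every handle it inserted, so a failure partway through a multi-handle
    // allocation cannot leak earlier handles; `commit()` keeps them.
    #[tokio::test]
    async fn insert_guard_rolls_back_uncommitted_handles() {
        let store = test_store();
        {
            let mut guard = store.insert_guard();
            let (writer, reader) = store.make_body_channel();
            guard.insert_reader(reader).unwrap();
            guard.insert_writer(writer).unwrap();
            // No commit(): simulate a failure after partial allocation.
        }
        let (readers, writers, _, total) = store.count_handles();
        assert_eq!((readers, writers, total), (0, 0, 0));

        let mut guard = store.insert_guard();
        let (writer, reader) = store.make_body_channel();
        guard.insert_reader(reader).unwrap();
        guard.insert_writer(writer).unwrap();
        guard.commit();
        assert_eq!(store.count_handles().3, 2);
    }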

    // ── #84 regression: recv_with_cancel cancellation ──────────────────

    #[tokio::test]
    async fn recv_with_cancel_returns_none_on_cancel() {
        let (_tx, rx) = mpsc::channel::<Bytes>(4);
        let rx = Arc::new(tokio::sync::Mutex::new(rx));
        let cancel = Arc::new(tokio::sync::Notify::new());
        // Notify before polling — biased select must return None immediately.
        cancel.notify_one();
        let result = recv_with_cancel(rx, cancel).await;
        assert!(result.is_none());
    }
1054    }
1055}