Skip to main content

iroh_http_core/ffi/
dispatcher.rs

1//! FFI shaped serve + respond — the JS-callback bridge over the
2//! pure-Rust [`crate::http::server::serve_with_events`] entry.
3//!
4//! Per epic #182 this module is the **only** place that allocates `u64`
5//! handles, fires the [`RequestPayload`] callback, and rendezvous on the
6//! response head from JS. Everything below is one specific implementation
7//! of the generic `tower::Service<Request<Body>>` accepted by the pure-Rust
8//! serve entry — handed in via [`ffi_serve_with_callback`].
9//!
10//! Architecture-test guarantee: this file may import from `crate::http`
11// Legitimate FFI wiring — uses the disallowed types intentionally.
12#![allow(clippy::disallowed_types)]
13//! (the `ffi → http` direction is allowed); `crate::http::*` must NOT
14//! import from here.
15
16use std::convert::Infallible;
17use std::future::Future;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll};
21
22use bytes::Bytes;
23use http::StatusCode;
24use tower::Service;
25
26use crate::ffi::handles::ResponseHeadEntry;
27use crate::ffi::pumps::pump_hyper_body_to_channel;
28use crate::http::server::{
29    serve_with_events, ConnectionEventFn, RemoteNodeId, ServeHandle, ServeOptions,
30};
31use crate::{Body, CoreError, IrohEndpoint, RequestPayload};
32
33// ── Inline error responses ─────────────────────────────────────────────────
34
35fn internal_error(detail: &'static [u8]) -> hyper::Response<Body> {
36    hyper::Response::builder()
37        .status(StatusCode::INTERNAL_SERVER_ERROR)
38        .body(Body::full(Bytes::from_static(detail)))
39        .expect("static error response args are valid")
40}
41
42fn service_unavailable(detail: &'static [u8]) -> hyper::Response<Body> {
43    hyper::Response::builder()
44        .status(StatusCode::SERVICE_UNAVAILABLE)
45        .body(Body::full(Bytes::from_static(detail)))
46        .expect("static error response args are valid")
47}
48
49// ── respond() ─────────────────────────────────────────────────────────────
50
51/// Send the response head for a request handle previously delivered to JS
52/// via the [`RequestPayload::req_handle`] callback.
53///
54/// Partner of the `head_rx` rendezvous in [`FfiDispatcher::dispatch`]. The
55/// JS handler calls this exactly once per request to provide the status +
56/// headers; subsequent body bytes flow through the response body channel
57/// referenced by [`RequestPayload::res_body_handle`].
58pub fn respond(
59    handles: &crate::ffi::handles::HandleStore,
60    req_handle: u64,
61    status: u16,
62    headers: Vec<(String, String)>,
63) -> Result<(), CoreError> {
64    StatusCode::from_u16(status)
65        .map_err(|_| CoreError::invalid_input(format!("invalid HTTP status code: {status}")))?;
66    for (name, value) in &headers {
67        http::HeaderName::from_bytes(name.as_bytes()).map_err(|_| {
68            CoreError::invalid_input(format!("invalid response header name {:?}", name))
69        })?;
70        http::HeaderValue::from_str(value).map_err(|_| {
71            CoreError::invalid_input(format!("invalid response header value for {:?}", name))
72        })?;
73    }
74
75    let sender = handles
76        .take_req_sender(req_handle)
77        .ok_or_else(|| CoreError::invalid_handle(req_handle))?;
78    sender
79        .send(ResponseHeadEntry { status, headers })
80        .map_err(|_| CoreError::internal("serve task dropped before respond"))
81}
82
83// ── ReqHeadGuard ──────────────────────────────────────────────────────────
84//
85// RAII guard that removes the per-request slab entry from the handle
86// store on every dispatch exit path (success, error, panic). Lifted to
87// module scope from inline-in-`dispatch` per Slice C.4 of #182 (#178).
88
89struct ReqHeadGuard {
90    endpoint: IrohEndpoint,
91    req_handle: u64,
92}
93
94impl Drop for ReqHeadGuard {
95    fn drop(&mut self) {
96        self.endpoint.handles().take_req_sender(self.req_handle);
97    }
98}
99
100// ── FfiDispatcher + IrohHttpService ───────────────────────────────────────
101
102/// FFI-shaped tower service: allocates request/response handles, fires the
103/// JS `on_request` callback, and rendezvous on the response head sent via
104/// [`respond`]. Shared across every accepted connection and request via
105/// `Arc`.
106struct FfiDispatcher {
107    on_request: Arc<dyn Fn(RequestPayload) + Send + Sync>,
108    endpoint: IrohEndpoint,
109    own_node_id: Arc<String>,
110    max_header_size: Option<usize>,
111}
112
113#[derive(Clone)]
114pub(crate) struct IrohHttpService {
115    dispatcher: Arc<FfiDispatcher>,
116}
117
118/// ADR-014 D2 / #175: service is concrete — not generic over `B`.
119/// Body normalisation happens upstream at the hyper → tower seam in
120/// `crate::http::server::pipeline::serve_bistream`.
121impl Service<hyper::Request<Body>> for IrohHttpService {
122    type Response = hyper::Response<Body>;
123    type Error = Infallible;
124    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
125
126    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
127        Poll::Ready(Ok(()))
128    }
129
130    fn call(&mut self, req: hyper::Request<Body>) -> Self::Future {
131        let dispatcher = self.dispatcher.clone();
132        // #177: the authenticated peer id arrives as a request extension
133        // inserted by the per-connection AddExtensionLayer in
134        // serve_with_events. Missing extension is a server-side
135        // bug (the layer is unconditional) — fall back to empty string.
136        let remote_node_id = req
137            .extensions()
138            .get::<RemoteNodeId>()
139            .map(|r| r.0.clone())
140            .unwrap_or_else(|| Arc::new(String::new()));
141        Box::pin(async move { Ok(dispatcher.dispatch(req, remote_node_id).await) })
142    }
143}
144
145impl FfiDispatcher {
146    async fn dispatch(
147        self: Arc<Self>,
148        req: hyper::Request<Body>,
149        remote_node_id: Arc<String>,
150    ) -> hyper::Response<Body> {
151        let handles = self.endpoint.handles();
152        let own_node_id = &*self.own_node_id;
153        let max_header_size = self.max_header_size;
154
155        let method = req.method().to_string();
156        let path_and_query = req
157            .uri()
158            .path_and_query()
159            .map(|p| p.as_str())
160            .unwrap_or("/")
161            .to_string();
162
163        tracing::debug!(
164            method = %method,
165            path = %path_and_query,
166            peer = %remote_node_id,
167            "iroh-http: incoming request",
168        );
169
170        // Strip any client-supplied peer-id to prevent spoofing,
171        // then inject the authenticated identity from the QUIC connection.
172        //
173        // ISS-011: Use raw byte length for header-size accounting to prevent
174        // bypass via non-UTF8 values.  Reject non-UTF8 header values with 400
175        // instead of silently converting them to empty strings.
176
177        // First pass: measure header bytes using raw values (before lossy conversion).
178        if let Some(limit) = max_header_size {
179            let header_bytes: usize = req
180                .headers()
181                .iter()
182                .filter(|(k, _)| !k.as_str().eq_ignore_ascii_case("peer-id"))
183                .map(|(k, v)| {
184                    k.as_str()
185                        .len()
186                        .saturating_add(v.as_bytes().len())
187                        .saturating_add(4)
188                }) // ": " + "\r\n"
189                .fold(0usize, |acc, x| acc.saturating_add(x))
190                .saturating_add("peer-id".len())
191                .saturating_add(remote_node_id.len())
192                .saturating_add(4)
193                .saturating_add(req.uri().to_string().len())
194                .saturating_add(method.len())
195                .saturating_add(12); // "HTTP/1.1 \r\n\r\n" overhead
196            if header_bytes > limit {
197                let resp = hyper::Response::builder()
198                    .status(StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE)
199                    .body(Body::empty())
200                    .expect("static response args are valid");
201                return resp;
202            }
203        }
204
205        // Build header list — reject non-UTF8 values instead of silently dropping.
206        let mut req_headers: Vec<(String, String)> = Vec::new();
207        for (k, v) in req.headers().iter() {
208            if k.as_str().eq_ignore_ascii_case("peer-id") {
209                continue;
210            }
211            match v.to_str() {
212                Ok(s) => req_headers.push((k.as_str().to_string(), s.to_string())),
213                Err(_) => {
214                    let resp = hyper::Response::builder()
215                        .status(StatusCode::BAD_REQUEST)
216                        .body(Body::full(Bytes::from_static(b"non-UTF8 header value")))
217                        .expect("static response args are valid");
218                    return resp;
219                }
220            }
221        }
222        req_headers.push(("peer-id".to_string(), (*remote_node_id).clone()));
223
224        let url = format!("httpi://{own_node_id}{path_and_query}");
225
226        // ── Allocate channels ────────────────────────────────────────────────
227
228        let mut guard = handles.insert_guard();
229        let (req_body_writer, req_body_reader) = handles.make_body_channel();
230        let req_body_handle = match guard.insert_reader(req_body_reader) {
231            Ok(h) => h,
232            Err(_) => return service_unavailable(b"server handle table full"),
233        };
234
235        let (res_body_writer, res_body_reader) = handles.make_body_channel();
236        let res_body_handle = match guard.insert_writer(res_body_writer) {
237            Ok(h) => h,
238            Err(_) => return service_unavailable(b"server handle table full"),
239        };
240
241        let (head_tx, head_rx) = tokio::sync::oneshot::channel::<ResponseHeadEntry>();
242        let req_handle = match guard.allocate_req_handle(head_tx) {
243            Ok(h) => h,
244            Err(_) => return service_unavailable(b"server handle table full"),
245        };
246
247        guard.commit();
248
249        let _req_head_guard = ReqHeadGuard {
250            endpoint: self.endpoint.clone(),
251            req_handle,
252        };
253
254        // ── Pump request body ────────────────────────────────────────────────
255
256        let body = req.into_body();
257        tokio::spawn(pump_hyper_body_to_channel(body, req_body_writer));
258
259        // ── Fire on_request callback ─────────────────────────────────────────
260
261        (self.on_request)(RequestPayload {
262            req_handle,
263            req_body_handle,
264            res_body_handle,
265            method,
266            url,
267            headers: req_headers,
268            remote_node_id: Arc::unwrap_or_clone(remote_node_id),
269            is_bidi: false,
270        });
271
272        // ── Await response head from JS ──────────────────────────────────────
273        let response_head = match head_rx.await {
274            Ok(h) => h,
275            Err(_) => return internal_error(b"JS handler dropped without responding"),
276        };
277
278        // ── Regular HTTP response ─────────────────────────────────────────────
279
280        let mut resp_builder = hyper::Response::builder().status(response_head.status);
281        for (k, v) in &response_head.headers {
282            resp_builder = resp_builder.header(k.as_str(), v.as_str());
283        }
284
285        match resp_builder.body(Body::new(res_body_reader)) {
286            Ok(r) => r,
287            Err(_) => internal_error(b"failed to build response head from JS"),
288        }
289    }
290}
291
292// ── ffi_serve_with_callback ───────────────────────────────────────────────
293
294/// FFI-shaped serve entry. Constructs an [`IrohHttpService`] around the
295/// supplied callback and delegates to the pure-Rust
296/// [`crate::http::server::serve_with_events`].
297///
298/// `on_connection_event` is called on 0→1 (first connection from a peer)
299/// and 1→0 (last connection from a peer closed) count transitions.
300///
301/// # Security
302///
303/// Calling this opens a **public endpoint** on the Iroh overlay network.
304/// Any peer that knows or discovers your node's public key can connect
305/// and send requests. Iroh QUIC authenticates the peer's *identity*
306/// cryptographically, but does not enforce *authorization*. Always
307/// inspect [`RequestPayload::remote_node_id`] and reject untrusted peers.
308pub fn ffi_serve_with_callback<F>(
309    endpoint: IrohEndpoint,
310    options: ServeOptions,
311    on_request: F,
312    on_connection_event: Option<ConnectionEventFn>,
313) -> ServeHandle
314where
315    F: Fn(RequestPayload) + Send + Sync + 'static,
316{
317    let max_header_size = endpoint.max_header_size();
318    let own_node_id = Arc::new(endpoint.node_id().to_string());
319    let on_request = Arc::new(on_request) as Arc<dyn Fn(RequestPayload) + Send + Sync>;
320
321    let dispatcher = Arc::new(FfiDispatcher {
322        on_request,
323        endpoint: endpoint.clone(),
324        own_node_id,
325        max_header_size: if max_header_size == 0 {
326            None
327        } else {
328            Some(max_header_size)
329        },
330    });
331    let svc = IrohHttpService { dispatcher };
332
333    serve_with_events(endpoint, options, svc, on_connection_event)
334}
335
336/// Back-compat 3-arg FFI serve entry: equivalent to
337/// [`ffi_serve_with_callback`] with `on_connection_event = None`. The Node /
338/// Deno / Tauri adapters call this directly.
339pub fn ffi_serve<F>(endpoint: IrohEndpoint, options: ServeOptions, on_request: F) -> ServeHandle
340where
341    F: Fn(RequestPayload) + Send + Sync + 'static,
342{
343    ffi_serve_with_callback(endpoint, options, on_request, None)
344}