Skip to main content

iroh_http_core/http/server/
mod.rs

1//! Incoming HTTP request — pure-Rust `serve()` implementation.
2//!
3//! Each accepted QUIC bidirectional stream is driven by hyper's HTTP/1.1
4//! server connection. The user supplies a `tower::Service<Request<Body>,
5//! Response = Response<Body>, Error = Infallible>`; the per-connection
6//! `AddExtensionLayer` makes the authenticated peer id available as a
7//! [`RemoteNodeId`] request extension (closes #177).
8//!
9//! The accept loop body lives in [`accept`]; this file is the public
10//! surface and option-resolution glue, kept close to the axum reference
11//! shape. Sub-modules: [`options`] (`ServeOptions` + defaults), [`handle`]
12//! (`ServeHandle`), [`error_layer`] (tower → HTTP error converter),
13//! [`pipeline`] / [`stack`] / [`lifecycle`] (per-bistream chain and RAII
14//! guards).
15//!
16//! The FFI-shaped callback API ([`crate::ffi::dispatcher::ffi_serve_with_callback`])
17//! is one specific consumer of this entry — it constructs an
18//! `IrohHttpService` around the JS callback and hands it in like any
19//! other service.
20
21pub(crate) mod accept;
22pub(crate) mod error_layer;
23pub(crate) mod handle;
24pub(crate) mod lifecycle;
25pub(crate) mod options;
26pub(crate) mod pipeline;
27pub(crate) mod stack;
28
29use std::{sync::Arc, time::Duration};
30
31use tower::Service;
32
33use crate::{Body, ConnectionEvent, IrohEndpoint};
34
35use self::accept::{accept_loop, AcceptConfig};
36use self::options::{
37    DEFAULT_CONCURRENCY, DEFAULT_DRAIN_TIMEOUT_MS, DEFAULT_MAX_CONNECTIONS_PER_PEER,
38    DEFAULT_MAX_REQUEST_BODY_BYTES, DEFAULT_REQUEST_TIMEOUT_MS,
39};
40
41// Re-exported from sub-modules so external paths
42// (`crate::http::server::ServeOptions`, `…::ServeHandle`,
43// `…::HandleLayerErrorLayer`, `…::DEFAULT_MAX_RESPONSE_BODY_BYTES`) stay
44// unchanged after Slice C.7 split.
45pub(crate) use self::error_layer::HandleLayerErrorLayer;
46pub use self::handle::ServeHandle;
47pub use self::options::ServeOptions;
48pub(crate) use self::options::DEFAULT_MAX_RESPONSE_BODY_BYTES;
49
50// ── Connection-event callback type ───────────────────────────────────────────
51
/// Shared, thread-safe callback that receives a [`ConnectionEvent`].
///
/// This is the type of the optional `on_connection_event` argument of
/// [`serve_with_events`]. Wrapping the closure in an `Arc` lets it be
/// cloned cheaply, and the `Send + Sync` bounds allow it to be shared
/// across threads.
pub(crate) type ConnectionEventFn = Arc<dyn Fn(ConnectionEvent) + Send + Sync>;
53
/// Authenticated peer node id of the QUIC connection a request arrived
/// on.
///
/// Inserted as a request extension by the per-connection
/// [`tower_http::add_extension::AddExtensionLayer`] in
/// [`serve_with_events`]. User-facing pure-Rust services consume it with
/// `req.extensions().get::<RemoteNodeId>()`. Closes #177.
///
/// The id is stored behind an `Arc`, so cloning the extension is a
/// reference-count bump rather than a string copy.
///
/// Equality and hashing delegate to the wrapped string (an exact,
/// byte-for-byte comparison), so values can be compared directly or kept
/// in a `HashSet` / `HashMap` — e.g. for peer allow-list checks.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct RemoteNodeId(pub Arc<String>);
63
64/// Pure-Rust serve entry — convenience 3-arg wrapper that omits the
65/// connection-event callback. Equivalent to `serve_with_events(ep,
66/// opts, svc, None)`.
67pub fn serve<S>(endpoint: IrohEndpoint, options: ServeOptions, svc: S) -> ServeHandle
68where
69    S: Service<
70            hyper::Request<Body>,
71            Response = hyper::Response<Body>,
72            Error = std::convert::Infallible,
73        > + Clone
74        + Send
75        + Sync
76        + 'static,
77    S::Future: Send + 'static,
78{
79    serve_with_events(endpoint, options, svc, None)
80}
81
82/// Pure-Rust serve entry — the canonical inbound API.
83///
84/// Accepts any `tower::Service<Request<Body>, Response = Response<Body>,
85/// Error = Infallible>` (`Clone + Send + Sync + 'static`, with `Send`
86/// futures). Each accepted QUIC bidirectional stream is driven by
87/// hyper's HTTP/1.1 server connection through the per-connection tower
88/// stack composed in [`stack::build_stack`]; the user service sees
89/// requests with the authenticated peer id available as a typed
90/// [`RemoteNodeId`] request extension.
91///
92/// `on_connection_event` is called on 0→1 (first connection from a peer)
93/// and 1→0 (last connection from a peer closed) count transitions.
94///
95/// # Security
96///
97/// Calling this opens a **public endpoint** on the Iroh overlay network.
98/// Any peer that knows or discovers your node's public key can connect
99/// and send requests. Iroh QUIC authenticates the peer's *identity*
100/// cryptographically, but does not enforce *authorization*. Inspect
101/// [`RemoteNodeId`] in your service and reject untrusted peers.
102pub fn serve_with_events<S>(
103    endpoint: IrohEndpoint,
104    options: ServeOptions,
105    svc: S,
106    on_connection_event: Option<ConnectionEventFn>,
107) -> ServeHandle
108where
109    S: Service<
110            hyper::Request<Body>,
111            Response = hyper::Response<Body>,
112            Error = std::convert::Infallible,
113        > + Clone
114        + Send
115        + Sync
116        + 'static,
117    S::Future: Send + 'static,
118{
119    let cfg = AcceptConfig {
120        max: options.max_concurrency.unwrap_or(DEFAULT_CONCURRENCY),
121        request_timeout: options
122            .request_timeout_ms
123            .map(Duration::from_millis)
124            .unwrap_or(Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MS)),
125        max_conns_per_peer: options
126            .max_connections_per_peer
127            .unwrap_or(DEFAULT_MAX_CONNECTIONS_PER_PEER),
128        max_request_body_wire_bytes: options
129            .max_request_body_wire_bytes
130            .or(Some(DEFAULT_MAX_REQUEST_BODY_BYTES)),
131        max_request_body_decoded_bytes: options
132            .max_request_body_decoded_bytes
133            .or(Some(DEFAULT_MAX_REQUEST_BODY_BYTES)),
134        max_total_connections: options.max_total_connections,
135        drain_timeout: Duration::from_millis(
136            options.drain_timeout_ms.unwrap_or(DEFAULT_DRAIN_TIMEOUT_MS),
137        ),
138        // Load-shed is opt-out — default `true`.
139        load_shed_enabled: options.load_shed.unwrap_or(true),
140        max_header_size: endpoint.max_header_size(),
141        stack_compression: endpoint.compression().cloned(),
142        // Decompression is opt-out — default `true`.
143        decompression: options.decompression.unwrap_or(true),
144    };
145
146    let shutdown_notify = Arc::new(tokio::sync::Notify::new());
147    let drain_dur = cfg.drain_timeout;
148    let (done_tx, done_rx) = tokio::sync::watch::channel(false);
149
150    let join = tokio::spawn(accept_loop(
151        endpoint,
152        cfg,
153        svc,
154        on_connection_event,
155        shutdown_notify.clone(),
156        done_tx,
157    ));
158
159    ServeHandle {
160        join,
161        shutdown_notify,
162        drain_timeout: drain_dur,
163        done_rx,
164    }
165}