1#![allow(clippy::disallowed_types)]
13use std::convert::Infallible;
17use std::future::Future;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll};
21
22use bytes::Bytes;
23use http::StatusCode;
24use tower::Service;
25
26use crate::ffi::handles::ResponseHeadEntry;
27use crate::ffi::pumps::pump_hyper_body_to_channel;
28use crate::http::server::{
29 serve_with_events, ConnectionEventFn, RemoteNodeId, ServeHandle, ServeOptions,
30};
31use crate::{Body, CoreError, IrohEndpoint, RequestPayload};
32
33fn internal_error(detail: &'static [u8]) -> hyper::Response<Body> {
36 hyper::Response::builder()
37 .status(StatusCode::INTERNAL_SERVER_ERROR)
38 .body(Body::full(Bytes::from_static(detail)))
39 .expect("static error response args are valid")
40}
41
42fn service_unavailable(detail: &'static [u8]) -> hyper::Response<Body> {
43 hyper::Response::builder()
44 .status(StatusCode::SERVICE_UNAVAILABLE)
45 .body(Body::full(Bytes::from_static(detail)))
46 .expect("static error response args are valid")
47}
48
49pub fn respond(
59 handles: &crate::ffi::handles::HandleStore,
60 req_handle: u64,
61 status: u16,
62 headers: Vec<(String, String)>,
63) -> Result<(), CoreError> {
64 StatusCode::from_u16(status)
65 .map_err(|_| CoreError::invalid_input(format!("invalid HTTP status code: {status}")))?;
66 for (name, value) in &headers {
67 http::HeaderName::from_bytes(name.as_bytes()).map_err(|_| {
68 CoreError::invalid_input(format!("invalid response header name {:?}", name))
69 })?;
70 http::HeaderValue::from_str(value).map_err(|_| {
71 CoreError::invalid_input(format!("invalid response header value for {:?}", name))
72 })?;
73 }
74
75 let sender = handles
76 .take_req_sender(req_handle)
77 .ok_or_else(|| CoreError::invalid_handle(req_handle))?;
78 sender
79 .send(ResponseHeadEntry { status, headers })
80 .map_err(|_| CoreError::internal("serve task dropped before respond"))
81}
82
83struct ReqHeadGuard {
90 endpoint: IrohEndpoint,
91 req_handle: u64,
92}
93
94impl Drop for ReqHeadGuard {
95 fn drop(&mut self) {
96 self.endpoint.handles().take_req_sender(self.req_handle);
97 }
98}
99
100struct FfiDispatcher {
107 on_request: Arc<dyn Fn(RequestPayload) + Send + Sync>,
108 endpoint: IrohEndpoint,
109 own_node_id: Arc<String>,
110 max_header_size: Option<usize>,
111}
112
113#[derive(Clone)]
114pub(crate) struct IrohHttpService {
115 dispatcher: Arc<FfiDispatcher>,
116}
117
118impl Service<hyper::Request<Body>> for IrohHttpService {
122 type Response = hyper::Response<Body>;
123 type Error = Infallible;
124 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
125
126 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
127 Poll::Ready(Ok(()))
128 }
129
130 fn call(&mut self, req: hyper::Request<Body>) -> Self::Future {
131 let dispatcher = self.dispatcher.clone();
132 let remote_node_id = req
137 .extensions()
138 .get::<RemoteNodeId>()
139 .map(|r| r.0.clone())
140 .unwrap_or_else(|| Arc::new(String::new()));
141 Box::pin(async move { Ok(dispatcher.dispatch(req, remote_node_id).await) })
142 }
143}
144
145impl FfiDispatcher {
146 async fn dispatch(
147 self: Arc<Self>,
148 req: hyper::Request<Body>,
149 remote_node_id: Arc<String>,
150 ) -> hyper::Response<Body> {
151 let handles = self.endpoint.handles();
152 let own_node_id = &*self.own_node_id;
153 let max_header_size = self.max_header_size;
154
155 let method = req.method().to_string();
156 let path_and_query = req
157 .uri()
158 .path_and_query()
159 .map(|p| p.as_str())
160 .unwrap_or("/")
161 .to_string();
162
163 tracing::debug!(
164 method = %method,
165 path = %path_and_query,
166 peer = %remote_node_id,
167 "iroh-http: incoming request",
168 );
169
170 if let Some(limit) = max_header_size {
179 let header_bytes: usize = req
180 .headers()
181 .iter()
182 .filter(|(k, _)| !k.as_str().eq_ignore_ascii_case("peer-id"))
183 .map(|(k, v)| {
184 k.as_str()
185 .len()
186 .saturating_add(v.as_bytes().len())
187 .saturating_add(4)
188 }) .fold(0usize, |acc, x| acc.saturating_add(x))
190 .saturating_add("peer-id".len())
191 .saturating_add(remote_node_id.len())
192 .saturating_add(4)
193 .saturating_add(req.uri().to_string().len())
194 .saturating_add(method.len())
195 .saturating_add(12); if header_bytes > limit {
197 let resp = hyper::Response::builder()
198 .status(StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE)
199 .body(Body::empty())
200 .expect("static response args are valid");
201 return resp;
202 }
203 }
204
205 let mut req_headers: Vec<(String, String)> = Vec::new();
207 for (k, v) in req.headers().iter() {
208 if k.as_str().eq_ignore_ascii_case("peer-id") {
209 continue;
210 }
211 match v.to_str() {
212 Ok(s) => req_headers.push((k.as_str().to_string(), s.to_string())),
213 Err(_) => {
214 let resp = hyper::Response::builder()
215 .status(StatusCode::BAD_REQUEST)
216 .body(Body::full(Bytes::from_static(b"non-UTF8 header value")))
217 .expect("static response args are valid");
218 return resp;
219 }
220 }
221 }
222 req_headers.push(("peer-id".to_string(), (*remote_node_id).clone()));
223
224 let url = format!("httpi://{own_node_id}{path_and_query}");
225
226 let mut guard = handles.insert_guard();
229 let (req_body_writer, req_body_reader) = handles.make_body_channel();
230 let req_body_handle = match guard.insert_reader(req_body_reader) {
231 Ok(h) => h,
232 Err(_) => return service_unavailable(b"server handle table full"),
233 };
234
235 let (res_body_writer, res_body_reader) = handles.make_body_channel();
236 let res_body_handle = match guard.insert_writer(res_body_writer) {
237 Ok(h) => h,
238 Err(_) => return service_unavailable(b"server handle table full"),
239 };
240
241 let (head_tx, head_rx) = tokio::sync::oneshot::channel::<ResponseHeadEntry>();
242 let req_handle = match guard.allocate_req_handle(head_tx) {
243 Ok(h) => h,
244 Err(_) => return service_unavailable(b"server handle table full"),
245 };
246
247 guard.commit();
248
249 let _req_head_guard = ReqHeadGuard {
250 endpoint: self.endpoint.clone(),
251 req_handle,
252 };
253
254 let body = req.into_body();
257 tokio::spawn(pump_hyper_body_to_channel(body, req_body_writer));
258
259 (self.on_request)(RequestPayload {
262 req_handle,
263 req_body_handle,
264 res_body_handle,
265 method,
266 url,
267 headers: req_headers,
268 remote_node_id: Arc::unwrap_or_clone(remote_node_id),
269 is_bidi: false,
270 });
271
272 let response_head = match head_rx.await {
274 Ok(h) => h,
275 Err(_) => return internal_error(b"JS handler dropped without responding"),
276 };
277
278 let mut resp_builder = hyper::Response::builder().status(response_head.status);
281 for (k, v) in &response_head.headers {
282 resp_builder = resp_builder.header(k.as_str(), v.as_str());
283 }
284
285 match resp_builder.body(Body::new(res_body_reader)) {
286 Ok(r) => r,
287 Err(_) => internal_error(b"failed to build response head from JS"),
288 }
289 }
290}
291
292pub fn ffi_serve_with_callback<F>(
309 endpoint: IrohEndpoint,
310 options: ServeOptions,
311 on_request: F,
312 on_connection_event: Option<ConnectionEventFn>,
313) -> ServeHandle
314where
315 F: Fn(RequestPayload) + Send + Sync + 'static,
316{
317 let max_header_size = endpoint.max_header_size();
318 let own_node_id = Arc::new(endpoint.node_id().to_string());
319 let on_request = Arc::new(on_request) as Arc<dyn Fn(RequestPayload) + Send + Sync>;
320
321 let dispatcher = Arc::new(FfiDispatcher {
322 on_request,
323 endpoint: endpoint.clone(),
324 own_node_id,
325 max_header_size: if max_header_size == 0 {
326 None
327 } else {
328 Some(max_header_size)
329 },
330 });
331 let svc = IrohHttpService { dispatcher };
332
333 serve_with_events(endpoint, options, svc, on_connection_event)
334}
335
336pub fn ffi_serve<F>(endpoint: IrohEndpoint, options: ServeOptions, on_request: F) -> ServeHandle
340where
341 F: Fn(RequestPayload) + Send + Sync + 'static,
342{
343 ffi_serve_with_callback(endpoint, options, on_request, None)
344}