#![allow(clippy::disallowed_types)] mod common;
use bytes::Bytes;
use iroh_http_core::respond;
use iroh_http_core::{
fetch, ffi_serve, IrohEndpoint, NetworkingOptions, NodeOptions, RequestPayload, ServeOptions,
};
/// A request whose headers exceed the server's `max_header_size` (256 B here)
/// must be answered with 431 (Request Header Fields Too Large) rather than
/// surfacing as a transport-level error.
#[tokio::test]
async fn header_bomb_rejected() {
    let (server_ep, client_ep) = common::make_pair_custom_server(NodeOptions {
        networking: NetworkingOptions {
            disabled: true,
            bind_addrs: vec!["127.0.0.1:0".into()],
            ..Default::default()
        },
        max_header_size: Some(256),
        ..Default::default()
    })
    .await;
    let server_node = common::node_id(&server_ep);
    let server_addrs = common::server_addrs(&server_ep);
    ffi_serve(
        server_ep.clone(),
        ServeOptions::default(),
        move |payload: RequestPayload| {
            // Minimal 200 response with an explicitly empty body.
            respond(
                server_ep.handles(),
                payload.req_handle,
                200,
                vec![("content-length".into(), "0".into())],
            )
            .unwrap();
            server_ep
                .handles()
                .finish_body(payload.res_body_handle)
                .unwrap();
        },
    );
    // A single header value well past the 256-byte limit.
    let oversized_headers = vec![("x-big".to_string(), "X".repeat(300))];
    let result = fetch(
        &client_ep,
        &server_node,
        "/bomb",
        "GET",
        &oversized_headers,
        None,
        None,
        Some(&server_addrs),
        None,
        true,
        None,
    )
    .await;
    let resp = result.expect("expected a 431 response, not a transport error");
    assert_eq!(
        resp.status, 431,
        "expected 431 Request Header Fields Too Large, got: {}",
        resp.status
    );
}
/// The client enforces its own `max_header_size` (128 B) on *response*
/// headers: a server sending an oversized header must surface a
/// `HeaderTooLarge` error to the caller instead of a successful response.
#[tokio::test]
async fn response_header_bomb_rejected() {
    // Both endpoints bind loopback-only with networking disabled; only the
    // client carries the header cap.
    let localhost_net = || NetworkingOptions {
        disabled: true,
        bind_addrs: vec!["127.0.0.1:0".into()],
        ..Default::default()
    };
    let server_ep = IrohEndpoint::bind(NodeOptions {
        networking: localhost_net(),
        ..Default::default()
    })
    .await
    .unwrap();
    let client_ep = IrohEndpoint::bind(NodeOptions {
        networking: localhost_net(),
        max_header_size: Some(128),
        ..Default::default()
    })
    .await
    .unwrap();
    let server_id = common::node_id(&server_ep);
    let addrs = common::server_addrs(&server_ep);
    ffi_serve(
        server_ep.clone(),
        ServeOptions::default(),
        move |payload: RequestPayload| {
            // Respond with one header far larger than the client's cap.
            respond(
                server_ep.handles(),
                payload.req_handle,
                200,
                vec![("x-huge".into(), "Y".repeat(200))],
            )
            .unwrap();
            server_ep
                .handles()
                .finish_body(payload.res_body_handle)
                .unwrap();
        },
    );
    let outcome = fetch(
        &client_ep,
        &server_id,
        "/big-response",
        "GET",
        &[],
        None,
        None,
        Some(&addrs),
        None,
        true,
        None,
    )
    .await;
    assert!(
        outcome.is_err(),
        "expected error for oversized response header, got: {:?}",
        outcome
    );
    let err = outcome.unwrap_err();
    assert_eq!(
        err.code,
        iroh_http_core::ErrorCode::HeaderTooLarge,
        "expected HeaderTooLarge, got: {:?}",
        err,
    );
}
/// Sanity check: with default limits, an ordinary round trip (small headers,
/// a 5-byte body) completes end to end and the body terminates cleanly.
#[tokio::test]
async fn default_limits_allow_normal_traffic() {
    let (server_ep, client_ep) = common::make_pair().await;
    let server_id = common::node_id(&server_ep);
    let addrs = common::server_addrs(&server_ep);
    ffi_serve(
        server_ep.clone(),
        ServeOptions::default(),
        move |payload: RequestPayload| {
            respond(
                server_ep.handles(),
                payload.req_handle,
                200,
                vec![("content-length".into(), "5".into())],
            )
            .unwrap();
            // Stream the body from a separate task, then close it.
            let body_handle = payload.res_body_handle;
            let ep = server_ep.clone();
            tokio::spawn(async move {
                ep.handles()
                    .send_chunk(body_handle, Bytes::from_static(b"hello"))
                    .await
                    .unwrap();
                ep.handles().finish_body(body_handle).unwrap();
            });
        },
    );
    let res = fetch(
        &client_ep,
        &server_id,
        "/normal",
        "GET",
        &[],
        None,
        None,
        Some(&addrs),
        None,
        true,
        None,
    )
    .await
    .unwrap();
    assert_eq!(res.status, 200);
    // First read yields the payload; the second read signals EOF via None.
    let first = client_ep
        .handles()
        .next_chunk(res.body_handle)
        .await
        .unwrap();
    assert_eq!(first.unwrap().as_ref(), b"hello");
    let second = client_ep
        .handles()
        .next_chunk(res.body_handle)
        .await
        .unwrap();
    assert!(second.is_none());
}
/// With a 64-byte wire limit on request bodies, a 256-byte upload must be
/// cut off: if the handler manages to reply at all, the byte count it
/// reports must stay within the limit.
#[tokio::test]
async fn body_limit_exceeded_resets_stream() {
    let (server_ep, client_ep) = common::make_pair().await;
    let server_id = common::node_id(&server_ep);
    let addrs = common::server_addrs(&server_ep);
    ffi_serve(
        server_ep.clone(),
        ServeOptions {
            max_request_body_wire_bytes: Some(64),
            ..Default::default()
        },
        move |payload: RequestPayload| {
            let req_body = payload.req_body_handle;
            let res_body = payload.res_body_handle;
            let req = payload.req_handle;
            let ep = server_ep.clone();
            tokio::spawn(async move {
                // Count every byte the handler actually receives, then echo
                // that count back as the response body.
                let mut seen = 0usize;
                while let Ok(Some(chunk)) = ep.handles().next_chunk(req_body).await {
                    seen += chunk.len();
                }
                respond(
                    ep.handles(),
                    req,
                    200,
                    vec![("content-type".into(), "text/plain".into())],
                )
                .unwrap();
                ep.handles()
                    .send_chunk(res_body, Bytes::from(seen.to_string()))
                    .await
                    .unwrap();
                ep.handles().finish_body(res_body).unwrap();
            });
        },
    );
    // Push a single 256-byte chunk, then close the writer side.
    let (writer, reader) = iroh_http_core::make_body_channel();
    let send_task = tokio::spawn(async move {
        let _ = writer.send_chunk(Bytes::from(vec![0x41u8; 256])).await;
        drop(writer);
    });
    let result = fetch(
        &client_ep,
        &server_id,
        "/upload",
        "POST",
        &[],
        Some(reader),
        None,
        Some(&addrs),
        None,
        true,
        None,
    )
    .await;
    send_task.await.unwrap();
    // The fetch may fail outright (stream reset); when it succeeds, the
    // handler's reported byte count must respect the 64-byte wire limit.
    if let Ok(res) = result {
        if let Ok(Some(chunk)) = client_ep.handles().next_chunk(res.body_handle).await {
            let received: usize = std::str::from_utf8(&chunk)
                .unwrap_or("0")
                .parse()
                .unwrap_or(0);
            assert!(
                received <= 64,
                "server received {received} bytes, should be <= 64"
            );
        }
    }
}
/// `ServeOptions` round-trips the limit fields it is constructed with.
///
/// This is pure struct construction — nothing is awaited and no endpoint is
/// bound — so it runs as a plain synchronous `#[test]` instead of spinning
/// up a Tokio runtime via `#[tokio::test]`.
#[test]
fn per_peer_connection_limit_config() {
    let opts = ServeOptions {
        max_connections_per_peer: Some(2),
        request_timeout_ms: Some(30_000),
        max_request_body_wire_bytes: Some(1024 * 1024),
        ..Default::default()
    };
    assert_eq!(opts.max_connections_per_peer, Some(2));
    assert_eq!(opts.request_timeout_ms, Some(30_000));
    assert_eq!(opts.max_request_body_wire_bytes, Some(1024 * 1024));
}
/// When `max_header_size` is left unset, a bound endpoint reports the
/// default limit of 64 KiB.
#[tokio::test]
async fn max_header_size_default_is_64kb() {
    let opts = NodeOptions {
        networking: NetworkingOptions {
            disabled: true,
            ..Default::default()
        },
        ..Default::default()
    };
    let endpoint = IrohEndpoint::bind(opts).await.unwrap();
    assert_eq!(endpoint.max_header_size(), 64 * 1024);
}
/// An explicitly configured `max_header_size` is reported back verbatim by
/// the bound endpoint.
#[tokio::test]
async fn max_header_size_custom() {
    let opts = NodeOptions {
        networking: NetworkingOptions {
            disabled: true,
            ..Default::default()
        },
        max_header_size: Some(1024),
        ..Default::default()
    };
    let endpoint = IrohEndpoint::bind(opts).await.unwrap();
    assert_eq!(endpoint.max_header_size(), 1024);
}
/// `max_header_size: Some(0)` is nonsensical; `bind` must fail with an
/// error message that names the offending option.
#[tokio::test]
async fn max_header_size_zero_is_rejected() {
    let result = IrohEndpoint::bind(NodeOptions {
        networking: NetworkingOptions {
            disabled: true,
            ..Default::default()
        },
        max_header_size: Some(0),
        ..Default::default()
    })
    .await;
    let Err(err) = result else {
        panic!("bind should reject max_header_size: Some(0)")
    };
    assert!(
        err.message.contains("max_header_size"),
        "error should mention max_header_size, got: {err}"
    );
}
/// With `max_concurrency: Some(2)`, three simultaneous requests must all
/// eventually succeed — excess requests queue behind the limit rather than
/// being rejected.
///
/// Removed the unused `gate` barrier: it was created as `Barrier::new(1)`,
/// never cloned into the handler, and never `wait()`ed, so it gated nothing
/// and the trailing `drop(gate)` was dead code that misleadingly suggested
/// the handlers were synchronized.
#[tokio::test]
async fn serve_concurrency_limit() {
    let (server_ep, client_ep) = common::make_pair().await;
    let server_id = common::node_id(&server_ep);
    let addrs = common::server_addrs(&server_ep);
    ffi_serve(
        server_ep.clone(),
        ServeOptions {
            max_concurrency: Some(2),
            ..Default::default()
        },
        move |payload: RequestPayload| {
            let req_handle = payload.req_handle;
            let res_body = payload.res_body_handle;
            respond(server_ep.handles(), req_handle, 200, vec![]).unwrap();
            server_ep.handles().finish_body(res_body).unwrap();
        },
    );
    // Fire three requests concurrently; at most two may be in flight at once.
    let (r1, r2, r3) = tokio::join!(
        fetch(
            &client_ep,
            &server_id,
            "/r1",
            "GET",
            &[],
            None,
            None,
            Some(&addrs),
            None,
            true,
            None,
        ),
        fetch(
            &client_ep,
            &server_id,
            "/r2",
            "GET",
            &[],
            None,
            None,
            Some(&addrs),
            None,
            true,
            None,
        ),
        fetch(
            &client_ep,
            &server_id,
            "/r3",
            "GET",
            &[],
            None,
            None,
            Some(&addrs),
            None,
            true,
            None,
        ),
    );
    assert_eq!(r1.unwrap().status, 200);
    assert_eq!(r2.unwrap().status, 200);
    assert_eq!(r3.unwrap().status, 200);
}
/// Upload 10 000 bytes against a 100-byte wire limit: the server cuts the
/// request body off. The test verifies the exchange completes without
/// hanging or panicking; the fetch result itself may legitimately be either
/// `Ok` or `Err`, so it is deliberately discarded.
///
/// Fix: the detached writer task previously `.unwrap()`ed `send_chunk` and
/// `finish_body`, but those calls are *expected* to fail once the server
/// resets the stream at the limit. The unwraps made the detached task panic
/// on the expected reset; errors are now ignored, matching the handling in
/// `body_overflow_drains_quic_stream`.
#[tokio::test]
async fn body_exceeds_limit_resets_stream() {
    let (server_ep, client_ep) = common::make_pair().await;
    let server_id = common::node_id(&server_ep);
    let addrs = common::server_addrs(&server_ep);
    ffi_serve(
        server_ep.clone(),
        ServeOptions {
            max_request_body_wire_bytes: Some(100),
            ..Default::default()
        },
        move |payload: RequestPayload| {
            let req_handle = payload.req_handle;
            let res_body = payload.res_body_handle;
            let req_body = payload.req_body_handle;
            let server_ep = server_ep.clone();
            tokio::spawn(async move {
                // Drain whatever arrives before the limit trips the stream.
                while let Ok(Some(_)) = server_ep.handles().next_chunk(req_body).await {}
                respond(server_ep.handles(), req_handle, 200, vec![]).unwrap();
                server_ep.handles().finish_body(res_body).unwrap();
            });
        },
    );
    let big_body = vec![b'x'; 10_000];
    let (writer_handle, body_reader) = client_ep.handles().alloc_body_writer().unwrap();
    let client_ep_send = client_ep.clone();
    tokio::spawn(async move {
        // Write errors are expected once the server resets the stream at
        // the 100-byte limit, so they are ignored rather than unwrapped.
        let _ = client_ep_send
            .handles()
            .send_chunk(writer_handle, Bytes::from(big_body))
            .await;
        let _ = client_ep_send.handles().finish_body(writer_handle);
    });
    let result = fetch(
        &client_ep,
        &server_id,
        "/upload",
        "POST",
        &[],
        Some(body_reader),
        None,
        Some(&addrs),
        None,
        true,
        None,
    )
    .await;
    // Either outcome is acceptable; reaching this point without a hang is
    // the property under test.
    let _ = result;
}
/// Twenty parallel requests against a server capped at two concurrent
/// handlers, with load shedding disabled: a clear majority must still
/// succeed (queued, not rejected).
#[tokio::test]
async fn concurrent_requests_under_tight_concurrency() {
    // Server and client use identical loopback-only options.
    let local_node = || NodeOptions {
        networking: NetworkingOptions {
            disabled: true,
            bind_addrs: vec!["127.0.0.1:0".into()],
            ..Default::default()
        },
        ..Default::default()
    };
    let server_ep = IrohEndpoint::bind(local_node()).await.unwrap();
    let client_ep = IrohEndpoint::bind(local_node()).await.unwrap();
    let server_id = common::node_id(&server_ep);
    let addrs = common::server_addrs(&server_ep);
    ffi_serve(
        server_ep.clone(),
        ServeOptions {
            max_concurrency: Some(2),
            load_shed: Some(false),
            ..Default::default()
        },
        move |payload: RequestPayload| {
            let req_handle = payload.req_handle;
            let res_body = payload.res_body_handle;
            let ep = server_ep.clone();
            tokio::spawn(async move {
                // Simulate a slow handler so requests pile up on the limit.
                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                respond(ep.handles(), req_handle, 200, vec![]).unwrap();
                ep.handles().finish_body(res_body).unwrap();
            });
        },
    );
    let tasks: Vec<_> = (0..20)
        .map(|i| {
            let client = client_ep.clone();
            let id = server_id.clone();
            let a = addrs.clone();
            tokio::spawn(async move {
                let path = format!("/stress/{i}");
                fetch(
                    &client,
                    &id,
                    &path,
                    "GET",
                    &[],
                    None,
                    None,
                    Some(&a),
                    None,
                    true,
                    None,
                )
                .await
            })
        })
        .collect();
    let mut ok_count = 0;
    for task in tasks {
        // Individual failures are tolerated under pressure; every success
        // must carry status 200.
        if let Ok(res) = task.await.unwrap() {
            assert_eq!(res.status, 200);
            ok_count += 1;
        }
    }
    assert!(
        ok_count >= 10,
        "expected ≥10 successes under concurrency=2, got {ok_count}"
    );
}
/// After the server trips the 100-byte body limit it must keep draining (or
/// resetting) the QUIC stream so the client's writer does not stall on flow
/// control — the write task must finish within the 500 ms deadline.
#[tokio::test]
async fn body_overflow_drains_quic_stream() {
    let (server_ep, client_ep) = common::make_pair().await;
    let server_id = common::node_id(&server_ep);
    let addrs = common::server_addrs(&server_ep);
    // The handler deliberately does nothing; draining is the runtime's job.
    ffi_serve(
        server_ep.clone(),
        ServeOptions {
            max_request_body_wire_bytes: Some(100),
            ..Default::default()
        },
        move |_payload: RequestPayload| {},
    );
    let (writer_handle, body_reader) = client_ep.handles().alloc_body_writer().unwrap();
    let writer_ep = client_ep.clone();
    let write_task = tokio::spawn(async move {
        // 50 000 bytes — vastly over the limit; write errors are expected.
        let _ = writer_ep
            .handles()
            .send_chunk(writer_handle, Bytes::from(vec![b'z'; 50_000]))
            .await;
        let _ = writer_ep.handles().finish_body(writer_handle);
    });
    let result = fetch(
        &client_ep,
        &server_id,
        "/upload",
        "POST",
        &[],
        Some(body_reader),
        None,
        Some(&addrs),
        None,
        true,
        None,
    )
    .await;
    let _ = result;
    // The writer must finish promptly; a stall means the stream was never
    // drained after overflow.
    let deadline = tokio::time::timeout(std::time::Duration::from_millis(500), write_task).await;
    assert!(
        deadline.is_ok(),
        "client write task stalled after body overflow — QUIC stream was not drained"
    );
}
/// A zstd "bomb" — tiny on the wire, 100 KiB decoded — must be stopped by
/// `max_request_body_decoded_bytes` even though it easily passes the wire
/// limit.
#[tokio::test]
async fn zstd_bomb_rejected_by_decoded_body_limit() {
    const DECODED_LIMIT: usize = 8 * 1024;
    const PLAINTEXT_SIZE: usize = 100 * 1024;
    // 100 KiB of zeros compresses far below the 8 KiB decoded limit.
    let plaintext = vec![0u8; PLAINTEXT_SIZE];
    let compressed = zstd::stream::encode_all(plaintext.as_slice(), 3)
        .expect("zstd encode succeeds");
    assert!(
        compressed.len() < DECODED_LIMIT,
        "expected compressed payload < {DECODED_LIMIT} B, got {} B",
        compressed.len()
    );
    let (server_ep, client_ep) = common::make_pair().await;
    let server_id = common::node_id(&server_ep);
    let addrs = common::server_addrs(&server_ep);
    ffi_serve(
        server_ep.clone(),
        ServeOptions {
            max_request_body_wire_bytes: Some(64 * 1024),
            max_request_body_decoded_bytes: Some(DECODED_LIMIT),
            ..Default::default()
        },
        move |payload: RequestPayload| {
            let req_body = payload.req_body_handle;
            let res_body = payload.res_body_handle;
            let req = payload.req_handle;
            let ep = server_ep.clone();
            tokio::spawn(async move {
                // Report back how many decoded bytes reached the handler.
                let mut seen = 0usize;
                while let Ok(Some(chunk)) = ep.handles().next_chunk(req_body).await {
                    seen += chunk.len();
                }
                respond(
                    ep.handles(),
                    req,
                    200,
                    vec![("content-type".into(), "text/plain".into())],
                )
                .unwrap();
                let _ = ep
                    .handles()
                    .send_chunk(res_body, Bytes::from(seen.to_string()))
                    .await;
                let _ = ep.handles().finish_body(res_body);
            });
        },
    );
    let (writer, body_reader) = iroh_http_core::make_body_channel();
    tokio::spawn(async move {
        let _ = writer.send_chunk(Bytes::from(compressed)).await;
        drop(writer);
    });
    let bomb_headers = [
        (
            "content-type".to_string(),
            "application/octet-stream".to_string(),
        ),
        ("content-encoding".to_string(), "zstd".to_string()),
    ];
    let result = fetch(
        &client_ep,
        &server_id,
        "/bomb",
        "POST",
        &bomb_headers,
        Some(body_reader),
        None,
        Some(&addrs),
        None,
        true,
        None,
    )
    .await;
    // A transport error is a valid rejection; a success must show the
    // decoded limit held.
    if let Ok(res) = result {
        if let Ok(Some(chunk)) = client_ep.handles().next_chunk(res.body_handle).await {
            let received: usize = std::str::from_utf8(&chunk)
                .unwrap_or("0")
                .trim()
                .parse()
                .unwrap_or(0);
            assert!(
                received <= DECODED_LIMIT,
                "decoded-body limit = {DECODED_LIMIT} B but handler saw {received} B; \
                 decoded limit is not being enforced inside decompression (regression #190)"
            );
        }
    }
}
/// An uncompressed 2 KiB upload against a 1 KiB wire limit (the decoded
/// limit explicitly disabled): the handler must never observe more bytes
/// than the wire limit allows.
#[tokio::test]
async fn wire_limit_rejects_large_uncompressed_body() {
    const WIRE_LIMIT: usize = 1024;
    const BODY_SIZE: usize = 2048;
    let (server_ep, client_ep) = common::make_pair().await;
    let server_id = common::node_id(&server_ep);
    let addrs = common::server_addrs(&server_ep);
    ffi_serve(
        server_ep.clone(),
        ServeOptions {
            max_request_body_wire_bytes: Some(WIRE_LIMIT),
            max_request_body_decoded_bytes: None,
            ..Default::default()
        },
        move |payload: RequestPayload| {
            let req_body = payload.req_body_handle;
            let res_body = payload.res_body_handle;
            let req = payload.req_handle;
            let ep = server_ep.clone();
            tokio::spawn(async move {
                // Echo the number of bytes that reached the handler.
                let mut seen = 0usize;
                while let Ok(Some(chunk)) = ep.handles().next_chunk(req_body).await {
                    seen += chunk.len();
                }
                respond(
                    ep.handles(),
                    req,
                    200,
                    vec![("content-type".into(), "text/plain".into())],
                )
                .unwrap();
                let _ = ep
                    .handles()
                    .send_chunk(res_body, Bytes::from(seen.to_string()))
                    .await;
                let _ = ep.handles().finish_body(res_body);
            });
        },
    );
    let (writer, body_reader) = iroh_http_core::make_body_channel();
    tokio::spawn(async move {
        let _ = writer
            .send_chunk(Bytes::from(vec![0x41u8; BODY_SIZE]))
            .await;
        drop(writer);
    });
    let result = fetch(
        &client_ep,
        &server_id,
        "/upload",
        "POST",
        &[],
        Some(body_reader),
        None,
        Some(&addrs),
        None,
        true,
        None,
    )
    .await;
    // A hard transport error is acceptable; a success must show the limit held.
    if let Ok(res) = result {
        if let Ok(Some(chunk)) = client_ep.handles().next_chunk(res.body_handle).await {
            let received: usize = std::str::from_utf8(&chunk)
                .unwrap_or("0")
                .trim()
                .parse()
                .unwrap_or(0);
            assert!(
                received <= WIRE_LIMIT,
                "wire limit = {WIRE_LIMIT} B but handler saw {received} B; \
                 wire limit is not being enforced (regression #190)"
            );
        }
    }
}
/// Positive control for the limit tests: a 512-byte body (zstd-compressed)
/// sits comfortably inside matching 64 KiB wire and decoded limits, so the
/// request must succeed and the handler must see the full decoded plaintext.
#[tokio::test]
async fn request_within_both_limits_succeeds() {
    const BOTH_LIMITS: usize = 64 * 1024;
    const PLAINTEXT_SIZE: usize = 512;
    let (server_ep, client_ep) = common::make_pair().await;
    let server_id = common::node_id(&server_ep);
    let addrs = common::server_addrs(&server_ep);
    let plaintext = vec![0x42u8; PLAINTEXT_SIZE];
    let compressed = zstd::stream::encode_all(plaintext.as_slice(), 3)
        .expect("zstd encode succeeds");
    ffi_serve(
        server_ep.clone(),
        ServeOptions {
            max_request_body_wire_bytes: Some(BOTH_LIMITS),
            max_request_body_decoded_bytes: Some(BOTH_LIMITS),
            ..Default::default()
        },
        move |payload: RequestPayload| {
            let req_body = payload.req_body_handle;
            let res_body = payload.res_body_handle;
            let req = payload.req_handle;
            let ep = server_ep.clone();
            tokio::spawn(async move {
                // Report back how many decoded bytes actually arrived.
                let mut seen = 0usize;
                while let Ok(Some(chunk)) = ep.handles().next_chunk(req_body).await {
                    seen += chunk.len();
                }
                respond(
                    ep.handles(),
                    req,
                    200,
                    vec![("content-type".into(), "text/plain".into())],
                )
                .unwrap();
                let _ = ep
                    .handles()
                    .send_chunk(res_body, Bytes::from(seen.to_string()))
                    .await;
                let _ = ep.handles().finish_body(res_body);
            });
        },
    );
    let (writer, body_reader) = iroh_http_core::make_body_channel();
    tokio::spawn(async move {
        let _ = writer.send_chunk(Bytes::from(compressed)).await;
        drop(writer);
    });
    let res = fetch(
        &client_ep,
        &server_id,
        "/upload",
        "POST",
        &[
            ("content-type".into(), "application/octet-stream".into()),
            ("content-encoding".into(), "zstd".into()),
        ],
        Some(body_reader),
        None,
        Some(&addrs),
        None,
        true,
        None,
    )
    .await
    .expect("fetch must succeed for a body within both limits");
    assert_eq!(res.status, 200, "expected 200, got {}", res.status);
    if let Ok(Some(chunk)) = client_ep.handles().next_chunk(res.body_handle).await {
        let received: usize = std::str::from_utf8(&chunk)
            .unwrap_or("0")
            .trim()
            .parse()
            .unwrap_or(0);
        assert_eq!(
            received, PLAINTEXT_SIZE,
            "expected handler to receive {PLAINTEXT_SIZE} decoded bytes, got {received}"
        );
    }
}