#![allow(clippy::disallowed_types)]
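//! Integration tests for `iroh_http_core`'s FFI fetch/serve surface:
//! per-call timeouts, raw (non-decompressed) zstd bodies on both the
//! client and server side, and per-call response-size limits.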
mod common;
use std::time::Duration;
use bytes::Bytes;
use iroh_http_core::{fetch, ffi_serve, respond, RequestPayload, ServeOptions};
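
// A handler that never responds must not wedge the client: the per-call
// timeout passed to `fetch` should abort the request well before 500ms.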
#[tokio::test]
async fn fetch_timeout_fires_against_hung_server() {
let (server_ep, client_ep) = common::make_pair().await;
let server_id = common::node_id(&server_ep);
let addrs = common::server_addrs(&server_ep);
ffi_serve(
server_ep.clone(),
ServeOptions::default(),
move |payload: RequestPayload| {
tokio::spawn(async move {
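                // Keep both handles alive so the request stays open, then park forever.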
let _keep = (payload.req_handle, payload.res_body_handle);
std::future::pending::<()>().await;
});
},
);
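    // Time the call so we can assert the timeout fired promptly.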
let start = std::time::Instant::now();
let result = fetch(
&client_ep,
&server_id,
"/hang",
"GET",
&[],
None,
None,
Some(&addrs),
        Some(Duration::from_millis(200)), // per-call timeout
        true,                             // decompress response body
        None,                             // per-call max response body bytes
    )
.await;
let elapsed = start.elapsed();
    assert!(result.is_err(), "expected timeout error, got: {result:?}");
assert!(
elapsed < Duration::from_millis(500),
"timeout should fire within 500ms, took {elapsed:?}"
);
}
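
// With `decompress = false` on the fetch call, the client should receive the
// body exactly as the server sent it: a raw zstd frame, not inflated plaintext.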
#[tokio::test]
async fn fetch_decompress_false_returns_raw_zstd_bytes() {
let (server_ep, client_ep) = common::make_pair().await;
let server_id = common::node_id(&server_ep);
let addrs = common::server_addrs(&server_ep);
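    // Pre-compress a fixture; the server will send it tagged `content-encoding: zstd`.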
let plaintext = b"hello, raw compression world! ".repeat(64);
let compressed =
zstd::stream::encode_all(plaintext.as_slice(), 0).expect("zstd encode succeeds");
let compressed_len = compressed.len();
let plaintext_len = plaintext.len();
assert!(
compressed_len < plaintext_len,
"compressed body should be smaller than plaintext"
);
let compressed_for_server = compressed.clone();
ffi_serve(
server_ep.clone(),
ServeOptions::default(),
move |payload: RequestPayload| {
let req_handle = payload.req_handle;
let res_body_handle = payload.res_body_handle;
let server_ep = server_ep.clone();
let body = Bytes::from(compressed_for_server.clone());
tokio::spawn(async move {
respond(
server_ep.handles(),
req_handle,
200,
vec![
("content-encoding".to_string(), "zstd".to_string()),
("content-length".to_string(), body.len().to_string()),
],
)
.unwrap();
server_ep
.handles()
.send_chunk(res_body_handle, body)
.await
.unwrap();
server_ep.handles().finish_body(res_body_handle).unwrap();
});
},
);
let res = fetch(
&client_ep,
&server_id,
"/raw",
"GET",
&[("accept-encoding".to_string(), "identity".to_string())],
None,
None,
Some(&addrs),
None,
        false, // decompress = false: hand back the raw zstd frame
        None,  // per-call max response body bytes
    )
.await
.expect("fetch should succeed");
assert_eq!(res.status, 200);
let mut received = Vec::new();
    while let Some(chunk) = client_ep
        .handles()
        .next_chunk(res.body_handle)
        .await
        .expect("read response chunk")
    {
        received.extend_from_slice(&chunk);
    }
assert_eq!(
received,
compressed,
"with decompress=false the client should receive the raw zstd bytes, \
got {} bytes (expected {compressed_len}, plaintext was {plaintext_len})",
received.len(),
);
}
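
// Server-side counterpart: with `decompression: Some(false)` in `ServeOptions`,
// the handler should see the compressed request body verbatim.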
#[tokio::test]
async fn serve_decompression_false_passes_raw_compressed_body_to_handler() {
let (server_ep, client_ep) = common::make_pair().await;
let server_id = common::node_id(&server_ep);
let addrs = common::server_addrs(&server_ep);
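    // Pre-compress the request body the client will upload.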
let plaintext = b"serve-side raw compression test! ".repeat(64);
let plaintext_len = plaintext.len();
let compressed =
zstd::stream::encode_all(plaintext.as_slice(), 0).expect("zstd encode succeeds");
let compressed_len = compressed.len();
assert!(
compressed_len < plaintext_len,
"compressed body should be smaller"
);
ffi_serve(
server_ep.clone(),
ServeOptions {
decompression: Some(false),
..Default::default()
},
move |payload: RequestPayload| {
let req_handle = payload.req_handle;
let req_body_handle = payload.req_body_handle;
let res_body_handle = payload.res_body_handle;
let server_ep = server_ep.clone();
tokio::spawn(async move {
let mut body = Vec::new();
while let Some(chunk) = server_ep
.handles()
.next_chunk(req_body_handle)
.await
.expect("read request body")
{
body.extend_from_slice(&chunk);
}
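                // Echo the number of bytes the handler observed back to the client.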
let body_len_str = body.len().to_string();
respond(server_ep.handles(), req_handle, 200, vec![]).unwrap();
server_ep
.handles()
.send_chunk(res_body_handle, Bytes::from(body_len_str.into_bytes()))
.await
.unwrap();
server_ep.handles().finish_body(res_body_handle).unwrap();
});
},
);
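    // Stream the pre-compressed bytes into the request body from a separate task.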
let (writer_handle, body_reader) = client_ep
.handles()
.alloc_body_writer()
.expect("alloc body writer");
let client_ep_send = client_ep.clone();
let compressed_clone = compressed.clone();
tokio::spawn(async move {
client_ep_send
.handles()
.send_chunk(writer_handle, Bytes::from(compressed_clone))
.await
.unwrap();
client_ep_send.handles().finish_body(writer_handle).unwrap();
});
let res = fetch(
&client_ep,
&server_id,
"/upload",
"POST",
&[
(
"content-type".to_string(),
"application/octet-stream".to_string(),
),
("content-encoding".to_string(), "zstd".to_string()),
],
Some(body_reader),
None,
Some(&addrs),
None,
        true,
        None,
    )
.await
.expect("fetch should succeed");
assert_eq!(res.status, 200);
let mut response_body = Vec::new();
    while let Some(chunk) = client_ep
        .handles()
        .next_chunk(res.body_handle)
        .await
        .expect("read response chunk")
    {
        response_body.extend_from_slice(&chunk);
    }
let handler_saw: usize = String::from_utf8(response_body)
.expect("utf8")
.parse()
.expect("integer");
assert_eq!(
handler_saw, compressed_len,
"with decompression=false the handler should see compressed ({compressed_len}) bytes, \
not decompressed ({plaintext_len})"
);
}
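
// A per-call `max_response_body_bytes` should take precedence over the stricter
// 1 MiB default configured on the client endpoint.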
#[tokio::test]
async fn per_call_max_response_body_bytes_overrides_endpoint_default() {
use iroh_http_core::{IrohEndpoint, NetworkingOptions, NodeOptions};
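    // Two endpoints bound on loopback; the client's default response-body cap is 1 MiB.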
let server_ep = IrohEndpoint::bind(NodeOptions {
networking: NetworkingOptions {
disabled: true,
bind_addrs: vec!["127.0.0.1:0".into()],
..Default::default()
},
..Default::default()
})
.await
.unwrap();
let client_ep = IrohEndpoint::bind(NodeOptions {
networking: NetworkingOptions {
disabled: true,
bind_addrs: vec!["127.0.0.1:0".into()],
..Default::default()
},
max_response_body_bytes: Some(1024 * 1024),
..Default::default()
})
.await
.unwrap();
let server_id = server_ep.node_id().to_string();
let addrs: Vec<std::net::SocketAddr> = server_ep.raw().addr().ip_addrs().cloned().collect();
let payload_5mib = vec![b'x'; 5 * 1024 * 1024];
let payload_len = payload_5mib.len();
let server_ep_clone = server_ep.clone();
let payload_clone = payload_5mib.clone();
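    // Serve a 5 MiB response body, well over the client endpoint's default cap.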
ffi_serve(
server_ep,
ServeOptions::default(),
move |payload: RequestPayload| {
let req_handle = payload.req_handle;
let res_body_handle = payload.res_body_handle;
let server_ep = server_ep_clone.clone();
let body_bytes = Bytes::from(payload_clone.clone());
tokio::spawn(async move {
respond(
server_ep.handles(),
req_handle,
200,
vec![("content-length".to_string(), body_bytes.len().to_string())],
)
.unwrap();
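                // Stream the payload in 1 MiB chunks rather than one giant write.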
let chunk_size = 1024 * 1024;
let mut offset = 0;
while offset < body_bytes.len() {
let end = (offset + chunk_size).min(body_bytes.len());
let chunk = body_bytes.slice(offset..end);
server_ep
.handles()
.send_chunk(res_body_handle, chunk)
.await
.unwrap();
offset = end;
}
server_ep.handles().finish_body(res_body_handle).unwrap();
});
},
);
let res = fetch(
&client_ep,
&server_id,
"/big",
"GET",
&[],
None,
None,
Some(&addrs),
None,
true,
        Some(10 * 1024 * 1024), // per-call cap: 10 MiB, overriding the 1 MiB default
    )
.await
.expect("per-call 10 MiB limit should allow 5 MiB response");
assert_eq!(res.status, 200);
let mut received_bytes = 0usize;
    while let Some(chunk) = client_ep
        .handles()
        .next_chunk(res.body_handle)
        .await
        .expect("read response chunk")
    {
        received_bytes += chunk.len();
    }
assert_eq!(
received_bytes, payload_len,
"should receive all {payload_len} bytes; \
per-call 10 MiB limit must override the 1 MiB endpoint default"
);
}