mod common;
use bytes::Bytes;
use iroh_http_core::server::respond;
use iroh_http_core::{
fetch, serve, server::ServeOptions, IrohEndpoint, NetworkingOptions, NodeOptions,
RequestPayload,
};
#[tokio::test]
async fn graceful_shutdown_drains_in_flight() {
let (server_ep, client_ep) = common::make_pair().await;
let server_id = common::node_id(&server_ep);
let addrs = common::server_addrs(&server_ep);
let handler_started = std::sync::Arc::new(tokio::sync::Notify::new());
let handler_started_tx = handler_started.clone();
let handler_proceed = std::sync::Arc::new(tokio::sync::Notify::new());
let handler_proceed_rx = handler_proceed.clone();
let handle = serve(
server_ep.clone(),
ServeOptions {
drain_timeout_ms: Some(10_000),
..Default::default()
},
move |payload: RequestPayload| {
let res_h = payload.res_body_handle;
let req_h = payload.req_handle;
let started = handler_started_tx.clone();
let proceed = handler_proceed_rx.clone();
let server_ep = server_ep.clone();
tokio::spawn(async move {
started.notify_one();
proceed.notified().await;
respond(
server_ep.handles(),
req_h,
200,
vec![("content-length".into(), "2".into())],
)
.unwrap();
server_ep
.handles()
.send_chunk(res_h, Bytes::from_static(b"ok"))
.await
.unwrap();
server_ep.handles().finish_body(res_h).unwrap();
});
},
);
let fetch_task = {
let client = client_ep.clone();
let sid = server_id.clone();
let a = addrs.clone();
tokio::spawn(async move {
fetch(&client, &sid, "/slow", "GET", &[], None, None, Some(&a)).await
})
};
handler_started.notified().await;
let drain_done = std::sync::Arc::new(tokio::sync::Notify::new());
let drain_done_rx = drain_done.clone();
tokio::spawn(async move {
handle.drain().await;
drain_done.notify_one();
});
tokio::task::yield_now().await;
handler_proceed.notify_one();
tokio::time::timeout(std::time::Duration::from_secs(10), drain_done_rx.notified())
.await
.expect("drain should complete after handler finishes");
let result = fetch_task.await.unwrap();
assert!(
result.is_ok(),
"in-flight request should succeed: {:?}",
result
);
let res = result.unwrap();
assert_eq!(res.status, 200);
}
/// Checks that `close_force()` on an endpoint with an active `serve` returns in
/// under five seconds.
#[tokio::test]
async fn force_close_aborts_immediately() {
    use std::time::{Duration, Instant};

    /// Upper bound tolerated for the forced close.
    const LIMIT: Duration = Duration::from_secs(5);

    let (server_ep, _client_ep) = common::make_pair().await;

    // Start serving with a handler that ignores every request; the handle is
    // kept alive for the duration of the test.
    let _handle = serve(
        server_ep.clone(),
        ServeOptions::default(),
        move |_payload: RequestPayload| {},
    );

    let began = Instant::now();
    server_ep.close_force().await;
    let elapsed = began.elapsed();

    assert!(LIMIT > elapsed, "force close took too long: {elapsed:?}");
}
/// Checks that closing an endpoint on which `serve` was never called returns
/// in under one second.
#[tokio::test]
async fn close_without_serve_is_immediate() {
    use std::time::{Duration, Instant};

    /// Upper bound tolerated for the close call.
    const LIMIT: Duration = Duration::from_secs(1);

    // Bind an endpoint with networking disabled; everything else is default.
    let networking = NetworkingOptions {
        disabled: true,
        ..Default::default()
    };
    let options = NodeOptions {
        networking,
        ..Default::default()
    };
    let ep = IrohEndpoint::bind(options).await.unwrap();

    let began = Instant::now();
    ep.close().await;
    let elapsed = began.elapsed();

    assert!(
        LIMIT > elapsed,
        "close without serve took too long: {elapsed:?}"
    );
}
/// Checks that a request issued after the server has been drained and
/// force-closed fails, while a request issued beforehand succeeds.
///
/// The runtime starts with its clock paused (`start_paused = true`).
/// NOTE(review): tokio auto-advances a paused clock whenever the runtime is
/// idle; confirm this interacts as intended with the endpoints' own timers.
#[tokio::test(start_paused = true)]
async fn shutdown_rejects_new_requests() {
    let (server_ep, client_ep) = common::make_pair().await;
    let server_id = common::node_id(&server_ep);
    let addrs = common::server_addrs(&server_ep);

    let handle = {
        // Dedicated clone for the handler; `server_ep` itself is needed later
        // for the forced close.
        let ep = server_ep.clone();
        serve(
            server_ep.clone(),
            ServeOptions::default(),
            move |payload: RequestPayload| {
                // Reply 200 with an empty body, synchronously.
                respond(
                    ep.handles(),
                    payload.req_handle,
                    200,
                    vec![("content-length".into(), "0".into())],
                )
                .unwrap();
                ep.handles().finish_body(payload.res_body_handle).unwrap();
            },
        )
    };

    // Sanity check: the server answers while it is still running.
    let before = fetch(
        &client_ep,
        &server_id,
        "/before",
        "GET",
        &[],
        None,
        None,
        Some(&addrs),
    )
    .await
    .unwrap();
    assert_eq!(before.status, 200);

    // Read the response body until it ends or reports an error.
    let body = before.body_handle;
    loop {
        match client_ep.handles().next_chunk(body).await {
            Ok(Some(_)) => continue,
            _ => break,
        }
    }

    // Shut the server down: drain first, then force-close the endpoint.
    handle.drain().await;
    server_ep.close_force().await;

    let result = fetch(
        &client_ep,
        &server_id,
        "/after",
        "GET",
        &[],
        None,
        None,
        Some(&addrs),
    )
    .await;
    assert!(
        result.is_err(),
        "expected error after shutdown, got: {:?}",
        result
    );
}
/// Checks that the synchronous `shutdown()` call on a serve handle returns in
/// under 100 milliseconds.
#[tokio::test]
async fn shutdown_returns_immediately() {
    use std::time::{Duration, Instant};

    /// Upper bound tolerated for the `shutdown()` call itself.
    const LIMIT: Duration = Duration::from_millis(100);

    let (server_ep, _client_ep) = common::make_pair().await;

    // Serve with a handler that ignores every request.
    let handle = serve(
        server_ep.clone(),
        ServeOptions::default(),
        move |_payload: RequestPayload| {},
    );

    let began = Instant::now();
    handle.shutdown();
    let elapsed = began.elapsed();

    assert!(LIMIT > elapsed, "shutdown() blocked for {elapsed:?}");
}
/// Checks that draining the server waits for a request that is still being
/// handled: the handler sleeps for 200 ms before responding, the drain is
/// started while it sleeps, and the client must still receive a 200.
///
/// Despite the test's name, the shutdown is driven through `handle.drain()`.
///
/// The runtime starts with its clock paused (`start_paused = true`).
/// NOTE(review): tokio auto-advances a paused clock whenever the runtime is
/// idle, so the handler's sleep should not cost wall-clock time; confirm this
/// interacts as intended with the endpoints' own timers.
#[tokio::test(start_paused = true)]
async fn node_close_drains_in_flight() {
    let (server_ep, client_ep) = common::make_pair().await;
    let server_id = common::node_id(&server_ep);
    let addrs = common::server_addrs(&server_ep);

    // One-shot signal telling the test that the handler has started. The sender
    // sits in an `Option` behind a mutex so that only the first handler
    // invocation takes and uses it.
    let (tx, rx) = tokio::sync::oneshot::channel::<()>();
    let tx = std::sync::Arc::new(tokio::sync::Mutex::new(Some(tx)));

    let handle = serve(
        server_ep.clone(),
        ServeOptions {
            drain_timeout_ms: Some(5_000),
            ..Default::default()
        },
        move |payload: RequestPayload| {
            let req_handle = payload.req_handle;
            let res_body = payload.res_body_handle;
            let tx_clone = tx.clone();
            let server_ep = server_ep.clone();
            tokio::spawn(async move {
                if let Some(tx) = tx_clone.lock().await.take() {
                    // Best effort: the receiver may already be gone.
                    let _ = tx.send(());
                }
                // Keep the request pending while the test starts the drain.
                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                respond(server_ep.handles(), req_handle, 200, vec![]).unwrap();
                server_ep.handles().finish_body(res_body).unwrap();
            });
        },
    );

    // Issue the request from a background task so the drain can be started
    // while it is pending.
    let fetch_task = tokio::spawn({
        let client_ep = client_ep.clone();
        async move {
            fetch(
                &client_ep,
                &server_id,
                "/drain-test",
                "GET",
                &[],
                None,
                None,
                Some(&addrs),
            )
            .await
        }
    });

    // Fail loudly if the sender was dropped without the handler ever signalling;
    // otherwise the drain below would run with no request in flight and the
    // test would fail later with a misleading error.
    rx.await
        .expect("handler should signal that the request is in flight");

    handle.drain().await;

    let res = fetch_task.await.expect("join error");
    assert_eq!(res.unwrap().status, 200);
}