#![allow(clippy::disallowed_types)] mod common;
use bytes::Bytes;
use iroh_http_core::respond;
use iroh_http_core::{
fetch, ffi_serve, IrohEndpoint, NetworkingOptions, NodeOptions, RequestPayload, ServeOptions,
};
/// Cancels an in-flight fetch through its fetch token and checks that the
/// call resolves to an error whose code is `ErrorCode::Cancelled`.
///
/// The server handler only signals that a request arrived; it never calls
/// `respond`, so the fetch is still pending when the cancel is issued.
#[tokio::test]
async fn fetch_cancelled_via_token() {
    let (server_ep, client_ep) = common::make_pair().await;
    let server_id = common::node_id(&server_ep);
    let addrs = common::server_addrs(&server_ep);

    // One-shot "request arrived" signal. The sender sits in a
    // `Mutex<Option<_>>` so the handler closure can take it out on the first
    // invocation and fire it at most once.
    let (arrived_tx, arrived_rx) = tokio::sync::oneshot::channel::<()>();
    let arrived_tx = std::sync::Mutex::new(Some(arrived_tx));

    ffi_serve(
        server_ep.clone(),
        ServeOptions::default(),
        move |_payload: RequestPayload| {
            let sender = arrived_tx.lock().unwrap().take();
            if let Some(sender) = sender {
                // The receiver may already be gone; that is not an error here.
                let _ = sender.send(());
            }
        },
    );

    let token = client_ep.handles().alloc_fetch_token().unwrap();

    // Background task: wait for the server to see the request, then cancel.
    let canceller_ep = client_ep.clone();
    tokio::spawn(async move {
        let _ = arrived_rx.await;
        canceller_ep.handles().cancel_in_flight(token);
    });

    let result = fetch(
        &client_ep,
        &server_id,
        "/slow",
        "GET",
        &[],
        None,
        Some(token),
        Some(&addrs),
        None,
        true,
        None,
    )
    .await;

    assert!(result.is_err());
    let err = result.unwrap_err();
    assert_eq!(err.code, iroh_http_core::ErrorCode::Cancelled);
}
/// Calling `respond` with a request handle that was never allocated must
/// return an error.
#[tokio::test]
async fn respond_invalid_handle() {
    // Endpoint bound with networking disabled; only the handle store is used.
    let networking = NetworkingOptions {
        disabled: true,
        ..Default::default()
    };
    let opts = NodeOptions {
        networking,
        ..Default::default()
    };
    let ep = IrohEndpoint::bind(opts).await.unwrap();

    // A handle value this test never obtained from the endpoint.
    let bogus_handle = 999999;
    let result = respond(ep.handles(), bogus_handle, 200, vec![]);
    assert!(result.is_err());
}
/// A fetch addressed to a string that is not a valid node id must resolve to
/// an error.
#[tokio::test]
async fn fetch_bad_node_id_returns_error() {
    // Networking is disabled on the client endpoint used for this call.
    let endpoint = IrohEndpoint::bind(NodeOptions {
        networking: NetworkingOptions {
            disabled: true,
            ..Default::default()
        },
        ..Default::default()
    })
    .await
    .unwrap();

    let result = fetch(
        &endpoint,
        "!!!invalid!!!",
        "/",
        "GET",
        &[],
        None,
        None,
        None,
        None,
        true,
        None,
    )
    .await;

    assert!(result.is_err());
}
/// Fetching from a freshly generated node id that no endpoint in this test is
/// bound to must not succeed.
///
/// Both an error from `fetch` and the 5 second guard elapsing are accepted;
/// only a successful response fails the test.
#[tokio::test]
async fn fetch_unknown_peer() {
    use std::time::Duration;

    // Derive a node id from a brand-new secret key.
    let secret_bytes = iroh_http_core::generate_secret_key().unwrap();
    let public_key = iroh::SecretKey::from_bytes(&secret_bytes).public();
    let unknown_id = iroh_http_core::base32_encode(public_key.as_bytes());

    let client_ep = IrohEndpoint::bind(NodeOptions {
        networking: NetworkingOptions {
            disabled: true,
            ..Default::default()
        },
        ..Default::default()
    })
    .await
    .unwrap();

    let guard = Duration::from_secs(5);
    let outcome = tokio::time::timeout(
        guard,
        fetch(
            &client_ep,
            &unknown_id,
            "/",
            "GET",
            &[],
            None,
            None,
            None,
            None,
            true,
            None,
        ),
    )
    .await;

    // `Ok(Err(_))` (fetch failed) and `Err(_)` (guard elapsed) both pass.
    if let Ok(Ok(res)) = outcome {
        panic!(
            "expected error connecting to unknown peer, got status {}",
            res.status
        );
    }
}
/// With `request_timeout_ms` set to 100 on the server and a handler that
/// never answers, the client's fetch must finish before a 10 second guard.
///
/// Only completion is asserted; the value `fetch` resolves to is not checked.
#[tokio::test]
async fn request_timeout_fires() {
    use std::time::Duration;

    let (server_ep, client_ep) = common::make_pair().await;
    let server_id = common::node_id(&server_ep);
    let addrs = common::server_addrs(&server_ep);

    let serve_opts = ServeOptions {
        request_timeout_ms: Some(100),
        ..Default::default()
    };
    ffi_serve(
        server_ep.clone(),
        serve_opts,
        move |payload: RequestPayload| {
            let request = payload.req_handle;
            let body = payload.res_body_handle;
            let responder_ep = server_ep.clone();
            tokio::spawn(async move {
                // This future never completes, so the response code below is
                // never reached and the handler stays silent.
                std::future::pending::<()>().await;
                let _ = respond(responder_ep.handles(), request, 200, vec![]);
                let _ = responder_ep.handles().finish_body(body);
            });
        },
    );

    let guard = Duration::from_secs(10);
    let outcome = tokio::time::timeout(
        guard,
        fetch(
            &client_ep,
            &server_id,
            "/slow",
            "GET",
            &[],
            None,
            None,
            Some(&addrs),
            None,
            true,
            None,
        ),
    )
    .await;

    // `Err` here would mean the 10 second guard elapsed first.
    assert!(outcome.is_ok(), "fetch should not hang past the timeout");
}
/// Cancels a fetch about 30 ms after the server has seen the request, while
/// the server is sending up to 100 response chunks at 10 ms intervals.
///
/// The fetch result is deliberately ignored: the test passes as long as the
/// call returns without panicking.
#[tokio::test]
async fn cancel_mid_stream_no_panic() {
    use std::time::Duration;

    let (server_ep, client_ep) = common::make_pair().await;
    let server_id = common::node_id(&server_ep);
    let addrs = common::server_addrs(&server_ep);

    // One-shot "request arrived" signal, taken out of the mutex on first use.
    let (arrived_tx, arrived_rx) = tokio::sync::oneshot::channel::<()>();
    let arrived_tx = std::sync::Mutex::new(Some(arrived_tx));

    ffi_serve(
        server_ep.clone(),
        ServeOptions::default(),
        move |payload: RequestPayload| {
            let request = payload.req_handle;
            let body = payload.res_body_handle;

            let sender = arrived_tx.lock().unwrap().take();
            if let Some(sender) = sender {
                let _ = sender.send(());
            }

            let streamer_ep = server_ep.clone();
            tokio::spawn(async move {
                respond(streamer_ep.handles(), request, 200, vec![]).unwrap();
                for i in 0..100 {
                    let chunk = Bytes::from(format!("chunk-{i}\n"));
                    let send_failed = streamer_ep
                        .handles()
                        .send_chunk(body, chunk)
                        .await
                        .is_err();
                    if send_failed {
                        // Stop streaming as soon as a chunk cannot be sent.
                        break;
                    }
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                let _ = streamer_ep.handles().finish_body(body);
            });
        },
    );

    let token = client_ep.handles().alloc_fetch_token().unwrap();

    // Background task: wait for arrival, let the stream run briefly, cancel.
    let canceller_ep = client_ep.clone();
    tokio::spawn(async move {
        let _ = arrived_rx.await;
        tokio::time::sleep(Duration::from_millis(30)).await;
        canceller_ep.handles().cancel_in_flight(token);
    });

    // Success or error are both acceptable outcomes here.
    let _outcome = fetch(
        &client_ep,
        &server_id,
        "/stream",
        "GET",
        &[],
        None,
        Some(token),
        Some(&addrs),
        None,
        true,
        None,
    )
    .await;
}
/// `cancel_reader` must make a `next_chunk` call that is already in progress
/// return `Ok(None)` within 2 seconds, without an error or a panic.
#[tokio::test]
async fn cancel_reader_terminates_in_flight_read() {
    use std::time::Duration;

    let networking = NetworkingOptions {
        disabled: true,
        bind_addrs: vec!["127.0.0.1:0".into()],
        ..Default::default()
    };
    let ep = IrohEndpoint::bind(NodeOptions {
        networking,
        ..Default::default()
    })
    .await
    .unwrap();

    // Allocate a writer/reader pair and register the reader to get a handle.
    let (writer_handle, reader) = ep.handles().alloc_body_writer().unwrap();
    let reader_handle = ep.handles().insert_reader(reader).unwrap();

    // Sanity check: one chunk written is one chunk read.
    ep.handles()
        .send_chunk(writer_handle, Bytes::from("hello"))
        .await
        .unwrap();
    let first_chunk = ep.handles().next_chunk(reader_handle).await.unwrap();
    assert_eq!(first_chunk.as_deref(), Some(b"hello".as_ref()));

    // Start a second read on another task. No further chunk is sent, so this
    // read is presumably still waiting when the cancel below is issued.
    let reader_ep = ep.clone();
    let read_task =
        tokio::spawn(async move { reader_ep.handles().next_chunk(reader_handle).await });

    tokio::time::sleep(Duration::from_millis(20)).await;
    ep.handles().cancel_reader(reader_handle);

    let joined = tokio::time::timeout(Duration::from_secs(2), read_task)
        .await
        .expect("read task should complete promptly after cancel");
    let result = joined.expect("read task should not panic");

    assert!(
        result.is_ok(),
        "next_chunk should not return an error on cancel"
    );
    assert_eq!(result.unwrap(), None, "cancelled read should return None");

    ep.close().await;
}