use bytes::Bytes;
use iroh_http_core::{
session_accept, session_close, session_closed, session_connect, session_create_uni_stream,
session_max_datagram_size, session_next_uni_stream, session_recv_datagram,
session_send_datagram, IrohEndpoint, NetworkingOptions, NodeOptions,
};
async fn make_pair() -> (IrohEndpoint, IrohEndpoint) {
let opts = || NodeOptions {
networking: NetworkingOptions {
disabled: true,
bind_addrs: vec!["127.0.0.1:0".into()],
..Default::default()
},
..Default::default()
};
let a = IrohEndpoint::bind(opts()).await.unwrap();
let b = IrohEndpoint::bind(opts()).await.unwrap();
(a, b)
}
/// Returns the endpoint's node id rendered as a `String`, the form the
/// tests pass to `session_connect`.
fn node_id(ep: &IrohEndpoint) -> String {
    let id = ep.node_id();
    id.to_string()
}
/// Collects the IP socket addresses reported by the endpoint's underlying
/// raw endpoint into an owned vector.
fn direct_addrs(ep: &IrohEndpoint) -> Vec<std::net::SocketAddr> {
    let mut addrs: Vec<std::net::SocketAddr> = Vec::new();
    // Keep the whole accessor chain in one statement so every temporary it
    // creates lives until the addresses have been copied out.
    addrs.extend(ep.raw().addr().ip_addrs().cloned());
    addrs
}
/// One endpoint opens a unidirectional stream on a session, writes a single
/// chunk and finishes the body; the accepting endpoint must read back exactly
/// those bytes.
#[tokio::test]
async fn session_uni_stream_send_recv() {
    let (client, server) = make_pair().await;
    let server_id = node_id(&server);
    let server_addrs = direct_addrs(&server);

    // Accepting side runs in its own task so it can wait for the incoming
    // session while the connecting side dials below.
    let acceptor = server.clone();
    let server_task = tokio::spawn(async move {
        let session = session_accept(&acceptor).await.unwrap().unwrap();
        let reader = session_next_uni_stream(&acceptor, session)
            .await
            .unwrap()
            .unwrap();

        // Drain the stream until `next_chunk` yields `None`.
        let mut received = Vec::new();
        while let Some(piece) = acceptor.handles().next_chunk(reader).await.unwrap() {
            received.extend_from_slice(&piece);
        }
        (session, received)
    });

    // Connecting side: dial, open the stream, send one chunk, finish.
    let client_session = session_connect(&client, &server_id, Some(&server_addrs))
        .await
        .unwrap();
    let writer = session_create_uni_stream(&client, client_session)
        .await
        .unwrap();
    let payload = Bytes::from_static(b"uni-hello");
    client.handles().send_chunk(writer, payload).await.unwrap();
    client.handles().finish_body(writer).unwrap();

    let (server_session, received) = server_task.await.unwrap();
    assert_eq!(received, b"uni-hello");

    // Best-effort teardown: close errors are deliberately ignored.
    session_close(&server, server_session, 0, "").ok();
    session_close(&client, client_session, 0, "").ok();
}
/// Opens three unidirectional streams on one session, sends a distinct
/// message (`msg-0`, `msg-1`, `msg-2`) on each, and checks that the accepting
/// endpoint receives all three payloads.
#[tokio::test]
async fn session_multiple_uni_streams() {
    let (a_ep, b_ep) = make_pair().await;
    let b_id = node_id(&b_ep);
    let b_addrs = direct_addrs(&b_ep);

    // Accepting side: take the session, then read three incoming streams to
    // completion, one after another.
    let b_ep_spawn = b_ep.clone();
    let b_handle = tokio::spawn(async move {
        let session_b = session_accept(&b_ep_spawn).await.unwrap().unwrap();
        let mut messages = Vec::new();
        for _ in 0..3 {
            let read_handle = session_next_uni_stream(&b_ep_spawn, session_b)
                .await
                .unwrap()
                .unwrap();
            let mut data = Vec::new();
            while let Some(chunk) = b_ep_spawn.handles().next_chunk(read_handle).await.unwrap() {
                data.extend_from_slice(&chunk);
            }
            messages.push(data);
        }
        (session_b, messages)
    });

    // Connecting side: one stream per message, each finished right after its
    // single chunk is sent.
    let session_a = session_connect(&a_ep, &b_id, Some(&b_addrs)).await.unwrap();
    for i in 0..3u8 {
        let write_handle = session_create_uni_stream(&a_ep, session_a).await.unwrap();
        let msg = format!("msg-{i}");
        a_ep.handles()
            .send_chunk(write_handle, Bytes::from(msg.into_bytes()))
            .await
            .unwrap();
        a_ep.handles().finish_body(write_handle).unwrap();
    }

    let (session_b, messages) = b_handle.await.unwrap();
    assert_eq!(messages.len(), 3);

    // `messages` is not needed afterwards, so consume it instead of cloning
    // every buffer. The payloads are sorted before comparison, so the test
    // does not depend on the order in which the streams were accepted.
    let mut sorted: Vec<String> = messages
        .into_iter()
        .map(|m| String::from_utf8(m).unwrap())
        .collect();
    sorted.sort_unstable();
    assert_eq!(sorted, ["msg-0", "msg-1", "msg-2"]);

    // Best-effort teardown: close errors are deliberately ignored.
    session_close(&b_ep, session_b, 0, "").ok();
    session_close(&a_ep, session_a, 0, "").ok();
}
/// Sends a `ping` datagram from the connecting endpoint, has the accepting
/// endpoint answer with `pong`, and checks both payloads. Also checks that
/// the session reports a non-zero maximum datagram size.
#[tokio::test]
async fn session_datagram_round_trip() {
    let (a_ep, b_ep) = make_pair().await;
    let b_id = node_id(&b_ep);
    let b_addrs = direct_addrs(&b_ep);

    // Accepting side: wait for one datagram, then reply on the same session.
    let b_ep_spawn = b_ep.clone();
    let b_handle = tokio::spawn(async move {
        let session_b = session_accept(&b_ep_spawn).await.unwrap().unwrap();
        let data = session_recv_datagram(&b_ep_spawn, session_b)
            .await
            .unwrap()
            .unwrap();
        session_send_datagram(&b_ep_spawn, session_b, b"pong").unwrap();
        (session_b, data)
    });

    let session_a = session_connect(&a_ep, &b_id, Some(&b_addrs)).await.unwrap();

    // A single `expect` replaces the `is_some()` assertion followed by
    // `unwrap()`; the failure message is unchanged.
    let max_size = session_max_datagram_size(&a_ep, session_a)
        .unwrap()
        .expect("datagrams should be supported");
    assert!(max_size > 0);

    session_send_datagram(&a_ep, session_a, b"ping").unwrap();
    let reply = session_recv_datagram(&a_ep, session_a)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(reply, b"pong");

    let (session_b, received) = b_handle.await.unwrap();
    assert_eq!(received, b"ping");

    // Best-effort teardown: close errors are deliberately ignored.
    session_close(&b_ep, session_b, 0, "").ok();
    session_close(&a_ep, session_a, 0, "").ok();
}
/// Closes a session from the connecting side with code `42` and reason
/// `"done"`, and checks that the accepting side observes exactly that code
/// and reason through `session_closed`.
#[tokio::test]
async fn session_close_with_code_and_reason() {
    let (a_ep, b_ep) = make_pair().await;
    let b_id = node_id(&b_ep);
    let b_addrs = direct_addrs(&b_ep);

    // Accepting side: take the session and wait until it is reported closed.
    // The close info is the task's result, returned directly rather than
    // through a temporary binding.
    let b_ep_spawn = b_ep.clone();
    let b_handle = tokio::spawn(async move {
        let session_b = session_accept(&b_ep_spawn).await.unwrap().unwrap();
        session_closed(&b_ep_spawn, session_b).await.unwrap()
    });

    let session_a = session_connect(&a_ep, &b_id, Some(&b_addrs)).await.unwrap();
    // Unlike the best-effort teardown elsewhere in this file, this close is
    // the behavior under test, so its result is unwrapped.
    session_close(&a_ep, session_a, 42, "done").unwrap();

    let info = b_handle.await.unwrap();
    assert_eq!(info.close_code, 42);
    assert_eq!(info.reason, "done");
}