use iroh::{
Endpoint, RelayMode,
address_lookup::{AddressLookup, pkarr::PkarrResolver},
endpoint::presets,
};
use n0_error::{Result, StdResultExt};
use n0_future::{
StreamExt, task,
time::{self, Duration},
};
#[cfg(not(wasm_browser))]
use tokio::test;
use tracing::{Instrument, info_span};
#[cfg(wasm_browser)]
use wasm_bindgen_test::wasm_bindgen_test as test;
/// ALPN protocol identifier shared by the echo server (registered on bind) and the
/// client (passed to `connect`) in this test.
const ECHO_ALPN: &[u8] = b"echo";
/// Connects two endpoints by endpoint id alone and echoes a message between them.
///
/// Steps:
/// 1. Bind a client and a server endpoint, both configured for the staging relays.
/// 2. Wait (at most 20s) for the server to report being online.
/// 3. Spawn an echo loop on the server that copies every received chunk back.
/// 4. Poll the pkarr resolver (at most 20s) until the server's record lists a relay URL.
/// 5. Connect from the client using only the server's id, send a payload and assert
///    that the identical payload is echoed back.
///
/// # Errors
///
/// Returns an error if binding, going online, address lookup, connecting or any
/// stream operation fails or times out.
#[test]
async fn simple_endpoint_id_based_connection_transfer() -> Result {
    // The console panic hook is only useful on wasm targets. Installing it everywhere
    // would replace the process-wide default panic hook for all native tests too.
    #[cfg(wasm_browser)]
    std::panic::set_hook(Box::new(console_error_panic_hook::hook));
    setup_logging();

    let client = Endpoint::builder(presets::N0)
        .relay_mode(RelayMode::Staging)
        .bind()
        .await?;
    tracing::info!("started client, id {}", client.id().fmt_short());

    let server = Endpoint::builder(presets::N0)
        .relay_mode(RelayMode::Staging)
        .alpns(vec![ECHO_ALPN.to_vec()])
        .bind()
        .await?;
    tracing::info!("started server, id {}", server.id().fmt_short());

    tracing::info!("waiting for server to go online");
    time::timeout(Duration::from_secs(20), server.online())
        .await
        .std_context("server endpoint took too long to get online")?;

    // Make the server respond to requests with an echo. The task handle is dropped
    // on purpose: the loop ends once `server.accept()` yields `None`.
    task::spawn({
        tracing::info!("waiting for incoming connections on the server");
        let server = server.clone();
        async move {
            while let Some(incoming) = server.accept().await {
                tracing::info!("accepting connection");
                let conn = incoming.await?;
                let endpoint_id = conn.remote_id();
                tracing::info!(endpoint_id = %endpoint_id.fmt_short(), "Accepted connection");

                let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
                let mut bytes_sent = 0;
                // Copy chunks back until the peer finishes its send side.
                while let Some(chunk) = recv.read_chunk(10_000).await.anyerr()? {
                    bytes_sent += chunk.bytes.len();
                    send.write_chunk(chunk.bytes).await.anyerr()?;
                }
                send.finish().anyerr()?;
                tracing::info!("Copied over {bytes_sent} byte(s)");

                // Wait for the peer to close before accepting the next connection.
                let code = conn.closed().await;
                tracing::info!("Closed with code: {code:?}");
            }
            n0_error::Ok(())
        }
        .instrument(info_span!("server"))
    });

    // Wait until the server's address lookup record has been published and
    // contains at least one relay URL.
    time::timeout(Duration::from_secs(20), {
        let endpoint_id = server.id();
        tracing::info!(
            "start timeout waiting for records to be published, waiting for {endpoint_id} address lookup"
        );
        let tls_config = server.tls_config().clone();
        async move {
            let resolver = PkarrResolver::n0_dns().build(tls_config);
            loop {
                // Very rudimentary fixed-interval retry.
                time::sleep(Duration::from_secs(1)).await;

                let Some(mut stream) = resolver.resolve(endpoint_id) else {
                    tracing::info!("unable to get resolver stream, looping");
                    continue;
                };
                let Ok(Some(item)) = stream.try_next().await else {
                    tracing::info!("no items on stream when resolving, looping");
                    continue;
                };
                if item.relay_urls().next().is_some() {
                    tracing::info!("home relay found");
                    break;
                }
            }
        }
    })
    .await
    .std_context("timed out waiting for the server's address lookup records to be published")?;

    tracing::info!(to = %server.id().fmt_short(), "Opening a connection");
    let conn = client.connect(server.id(), ECHO_ALPN).await?;
    tracing::info!("Connection opened");

    let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
    send.write_all(b"Hello, World!").await.anyerr()?;
    // Finishing the send side lets the server's read loop terminate.
    send.finish().anyerr()?;
    tracing::info!("Sent request");

    let response = recv.read_to_end(10_000).await.anyerr()?;
    tracing::info!(len = response.len(), "Received response");
    assert_eq!(&response, b"Hello, World!");

    tracing::info!("Closing connection");
    conn.close(1u32.into(), b"thank you, bye");
    client.close().await;
    server.close().await;

    Ok(())
}
/// Installs the `wasm_tracing` layer as the global `tracing` default with the
/// maximum level set to `TRACE`.
///
/// Safe to call more than once: a failed installation is ignored instead of
/// panicking.
#[cfg(wasm_browser)]
fn setup_logging() {
    use tracing::Level;

    let mut config = wasm_tracing::WasmLayerConfig::new();
    config.set_max_level(Level::TRACE);
    // Presumably the only failure mode is that a global default is already set
    // (e.g. by an earlier test in the same wasm instance); logging keeps working
    // in that case, so the error is deliberately discarded.
    let _ = wasm_tracing::set_as_global_default_with_config(config);
}
/// Installs a `tracing_subscriber` fmt subscriber as the global default, filtered
/// by the default environment filter (`RUST_LOG`).
///
/// Safe to call more than once: if a global subscriber is already installed the
/// call is a no-op instead of a panic.
#[cfg(not(wasm_browser))]
fn setup_logging() {
    // `try_init` rather than `init`: `init` panics when a subscriber has already
    // been set, which happens as soon as a second test in this binary calls us.
    let _ = tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .try_init();
}