use crate::*;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
fn init_tracing() {
let subscriber = tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(
tracing_subscriber::filter::EnvFilter::from_default_env(),
)
.with_file(true)
.with_line_number(true)
.finish();
let _ = tracing::subscriber::set_global_default(subscriber);
}
#[tokio::test(flavor = "multi_thread")]
async fn relay_test_max_inbound_connections() {
init_tracing();
tracing::info!("setup relay");
let mut relay_config =
Tx3RelayConfig::new().with_bind("tx3-rst://127.0.0.1:0");
relay_config.max_inbound_connections = 1;
let relay = Tx3Relay::new(relay_config).await.unwrap();
let r_addr = relay.local_addrs()[0].to_owned();
tracing::info!("first connection");
let node1 = Tx3Node::new(Tx3Config::new().with_bind(&r_addr))
.await
.unwrap();
tracing::info!("second (expect to fail) connection");
assert!(Tx3Node::new(Tx3Config::new().with_bind(&r_addr))
.await
.is_err());
tracing::info!("drop first connection");
drop(node1);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
tracing::info!("third connection");
assert!(Tx3Node::new(Tx3Config::new().with_bind(&r_addr))
.await
.is_ok());
}
#[tokio::test(flavor = "multi_thread")]
async fn relay_test_max_control_streams() {
init_tracing();
tracing::info!("setup relay");
let mut relay_config =
Tx3RelayConfig::new().with_bind("tx3-rst://127.0.0.1:0");
relay_config.max_control_streams = 1;
let relay = Tx3Relay::new(relay_config).await.unwrap();
let r_addr = relay.local_addrs()[0].to_owned();
tracing::info!("first connection");
let node1 = Tx3Node::new(Tx3Config::new().with_bind(&r_addr))
.await
.unwrap();
tracing::info!("second (expect to fail) connection");
assert!(Tx3Node::new(Tx3Config::new().with_bind(&r_addr))
.await
.is_err());
tracing::info!("drop first connection");
drop(node1);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
tracing::info!("third connection");
assert!(Tx3Node::new(Tx3Config::new().with_bind(&r_addr))
.await
.is_ok());
}
#[tokio::test(flavor = "multi_thread")]
async fn relay_test_max_control_streams_per_ip() {
init_tracing();
tracing::info!("setup relay");
let mut relay_config =
Tx3RelayConfig::new().with_bind("tx3-rst://127.0.0.1:0");
relay_config.max_control_streams_per_ip = 1;
let relay = Tx3Relay::new(relay_config).await.unwrap();
let r_addr = relay.local_addrs()[0].to_owned();
tracing::info!("first connection");
let node1 = Tx3Node::new(Tx3Config::new().with_bind(&r_addr))
.await
.unwrap();
tracing::info!("second (expect to fail) connection");
assert!(Tx3Node::new(Tx3Config::new().with_bind(&r_addr))
.await
.is_err());
tracing::info!("drop first connection");
drop(node1);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
tracing::info!("third connection");
assert!(Tx3Node::new(Tx3Config::new().with_bind(&r_addr))
.await
.is_ok());
}
#[tokio::test(flavor = "multi_thread")]
async fn relay_test_max_relays_per_control() {
init_tracing();
tracing::info!("setup relay");
let mut relay_config =
Tx3RelayConfig::new().with_bind("tx3-rst://127.0.0.1:0");
relay_config.max_relays_per_control = 1;
let relay = Tx3Relay::new(relay_config).await.unwrap();
let r_addr = relay.local_addrs()[0].to_owned();
tracing::info!("setup main receiver node");
let (main_ep, mut main_recv) =
Tx3Node::new(Tx3Config::new().with_bind(&r_addr))
.await
.unwrap();
let main_addr = main_ep.local_addrs()[0].to_owned();
let r_task = tokio::task::spawn(async move {
let mut all = Vec::new();
for _ in 0..2 {
tracing::info!("got one connection");
let a = main_recv.recv().await.unwrap();
all.push(tokio::task::spawn(async move {
let mut socket = a.accept().await.unwrap();
let mut got = [0; 5];
socket.read_exact(&mut got).await.unwrap();
assert_eq!(b"hello", &got[..]);
}));
}
for t in all {
t.await.unwrap();
}
});
tracing::info!("make first connection");
let (n1, _) = Tx3Node::new(Tx3Config::new()).await.unwrap();
let mut c1 = n1.connect(&main_addr).await.unwrap();
let (n2, _) = Tx3Node::new(Tx3Config::new()).await.unwrap();
tracing::info!("should fail to make second connection");
assert!(n2.connect(&main_addr).await.is_err());
c1.write_all(b"hello").await.unwrap();
c1.flush().await.unwrap();
tracing::info!("drop first connection");
drop(c1);
drop(n1);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
tracing::info!("make third connection");
let (n3, _) = Tx3Node::new(Tx3Config::new()).await.unwrap();
let mut c3 = n3.connect(&main_addr).await.unwrap();
c3.write_all(b"hello").await.unwrap();
c3.flush().await.unwrap();
r_task.await.unwrap();
}