use std::io::ErrorKind;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use turmoil_net::fixture::ClientServer;
use turmoil_net::shim::tokio::net::{TcpListener, TcpStream, UdpSocket};
use turmoil_net::{rule, Latency, Verdict};
/// Installs a fixed-latency rule and checks that a UDP echo round trip
/// takes at least twice that latency (the test expects the delay to be
/// paid once in each direction).
#[test]
fn latency_delays_delivery() {
    let one_way = Duration::from_millis(50);

    ClientServer::new()
        .server("server", async move {
            // Echo exactly one datagram back to whoever sent it.
            let sock = UdpSocket::bind("0.0.0.0:9000").await.unwrap();
            let mut scratch = [0u8; 16];
            let (len, peer) = sock.recv_from(&mut scratch).await.unwrap();
            sock.send_to(&scratch[..len], peer).await.unwrap();
        })
        .run("client", async move {
            // NOTE(review): `forget()` presumably keeps the rule installed
            // after the guard is discarded — confirm against the rule API.
            rule(Latency::fixed(one_way)).forget();

            let sock = UdpSocket::bind("0.0.0.0:0").await.unwrap();
            let mut reply = [0u8; 16];

            // Time the full request/response exchange.
            let sent_at = tokio::time::Instant::now();
            sock.send_to(b"hi", "server:9000").await.unwrap();
            let (len, _) = sock.recv_from(&mut reply).await.unwrap();
            assert_eq!(&reply[..len], b"hi");

            let round_trip = sent_at.elapsed();
            let min_round_trip = 2 * one_way;
            assert!(
                round_trip >= min_round_trip,
                "expected ≥ {:?} for one full RTT, got {:?}",
                min_round_trip,
                round_trip,
            );
        });
}
/// With a rule that returns `Verdict::Drop` for everything, a TCP connect
/// to a bound listener is expected to fail with `ErrorKind::TimedOut`.
#[test]
fn drop_rule_blocks_all_traffic() {
    ClientServer::new()
        .server("server", async move {
            // Hold the listener open for the whole test without accepting.
            let _listener = TcpListener::bind("0.0.0.0:9000").await.unwrap();
            std::future::pending::<()>().await
        })
        .run("client", async move {
            rule(|_: &_| Verdict::Drop).forget();

            let failure = TcpStream::connect("server:9000").await.unwrap_err();
            let kind = failure.kind();
            assert_eq!(kind, ErrorKind::TimedOut);
        });
}
/// While a drop-everything rule guard is alive, connecting is expected to
/// time out; once the guard has been dropped, the same connect must succeed
/// and a two-byte echo must round-trip.
#[test]
fn rule_guard_uninstalls_on_drop() {
    ClientServer::new()
        .server("server", async move {
            let listener = TcpListener::bind("0.0.0.0:9000").await.unwrap();
            loop {
                let (mut conn, _) = listener.accept().await.unwrap();
                // Each connection gets its own task: perform one read and
                // echo back whatever arrived, ignoring I/O errors.
                tokio::spawn(async move {
                    let mut scratch = [0u8; 16];
                    match conn.read(&mut scratch).await {
                        Ok(len) if len > 0 => {
                            let _ = conn.write_all(&scratch[..len]).await;
                        }
                        _ => {}
                    }
                });
            }
        })
        .run("client", async move {
            // Phase 1: guard held, so the connect attempt should time out.
            let guard = rule(|_: &_| Verdict::Drop);
            let blocked = TcpStream::connect("server:9000").await.unwrap_err();
            assert_eq!(blocked.kind(), ErrorKind::TimedOut);
            drop(guard);

            // Phase 2: guard gone, so traffic is expected to flow again.
            let mut stream = TcpStream::connect("server:9000").await.unwrap();
            stream.write_all(b"ok").await.unwrap();
            let mut echoed = [0u8; 2];
            stream.read_exact(&mut echoed).await.unwrap();
            assert_eq!(&echoed, b"ok");
        });
}
/// Installs a `Verdict::Pass` rule followed by a `Verdict::Drop` rule and
/// expects the connect to time out, i.e. the `Pass` verdict does not stop
/// the later `Drop` rule from taking effect.
#[test]
fn pass_falls_through_to_next_rule() {
    ClientServer::new()
        .server("server", async move {
            // Hold the listener open for the whole test without accepting.
            let _listener = TcpListener::bind("0.0.0.0:9000").await.unwrap();
            std::future::pending::<()>().await
        })
        .run("client", async move {
            // Installation order matters here: Pass first, Drop second.
            rule(|_: &_| Verdict::Pass).forget();
            rule(|_: &_| Verdict::Drop).forget();

            let failure = TcpStream::connect("server:9000").await.unwrap_err();
            let kind = failure.kind();
            assert_eq!(kind, ErrorKind::TimedOut);
        });
}
/// Installs a `Verdict::Drop` rule followed by a zero-delay
/// `Verdict::Deliver` rule and expects the connect to time out, i.e. the
/// earlier `Drop` verdict decides the outcome.
#[test]
fn first_non_pass_wins() {
    ClientServer::new()
        .server("server", async move {
            // Hold the listener open for the whole test without accepting.
            let _listener = TcpListener::bind("0.0.0.0:9000").await.unwrap();
            std::future::pending::<()>().await
        })
        .run("client", async move {
            // Installation order matters here: Drop first, Deliver second.
            rule(|_: &_| Verdict::Drop).forget();
            rule(|_: &_| Verdict::Deliver(Duration::ZERO)).forget();

            let failure = TcpStream::connect("server:9000").await.unwrap_err();
            let kind = failure.kind();
            assert_eq!(kind, ErrorKind::TimedOut);
        });
}