use std::time::Duration;
use atomr_core::io::manager::{IoEvent, TcpManager, UdpManager};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
async fn next_event(rx: &mut tokio::sync::mpsc::UnboundedReceiver<IoEvent>) -> IoEvent {
tokio::time::timeout(Duration::from_millis(500), rx.recv())
.await
.expect("event timeout")
.expect("rx closed")
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn tcp_bind_emits_bound_event() {
let (mgr, mut events) = TcpManager::spawn();
mgr.bind("127.0.0.1:0".parse().unwrap()).unwrap();
let evt = next_event(&mut events).await;
let bound = match evt {
IoEvent::Bound { addr } => addr,
other => panic!("expected Bound, got {other:?}"),
};
assert!(bound.ip().is_loopback());
assert_ne!(bound.port(), 0);
mgr.shutdown();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn tcp_outbound_connect_emits_connected_event() {
let (server, mut server_events) = TcpManager::spawn();
server.bind("127.0.0.1:0".parse().unwrap()).unwrap();
let bound = match next_event(&mut server_events).await {
IoEvent::Bound { addr } => addr,
other => panic!("server expected Bound, got {other:?}"),
};
let (client, mut client_events) = TcpManager::spawn();
client.connect(bound).unwrap();
let _id = match next_event(&mut client_events).await {
IoEvent::Connected { id, peer } => {
assert_eq!(peer, bound);
id
}
other => panic!("client expected Connected, got {other:?}"),
};
match next_event(&mut server_events).await {
IoEvent::Connected { .. } => {}
other => panic!("server expected Connected, got {other:?}"),
}
server.shutdown();
client.shutdown();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn tcp_outbound_to_unbound_addr_yields_error_event() {
let (mgr, mut events) = TcpManager::spawn();
mgr.connect("127.0.0.1:1".parse().unwrap()).unwrap();
match next_event(&mut events).await {
IoEvent::Error { .. } => {}
other => panic!("expected Error, got {other:?}"),
}
mgr.shutdown();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn tcp_client_close_drives_closed_event() {
let (mgr, mut events) = TcpManager::spawn();
mgr.bind("127.0.0.1:0".parse().unwrap()).unwrap();
let bound = match next_event(&mut events).await {
IoEvent::Bound { addr } => addr,
other => panic!("expected Bound, got {other:?}"),
};
let mut client = TcpStream::connect(bound).await.unwrap();
let _id = match next_event(&mut events).await {
IoEvent::Connected { id, .. } => id,
other => panic!("expected Connected, got {other:?}"),
};
client.shutdown().await.unwrap();
drop(client);
let mut closed_seen = false;
for _ in 0..3 {
match next_event(&mut events).await {
IoEvent::Closed { .. } => {
closed_seen = true;
break;
}
IoEvent::Received { .. } => continue,
other => panic!("unexpected event after client close: {other:?}"),
}
}
assert!(closed_seen);
mgr.shutdown();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn tcp_send_back_and_forth_round_trip() {
let (mgr, mut events) = TcpManager::spawn();
mgr.bind("127.0.0.1:0".parse().unwrap()).unwrap();
let bound = match next_event(&mut events).await {
IoEvent::Bound { addr } => addr,
other => panic!("{other:?}"),
};
let mut client = TcpStream::connect(bound).await.unwrap();
let id = match next_event(&mut events).await {
IoEvent::Connected { id, .. } => id,
other => panic!("{other:?}"),
};
client.write_all(b"hello").await.unwrap();
match next_event(&mut events).await {
IoEvent::Received { bytes, .. } => assert_eq!(&bytes, b"hello"),
other => panic!("{other:?}"),
}
mgr.send_bytes(id, b"world".to_vec()).unwrap();
let mut buf = [0u8; 5];
client.read_exact(&mut buf).await.unwrap();
assert_eq!(&buf, b"world");
mgr.shutdown();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn udp_two_managers_can_exchange_datagrams() {
let (a, mut a_rx) = UdpManager::bind("127.0.0.1:0".parse().unwrap()).await.unwrap();
let (b, mut b_rx) = UdpManager::bind("127.0.0.1:0".parse().unwrap()).await.unwrap();
a.send_to(b.local_addr(), b"ping".to_vec()).unwrap();
b.send_to(a.local_addr(), b"pong".to_vec()).unwrap();
match next_event(&mut a_rx).await {
IoEvent::Datagram { bytes, .. } => assert_eq!(&bytes, b"pong"),
other => panic!("{other:?}"),
}
match next_event(&mut b_rx).await {
IoEvent::Datagram { bytes, .. } => assert_eq!(&bytes, b"ping"),
other => panic!("{other:?}"),
}
a.shutdown();
b.shutdown();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn udp_local_addr_is_loopback_with_assigned_port() {
let (a, _rx) = UdpManager::bind("127.0.0.1:0".parse().unwrap()).await.unwrap();
let local = a.local_addr();
assert!(local.ip().is_loopback());
assert_ne!(local.port(), 0);
a.shutdown();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn tcp_manager_close_command_drops_send_capability() {
let (mgr, mut events) = TcpManager::spawn();
mgr.bind("127.0.0.1:0".parse().unwrap()).unwrap();
let bound = match next_event(&mut events).await {
IoEvent::Bound { addr } => addr,
other => panic!("{other:?}"),
};
let _client = TcpStream::connect(bound).await.unwrap();
let id = match next_event(&mut events).await {
IoEvent::Connected { id, .. } => id,
other => panic!("{other:?}"),
};
mgr.close(id).unwrap();
mgr.send_bytes(id, b"after-close".to_vec()).unwrap();
mgr.shutdown();
}