use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use bytes::Bytes;
use raknet_rust::client::{RaknetClient, RaknetClientEvent};
use raknet_rust::server::{RaknetServer, ServerFacade};
use tokio::time::{Instant, timeout};
/// Picks a free UDP port on the IPv4 loopback interface and returns its address.
///
/// A temporary socket is bound to port 0 so the OS assigns an ephemeral port;
/// the socket is dropped when this function returns, which releases the port
/// again before the caller gets to use the address.
///
/// # Panics
///
/// Panics if the temporary bind fails or the bound address cannot be read.
fn allocate_loopback_bind_addr() -> SocketAddr {
    let probe = std::net::UdpSocket::bind((std::net::Ipv4Addr::LOCALHOST, 0))
        .expect("ephemeral loopback bind must succeed");
    let assigned = probe.local_addr();
    assigned.expect("ephemeral local addr must be available")
}
/// Drains client events until a `Connected` event is observed.
///
/// `Packet`, `ReceiptAcked` and `DecodeError` events seen in the meantime are
/// discarded.
///
/// # Panics
///
/// Panics if no event arrives within 3 seconds, if the event stream ends, or
/// if the client reports `Disconnected` before it ever connected.
async fn wait_for_client_connected(client: &mut RaknetClient) {
    loop {
        let event = timeout(Duration::from_secs(3), client.next_event())
            .await
            .expect("timed out waiting for client event")
            .expect("client event stream unexpectedly ended");
        match event {
            RaknetClientEvent::Connected { .. } => return,
            // Fail fast with the disconnect reason instead of spinning until
            // the timeout (or stream end) hides the real cause.
            RaknetClientEvent::Disconnected { reason } => {
                panic!("client disconnected before connection was established: {reason:?}")
            }
            RaknetClientEvent::Packet { .. }
            | RaknetClientEvent::ReceiptAcked { .. }
            | RaknetClientEvent::DecodeError { .. } => {}
        }
    }
}
/// Drains client events until a `Packet` event arrives and returns its payload.
///
/// `Connected`, `ReceiptAcked` and `DecodeError` events are skipped.
///
/// # Panics
///
/// Panics if no event arrives within 3 seconds, if the event stream ends, or
/// if the client reports `Disconnected` before a packet shows up.
async fn wait_for_client_packet(client: &mut RaknetClient) -> Bytes {
    let per_event_limit = Duration::from_secs(3);
    loop {
        let awaited = timeout(per_event_limit, client.next_event()).await;
        let event = awaited
            .expect("timed out waiting for client packet")
            .expect("client event stream unexpectedly ended");
        match event {
            // Not what we are waiting for; keep polling.
            RaknetClientEvent::Connected { .. }
            | RaknetClientEvent::ReceiptAcked { .. }
            | RaknetClientEvent::DecodeError { .. } => continue,
            RaknetClientEvent::Disconnected { reason } => {
                panic!("client disconnected before packet arrived: {reason:?}")
            }
            RaknetClientEvent::Packet { payload, .. } => break payload,
        }
    }
}
/// Drives `facade.next()` repeatedly until `condition` returns `true`.
///
/// `condition` is checked before every poll, so an already-satisfied
/// condition returns immediately without touching the facade.
///
/// # Errors
///
/// * `TimedOut` ("timed out waiting for facade condition") when
///   `timeout_budget` has elapsed and the condition is still false.
/// * `TimedOut` ("timed out waiting for facade event") when a single
///   `facade.next()` call does not finish within its slice, which is the
///   remaining budget capped at 250ms.
/// * `UnexpectedEof` when `facade.next()` yields `false`.
/// * Any error produced by `facade.next()` itself is propagated.
///
/// NOTE(review): because an expired slice is reported as an error, 250ms
/// without a facade event ends the wait even when most of `timeout_budget`
/// remains. Confirm this is intended.
async fn pump_facade_until(
    facade: &mut ServerFacade<'_>,
    timeout_budget: Duration,
    mut condition: impl FnMut() -> bool,
) -> io::Result<()> {
    // Longest time a single `facade.next()` call is allowed to take.
    const POLL_SLICE: Duration = Duration::from_millis(250);

    let deadline = Instant::now() + timeout_budget;
    loop {
        if condition() {
            return Ok(());
        }

        // Zero remaining time means the deadline has been reached or passed.
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            return Err(io::Error::new(
                io::ErrorKind::TimedOut,
                "timed out waiting for facade condition",
            ));
        }

        let slice = std::cmp::min(remaining, POLL_SLICE);
        let progressed = match timeout(slice, facade.next()).await {
            Ok(outcome) => outcome?,
            Err(_elapsed) => {
                return Err(io::Error::new(
                    io::ErrorKind::TimedOut,
                    "timed out waiting for facade event",
                ));
            }
        };

        if !progressed {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "server event stream closed",
            ));
        }
    }
}
/// End-to-end check that the facade's connect, packet and disconnect handlers
/// each fire at least once for a single loopback client.
///
/// The packet handler sends every received payload back to its sender, so the
/// test also asserts that the client receives its own payload unchanged.
#[tokio::test(flavor = "current_thread")]
async fn facade_handlers_drive_connect_packet_disconnect_flow() -> io::Result<()> {
    // Upper bound for each server-side wait below.
    let phase_budget = Duration::from_secs(3);

    let bind_addr = allocate_loopback_bind_addr();
    let mut server = RaknetServer::bind(bind_addr).await?;

    // One counter per handler; the test body reads them, the handlers bump them.
    let connects = Arc::new(AtomicUsize::new(0));
    let packets = Arc::new(AtomicUsize::new(0));
    let disconnects = Arc::new(AtomicUsize::new(0));

    let connects_in_handler = Arc::clone(&connects);
    let packets_in_handler = Arc::clone(&packets);
    let disconnects_in_handler = Arc::clone(&disconnects);

    let mut facade = server
        .facade()
        .on_connect(move |_server, event| {
            // Each invocation needs its own handle to move into the future.
            let seen = Arc::clone(&connects_in_handler);
            Box::pin(async move {
                assert_ne!(event.peer_id.as_u64(), 0);
                assert!(event.addr.ip().is_loopback());
                seen.fetch_add(1, Ordering::SeqCst);
                Ok(())
            })
        })
        .on_packet(move |server, event| {
            let seen = Arc::clone(&packets_in_handler);
            Box::pin(async move {
                seen.fetch_add(1, Ordering::SeqCst);
                // Echo the payload straight back to the sending peer.
                server.send(event.peer_id, event.payload).await?;
                Ok(())
            })
        })
        .on_disconnect(move |_server, _event| {
            let seen = Arc::clone(&disconnects_in_handler);
            Box::pin(async move {
                seen.fetch_add(1, Ordering::SeqCst);
                Ok(())
            })
        });

    // Phase 1: connect, then pump the server until its handler has run.
    let mut client = RaknetClient::connect(bind_addr).await?;
    wait_for_client_connected(&mut client).await;
    pump_facade_until(&mut facade, phase_budget, || {
        connects.load(Ordering::SeqCst) > 0
    })
    .await?;

    // Phase 2: send one payload and expect it echoed back unchanged.
    let payload = Bytes::from_static(b"\xfe-facade-echo");
    client.send(payload.clone()).await?;
    pump_facade_until(&mut facade, phase_budget, || {
        packets.load(Ordering::SeqCst) > 0
    })
    .await?;
    let echoed = wait_for_client_packet(&mut client).await;
    assert_eq!(echoed, payload);

    // Phase 3: disconnect. The client-side result is ignored on purpose; the
    // server-side disconnect counter is what this phase verifies.
    let _ = client.disconnect(None).await;
    pump_facade_until(&mut facade, phase_budget, || {
        disconnects.load(Ordering::SeqCst) > 0
    })
    .await?;

    // The facade is dropped explicitly before `server` is used again.
    drop(facade);
    server.shutdown().await
}