#![allow(dead_code)]
use ironsbe_client::{Client, ClientBuilder, ClientEvent, ClientHandle};
use ironsbe_core::header::MessageHeader;
use ironsbe_server::{MessageHandler, Server, ServerBuilder, ServerEvent, ServerHandle};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::task::JoinHandle;
/// Default upper bound (5 seconds) used when waiting for an expected event.
pub const DEFAULT_WAIT: Duration = Duration::from_secs(5);
/// Builds a server around `handler`, spawns it on the tokio runtime and waits
/// until it reports that it is listening.
///
/// The server is bound to `127.0.0.1:0`; the address returned to the caller is
/// the one carried by the `ServerEvent::Listening` event, not the bind address.
///
/// # Returns
///
/// A tuple of the shared server handle, the address reported by the server,
/// and the `JoinHandle` of the task driving `server.run()`.
///
/// # Panics
///
/// Panics if no `Listening` event is observed within [`DEFAULT_WAIT`].
pub async fn build_and_start_server<H>(
    handler: H,
    max_connections: usize,
) -> (Arc<ServerHandle>, SocketAddr, JoinHandle<()>)
where
    H: MessageHandler + 'static,
{
    let loopback = "127.0.0.1:0"
        .parse::<SocketAddr>()
        .expect("hardcoded addr");

    let builder = ServerBuilder::<H>::new()
        .bind(loopback)
        .handler(handler)
        .max_connections(max_connections);
    let (mut server, raw_handle): (Server<H>, ServerHandle) = builder.build();
    let shared = Arc::new(raw_handle);

    // The result of `run()` is intentionally ignored; callers observe the
    // server through the handle's events instead.
    let server_task = tokio::spawn(async move {
        let _ = server.run().await;
    });

    let listening = wait_for_listening(&shared, Instant::now() + DEFAULT_WAIT).await;
    let server_addr = listening.expect("server did not emit Listening within 5 s");

    (shared, server_addr, server_task)
}
/// Builds a client targeting `addr` and spawns its run loop on the tokio
/// runtime.
///
/// `connect_timeout` and `max_reconnect_attempts` are forwarded unchanged to
/// the `ClientBuilder`.
///
/// # Returns
///
/// The client handle together with the `JoinHandle` of the task driving
/// `client.run()`. This function does not wait for the connection to be
/// established.
pub async fn build_and_start_client(
    addr: SocketAddr,
    connect_timeout: Duration,
    max_reconnect_attempts: usize,
) -> (ClientHandle, JoinHandle<()>) {
    let builder = ClientBuilder::new(addr)
        .connect_timeout(connect_timeout)
        .max_reconnect_attempts(max_reconnect_attempts);
    let (mut client, handle): (Client, ClientHandle) = builder.build();

    // The result of `run()` is intentionally ignored; callers observe the
    // client through the returned handle.
    let client_task = tokio::spawn(async move {
        let _ = client.run().await;
    });

    (handle, client_task)
}
/// Builds a frame consisting of an encoded `MessageHeader` followed by
/// `payload`.
///
/// The header's block length is `payload.len()`, saturated to `u16::MAX` when
/// the payload is longer than a `u16` can express. The two trailing header
/// arguments are both fixed to `1` (presumably schema id and version — confirm
/// against `MessageHeader::new`).
///
/// # Returns
///
/// A vector of `MessageHeader::ENCODED_LENGTH + payload.len()` bytes.
pub fn build_sbe_message(template_id: u16, payload: &[u8]) -> Vec<u8> {
    let header_len = MessageHeader::ENCODED_LENGTH;

    // Oversized payloads saturate the advertised block length instead of failing.
    let block_length = match u16::try_from(payload.len()) {
        Ok(len) => len,
        Err(_) => u16::MAX,
    };

    // Reserve the full frame up front, zero-fill the header region for the
    // encoder, then append the payload right after it.
    let mut frame: Vec<u8> = Vec::with_capacity(header_len + payload.len());
    frame.resize(header_len, 0u8);
    let header = MessageHeader::new(block_length, template_id, 1, 1);
    header.encode(frame.as_mut_slice(), 0);
    frame.extend_from_slice(payload);
    frame
}
/// Polls the server handle until a `ServerEvent::Listening` event is seen or
/// `deadline` passes.
///
/// Events are fetched with `poll_events()` every 5 ms. Events that are not
/// `Listening` are consumed by this function and not returned to the caller.
///
/// # Returns
///
/// `Some(addr)` with the address carried by the first `Listening` event, or
/// `None` if the deadline is reached first (including when `deadline` is
/// already in the past on entry, in which case nothing is polled).
pub async fn wait_for_listening(
    handle: &Arc<ServerHandle>,
    deadline: Instant,
) -> Option<SocketAddr> {
    loop {
        if Instant::now() >= deadline {
            return None;
        }

        // Stop at the first `Listening` event in this batch.
        let found = handle
            .poll_events()
            .into_iter()
            .find_map(|event| match event {
                ServerEvent::Listening(addr) => Some(addr),
                _ => None,
            });
        if found.is_some() {
            return found;
        }

        tokio::time::sleep(Duration::from_millis(5)).await;
    }
}
/// Polls the server handle until a `ServerEvent::SessionCreated` event is seen
/// or `deadline` passes.
///
/// Events are fetched with `poll_events()` every 5 ms. Only the first field of
/// the event (the session id) is used; other events are consumed and ignored.
///
/// # Returns
///
/// `Some(id)` for the first `SessionCreated` event observed, or `None` if the
/// deadline is reached first.
pub async fn wait_for_session_created(
    handle: &Arc<ServerHandle>,
    deadline: Instant,
) -> Option<u64> {
    while Instant::now() < deadline {
        let created = handle
            .poll_events()
            .into_iter()
            .find_map(|event| match event {
                ServerEvent::SessionCreated(id, _) => Some(id),
                _ => None,
            });
        if let Some(session_id) = created {
            return Some(session_id);
        }

        tokio::time::sleep(Duration::from_millis(5)).await;
    }
    None
}
/// Polls the server handle until a `ServerEvent::SessionClosed` event for the
/// session `expected` is seen or `deadline` passes.
///
/// Events are fetched with `poll_events()` every 5 ms. `SessionClosed` events
/// for other ids, and all other event kinds, are consumed and ignored.
///
/// # Returns
///
/// `true` if the matching close event was observed before the deadline,
/// `false` otherwise.
pub async fn wait_for_session_closed(
    handle: &Arc<ServerHandle>,
    expected: u64,
    deadline: Instant,
) -> bool {
    while Instant::now() < deadline {
        // `any` stops at the first matching event in this batch.
        let closed = handle
            .poll_events()
            .into_iter()
            .any(|event| matches!(event, ServerEvent::SessionClosed(id) if id == expected));
        if closed {
            return true;
        }

        tokio::time::sleep(Duration::from_millis(5)).await;
    }
    false
}
/// Polls the client handle until an event satisfying `pred` is seen or
/// `deadline` passes.
///
/// Events that do not satisfy `pred` are consumed and discarded. The function
/// only sleeps (5 ms) when `poll()` reports that no event is pending, so a
/// burst of unrelated events is skipped immediately rather than at a rate of
/// one event per sleep interval. The deadline is re-checked before every poll.
///
/// NOTE(review): this assumes `ClientHandle::poll` is non-blocking and returns
/// `None` when nothing is queued — confirm against the client crate.
///
/// # Returns
///
/// `Some(event)` for the first event accepted by `pred`, or `None` if the
/// deadline is reached first.
pub async fn wait_for_client_event<F>(
    handle: &mut ClientHandle,
    pred: F,
    deadline: Instant,
) -> Option<ClientEvent>
where
    F: Fn(&ClientEvent) -> bool,
{
    while Instant::now() < deadline {
        match handle.poll() {
            Some(event) if pred(&event) => return Some(event),
            // Not the event we want: drop it and poll again right away, since
            // more events may already be queued behind it.
            Some(_) => {}
            // Nothing pending: back off briefly before polling again.
            None => tokio::time::sleep(Duration::from_millis(5)).await,
        }
    }
    None
}
pub async fn wait_for_client_connected(handle: &mut ClientHandle, deadline: Instant) -> bool {
wait_for_client_event(handle, |e| matches!(e, ClientEvent::Connected), deadline)
.await
.is_some()
}
/// Polls the client handle until a `ClientEvent::Message` is seen or
/// `deadline` passes.
///
/// Events other than `Message` are consumed and discarded. The function only
/// sleeps (5 ms) when `poll()` reports that no event is pending, so unrelated
/// events queued ahead of the message do not each cost a sleep interval. The
/// deadline is re-checked before every poll.
///
/// NOTE(review): this assumes `ClientHandle::poll` is non-blocking and returns
/// `None` when nothing is queued — confirm against the client crate.
///
/// # Returns
///
/// `Some(bytes)` with the payload of the first `Message` event, or `None` if
/// the deadline is reached first.
pub async fn wait_for_client_message(
    handle: &mut ClientHandle,
    deadline: Instant,
) -> Option<Vec<u8>> {
    while Instant::now() < deadline {
        match handle.poll() {
            Some(ClientEvent::Message(bytes)) => return Some(bytes),
            // Some other event: drop it and poll again right away, since a
            // message may already be queued behind it.
            Some(_) => {}
            // Nothing pending: back off briefly before polling again.
            None => tokio::time::sleep(Duration::from_millis(5)).await,
        }
    }
    None
}