use tokio::{
io::{self, AsyncRead, AsyncWrite},
sync::mpsc,
};
use crate::{
Client, ClientCtx, ClientHandler, Server, ServerCtx, ServerHandle, ServerHandler,
error::Result,
schema::{ClientNotification, ServerNotification},
};
pub fn make_duplex_pair() -> (
impl AsyncRead + Send + Sync + Unpin + 'static,
impl AsyncWrite + Send + Sync + Unpin + 'static,
impl AsyncRead + Send + Sync + Unpin + 'static,
impl AsyncWrite + Send + Sync + Unpin + 'static,
) {
let (server_reader, client_writer) = io::duplex(8 * 1024);
let (client_reader, server_writer) = io::duplex(8 * 1024);
(server_reader, server_writer, client_reader, client_writer)
}
pub async fn connected_client_and_server<F>(
handler_factory: F,
) -> Result<(Client<()>, ServerHandle)>
where
F: Fn() -> Box<dyn ServerHandler> + Send + Sync + 'static,
{
let server = Server::from_factory(handler_factory);
let (server_reader, server_writer, client_reader, client_writer) = make_duplex_pair();
let server_handle = ServerHandle::from_stream(server, server_reader, server_writer).await?;
let mut client = Client::new("test-client", "1.0.0");
client
.connect_stream_raw(client_reader, client_writer)
.await?;
Ok((client, server_handle))
}
pub async fn connected_client_and_server_with_conn<F, C>(
handler_factory: F,
client_handler: C,
) -> Result<(Client<C>, ServerHandle)>
where
F: Fn() -> Box<dyn ServerHandler> + Send + Sync + 'static,
C: ClientHandler + 'static,
{
let server = Server::from_factory(handler_factory);
let (server_reader, server_writer, client_reader, client_writer) = make_duplex_pair();
let server_handle = ServerHandle::from_stream(server, server_reader, server_writer).await?;
let mut client = Client::new("test-client", "1.0.0").with_handler(client_handler);
client
.connect_stream_raw(client_reader, client_writer)
.await?;
Ok((client, server_handle))
}
pub async fn shutdown_client_and_server<C>(client: Client<C>, server: ServerHandle)
where
C: ClientHandler + 'static,
{
use tokio::time::{Duration, timeout};
drop(client);
timeout(Duration::from_millis(10), server.stop()).await.ok();
}
pub fn test_server_ctx(notification_tx: mpsc::UnboundedSender<ServerNotification>) -> ServerCtx {
ServerCtx::new(notification_tx, None)
}
pub fn test_client_ctx(notification_tx: mpsc::UnboundedSender<ClientNotification>) -> ClientCtx {
ClientCtx::new(notification_tx)
}
pub struct TestServerContext {
ctx: ServerCtx,
notification_rx: mpsc::UnboundedReceiver<ServerNotification>,
}
impl TestServerContext {
pub fn new() -> Self {
let (notification_tx, notification_rx) = mpsc::unbounded_channel();
let ctx = test_server_ctx(notification_tx);
Self {
ctx,
notification_rx,
}
}
pub fn ctx(&self) -> &ServerCtx {
&self.ctx
}
pub async fn try_recv_notification(&mut self) -> Option<ServerNotification> {
use tokio::time::{Duration, timeout};
timeout(Duration::from_millis(10), self.notification_rx.recv())
.await
.ok()
.flatten()
}
}
impl Default for TestServerContext {
fn default() -> Self {
Self::new()
}
}
pub struct TestClientContext {
ctx: ClientCtx,
notification_rx: mpsc::UnboundedReceiver<ClientNotification>,
}
impl TestClientContext {
pub fn new() -> Self {
let (notification_tx, notification_rx) = mpsc::unbounded_channel();
let ctx = test_client_ctx(notification_tx);
Self {
ctx,
notification_rx,
}
}
pub fn ctx(&self) -> &ClientCtx {
&self.ctx
}
pub async fn try_recv_notification(&mut self) -> Option<ClientNotification> {
use tokio::time::{Duration, timeout};
timeout(Duration::from_millis(10), self.notification_rx.recv())
.await
.ok()
.flatten()
}
}
impl Default for TestClientContext {
fn default() -> Self {
Self::new()
}
}