use std::process::{Child, Command};
use std::time::Duration;
use huddle_core::network::server::{ServerClient, ServerEvent};
use tokio::net::TcpStream;
/// Owns a spawned child process and tears it down when the guard is dropped,
/// so the process does not outlive the test that started it.
struct Server(Child);

impl Drop for Server {
    /// Kills the child process and then waits for it to be reaped.
    ///
    /// Both steps are best-effort: errors are ignored because there is nothing
    /// useful to do about them during teardown, and panicking inside `drop`
    /// could abort the test binary.
    fn drop(&mut self) {
        let _ = self.0.kill();
        // `kill` only delivers the signal. Without a following `wait` the
        // exited child is never reaped and stays around as a zombie on Unix
        // for as long as the test process lives.
        let _ = self.0.wait();
    }
}
/// Starts the `huddle-server` binary as a child process and wraps it in a
/// [`Server`] guard.
///
/// The bind address (`127.0.0.1:<port>`) and the database path are handed to
/// the child through the `HUDDLE_SERVER_BIND` and `HUDDLE_SERVER_DB`
/// environment variables; `RUST_LOG` is set to `warn`.
///
/// # Panics
///
/// Panics if the process cannot be spawned.
fn spawn_server(port: u16, db: &str) -> Server {
    let bind_addr = format!("127.0.0.1:{port}");

    // The executable path is resolved at compile time from the environment
    // variable named below.
    let mut cmd = Command::new(env!("CARGO_BIN_EXE_huddle-server"));
    cmd.env("HUDDLE_SERVER_BIND", bind_addr);
    cmd.env("HUDDLE_SERVER_DB", db);
    cmd.env("RUST_LOG", "warn");

    Server(cmd.spawn().expect("spawn huddle-server"))
}
/// Polls `127.0.0.1:<port>` until a TCP connection succeeds.
///
/// Makes up to 100 connection attempts, sleeping 50 ms after each failure.
///
/// # Panics
///
/// Panics if no attempt succeeds.
async fn wait_listening(port: u16) {
    let addr = ("127.0.0.1", port);
    let mut attempts_left = 100u32;
    while attempts_left > 0 {
        match TcpStream::connect(addr).await {
            // The probe connection itself is not needed; it is dropped right away.
            Ok(_) => return,
            Err(_) => tokio::time::sleep(Duration::from_millis(50)).await,
        }
        attempts_left -= 1;
    }
    panic!("server never started listening");
}
/// Receiving half of the unbounded channel on which [`ServerEvent`]s arrive;
/// shorthand used by the event-waiting helpers in this file.
type Rx = tokio::sync::mpsc::UnboundedReceiver<ServerEvent>;
/// Waits for the next [`ServerEvent::Message`] on `rx` and returns its
/// `(id, payload)`.
///
/// `Ready` and `Sent` events received in the meantime are discarded.
///
/// # Panics
///
/// Panics if the channel is closed, if a `Disconnected` event arrives, or if
/// no message shows up within 5 seconds.
async fn next_message(rx: &mut Rx) -> (String, Vec<u8>) {
    let limit = Duration::from_secs(5);
    let outcome = tokio::time::timeout(limit, async {
        loop {
            let event = rx.recv().await.expect("event stream open");
            match event {
                ServerEvent::Disconnected => panic!("disconnected"),
                // Not what the caller is waiting for; keep reading.
                ServerEvent::Sent { .. } | ServerEvent::Ready => {}
                ServerEvent::Message { id, payload, .. } => break (id, payload),
            }
        }
    })
    .await;
    outcome.expect("timed out waiting for message")
}
/// Waits for the next [`ServerEvent::Sent`] receipt on `rx` and returns its
/// `(id, delivered, queued)` fields.
///
/// `Ready` and `Message` events received in the meantime are discarded.
///
/// # Panics
///
/// Panics if the channel is closed, if a `Disconnected` event arrives, or if
/// no receipt shows up within 5 seconds.
async fn next_sent(rx: &mut Rx) -> (String, usize, usize) {
    let limit = Duration::from_secs(5);
    let outcome = tokio::time::timeout(limit, async {
        loop {
            let event = rx.recv().await.expect("event stream open");
            match event {
                ServerEvent::Disconnected => panic!("disconnected"),
                // Not what the caller is waiting for; keep reading.
                ServerEvent::Message { .. } | ServerEvent::Ready => {}
                ServerEvent::Sent { id, delivered, queued } => break (id, delivered, queued),
            }
        }
    })
    .await;
    outcome.expect("timed out waiting for sent receipt")
}
/// End-to-end check of `ServerClient` against a real `huddle-server` process.
///
/// Covers two scenarios with clients "AAAA" and "BBBB" both connected for
/// "ROOMX":
/// 1. Fan-out: a message published by A reaches B, and A's receipt reports
///    one delivery and nothing queued.
/// 2. Mailbox: after B is dropped, A's next message is reported as queued,
///    and a fresh connection as "BBBB" receives it.
#[tokio::test]
async fn client_connector_fanout_and_mailbox() {
    let dir = tempfile::tempdir().unwrap();
    let db = dir.path().join("c.db");

    // Let the OS pick a free port instead of hard-coding one. A fixed port can
    // clash with a concurrent run or with a server left behind by an aborted
    // run, in which case `wait_listening` would succeed against the wrong
    // process. The probe listener is closed before the server is started.
    let port = std::net::TcpListener::bind(("127.0.0.1", 0))
        .and_then(|listener| listener.local_addr())
        .expect("reserve a free local port")
        .port();

    let _server = spawn_server(port, db.to_str().unwrap());
    wait_listening(port).await;
    let url = format!("ws://127.0.0.1:{port}/ws");

    let (a, mut a_rx) = ServerClient::connect(&url, None, "AAAA".into(), vec!["ROOMX".into()])
        .await
        .unwrap();
    let (b, mut b_rx) = ServerClient::connect(&url, None, "BBBB".into(), vec!["ROOMX".into()])
        .await
        .unwrap();
    // NOTE(review): presumably gives the server time to register both
    // connections before publishing — confirm.
    tokio::time::sleep(Duration::from_millis(200)).await;

    // Scenario 1: live fan-out from A to B.
    a.publish("ROOMX", "msg-1", b"\x00\x01\x02hello").unwrap();
    let (id, payload) = next_message(&mut b_rx).await;
    assert_eq!(id, "msg-1");
    assert_eq!(payload, b"\x00\x01\x02hello");
    let (sid, delivered, queued) = next_sent(&mut a_rx).await;
    assert_eq!(sid, "msg-1");
    assert_eq!((delivered, queued), (1, 0));

    // Scenario 2: B goes away, so the next message should be queued for it.
    drop(b);
    drop(b_rx);
    // NOTE(review): presumably lets the server observe B's disconnect — confirm.
    tokio::time::sleep(Duration::from_millis(300)).await;
    a.publish("ROOMX", "msg-2", b"queued-while-offline").unwrap();
    let (sid2, delivered2, queued2) = next_sent(&mut a_rx).await;
    assert_eq!(sid2, "msg-2");
    assert_eq!((delivered2, queued2), (0, 1));

    // Connecting again as "BBBB" should hand over the queued message.
    let (_b2, mut b2_rx) = ServerClient::connect(&url, None, "BBBB".into(), vec!["ROOMX".into()])
        .await
        .unwrap();
    let (id2, payload2) = next_message(&mut b2_rx).await;
    assert_eq!(id2, "msg-2");
    assert_eq!(payload2, b"queued-while-offline");
}