use std::process::{Child, Command};
use std::sync::Arc;
use std::time::Duration;
use huddle_core::identity::Identity;
use huddle_core::network::server::{ServerClient, ServerEvent};
use huddle_core::network::transport::DialMode;
use tokio::net::TcpStream;
/// Guard around the spawned server child process.
///
/// Dropping the guard kills the process and then reaps it, so a test that
/// holds a `Server` for its whole body gets the server torn down on exit,
/// including when the test panics and unwinds.
struct Server(Child);

impl Drop for Server {
    fn drop(&mut self) {
        // Best effort: the process may already have exited, in which case
        // `kill` reports an error that is not interesting here.
        let _ = self.0.kill();
        // `kill` only delivers the signal. Without `wait` the child is never
        // reaped (it lingers as a zombie on Unix) and may still be running
        // when whatever is dropped after this guard gets cleaned up.
        let _ = self.0.wait();
    }
}
/// Launches the `huddle-server` binary built by Cargo for this test run.
///
/// The server is configured through environment variables: it is told to
/// bind `127.0.0.1:<port>`, to use `db` as its database, and to log at
/// `warn`. Panics if the process cannot be spawned. The returned [`Server`]
/// owns the child process.
fn spawn_server(port: u16, db: &str) -> Server {
    let bind_addr = format!("127.0.0.1:{port}");

    let mut cmd = Command::new(env!("CARGO_BIN_EXE_huddle-server"));
    cmd.env("HUDDLE_SERVER_BIND", bind_addr);
    cmd.env("HUDDLE_SERVER_DB", db);
    cmd.env("RUST_LOG", "warn");

    Server(cmd.spawn().expect("spawn huddle-server"))
}
/// Polls `127.0.0.1:<port>` until a TCP connection succeeds.
///
/// Makes up to 100 attempts, sleeping 50 ms after each failure, and panics
/// if none of them connects.
async fn wait_listening(port: u16) {
    let addr = ("127.0.0.1", port);
    let mut attempts_left = 100;
    while attempts_left > 0 {
        match TcpStream::connect(addr).await {
            // The probe connection is only a liveness check; it is dropped right away.
            Ok(_) => return,
            Err(_) => tokio::time::sleep(Duration::from_millis(50)).await,
        }
        attempts_left -= 1;
    }
    panic!("server never started listening");
}
/// Unbounded receiver of [`ServerEvent`]s; the `next_*` helpers in this file take it by `&mut`
/// and the tests pass them the receiver returned alongside the client by `ServerClient::connect`.
type Rx = tokio::sync::mpsc::UnboundedReceiver<ServerEvent>;
/// Drains `rx` until a `ServerEvent::Message` shows up and returns its
/// `(id, payload)`.
///
/// Every other non-terminal event is discarded. Panics if the channel is
/// closed, if a `Disconnected` event arrives, or if no message is seen
/// within 5 seconds.
async fn next_message(rx: &mut Rx) -> (String, Vec<u8>) {
    let wait_for_message = async {
        loop {
            let event = rx.recv().await.expect("event stream open");
            match event {
                ServerEvent::Message { id, payload, .. } => break (id, payload),
                ServerEvent::Disconnected => panic!("disconnected"),
                // Listed explicitly (no `_` arm) so a new variant forces a decision here.
                ServerEvent::Ready
                | ServerEvent::Sent { .. }
                | ServerEvent::ConnectToken { .. }
                | ServerEvent::ConnectTokenResolved { .. } => {}
            }
        }
    };

    let limit = Duration::from_secs(5);
    tokio::time::timeout(limit, wait_for_message)
        .await
        .expect("timed out waiting for message")
}
/// Drains `rx` until a `ServerEvent::Sent` receipt shows up and returns its
/// `(id, delivered, queued)`.
///
/// Every other non-terminal event is discarded. Panics if the channel is
/// closed, if a `Disconnected` event arrives, or if no receipt is seen
/// within 5 seconds.
async fn next_sent(rx: &mut Rx) -> (String, usize, usize) {
    let wait_for_receipt = async {
        loop {
            let event = rx.recv().await.expect("event stream open");
            match event {
                ServerEvent::Sent { id, delivered, queued } => break (id, delivered, queued),
                ServerEvent::Disconnected => panic!("disconnected"),
                // Listed explicitly (no `_` arm) so a new variant forces a decision here.
                ServerEvent::Ready
                | ServerEvent::Message { .. }
                | ServerEvent::ConnectToken { .. }
                | ServerEvent::ConnectTokenResolved { .. } => {}
            }
        }
    };

    let limit = Duration::from_secs(5);
    tokio::time::timeout(limit, wait_for_receipt)
        .await
        .expect("timed out waiting for sent receipt")
}
/// Drains `rx` until a `ServerEvent::ConnectToken` shows up and returns its
/// `(token, ttl_secs)`.
///
/// Every other non-terminal event is discarded. Panics if the channel is
/// closed, if a `Disconnected` event arrives, or if no token is seen within
/// 5 seconds.
async fn next_connect_token(rx: &mut Rx) -> (String, u64) {
    let wait_for_token = async {
        loop {
            let event = rx.recv().await.expect("event stream open");
            match event {
                ServerEvent::ConnectToken { token, ttl_secs } => break (token, ttl_secs),
                ServerEvent::Disconnected => panic!("disconnected"),
                // Listed explicitly (no `_` arm) so a new variant forces a decision here.
                ServerEvent::Ready
                | ServerEvent::Sent { .. }
                | ServerEvent::Message { .. }
                | ServerEvent::ConnectTokenResolved { .. } => {}
            }
        }
    };

    let limit = Duration::from_secs(5);
    tokio::time::timeout(limit, wait_for_token)
        .await
        .expect("timed out waiting for connect token")
}
/// Drains `rx` until a `ServerEvent::ConnectTokenResolved` shows up and
/// returns its `(fingerprint, pubkey_b64)`, both of which are optional.
///
/// Every other non-terminal event is discarded. Panics if the channel is
/// closed, if a `Disconnected` event arrives, or if no resolution is seen
/// within 5 seconds.
async fn next_connect_resolved(rx: &mut Rx) -> (Option<String>, Option<String>) {
    let wait_for_resolution = async {
        loop {
            let event = rx.recv().await.expect("event stream open");
            match event {
                ServerEvent::ConnectTokenResolved { fingerprint, pubkey_b64 } => {
                    break (fingerprint, pubkey_b64);
                }
                ServerEvent::Disconnected => panic!("disconnected"),
                // Listed explicitly (no `_` arm) so a new variant forces a decision here.
                ServerEvent::Ready
                | ServerEvent::Sent { .. }
                | ServerEvent::Message { .. }
                | ServerEvent::ConnectToken { .. } => {}
            }
        }
    };

    let limit = Duration::from_secs(5);
    tokio::time::timeout(limit, wait_for_resolution)
        .await
        .expect("timed out waiting for connect-token resolution")
}
/// End-to-end check of connect tokens against a real server process:
/// client A mints a token, client B redeems it and gets A's fingerprint and a
/// public key back, and redeeming an unknown code resolves to nothing.
#[tokio::test]
async fn connect_token_mint_and_redeem() {
// `dir` is declared before `_server`, so the server guard is dropped first
// and the temp directory holding its database is removed afterwards.
let dir = tempfile::tempdir().unwrap();
let db = dir.path().join("t.db");
// Fixed port; each test in this file uses a different one.
let port = 18802;
let _server = spawn_server(port, db.to_str().unwrap());
wait_listening(port).await;
let url = format!("ws://127.0.0.1:{port}/ws");
let a_id = Arc::new(Identity::generate().unwrap());
let b_id = Arc::new(Identity::generate().unwrap());
let a_fp = a_id.fingerprint().to_string();
let (a, mut a_rx) = ServerClient::connect(&url, &DialMode::Direct, a_id.clone(), vec![])
.await
.unwrap();
let (b, mut b_rx) = ServerClient::connect(&url, &DialMode::Direct, b_id.clone(), vec![])
.await
.unwrap();
// NOTE(review): presumably lets both connections finish their handshake
// with the server before the first request — confirm.
tokio::time::sleep(Duration::from_millis(200)).await;
// A asks for a token; the reply arrives on A's event stream.
a.create_connect_token().unwrap();
let (code, ttl) = next_connect_token(&mut a_rx).await;
assert_eq!(code.len(), 8, "code is 8 chars: {code}");
assert!(ttl >= 60, "ttl is a few minutes, got {ttl}");
// B redeems A's code and must be told A's fingerprint plus a public key.
b.redeem_connect_token(&code).unwrap();
let (fp, pk) = next_connect_resolved(&mut b_rx).await;
assert_eq!(fp.as_deref(), Some(a_fp.as_str()));
assert!(pk.is_some(), "owner pubkey returned for TOFU pinning");
// A code that was never minted still produces a resolution event, with
// both fields empty.
b.redeem_connect_token("ZZZZZZZZ").unwrap();
let (fp2, pk2) = next_connect_resolved(&mut b_rx).await;
assert_eq!(fp2, None);
assert_eq!(pk2, None);
}
/// End-to-end check of publishing to "ROOMX" against a real server process:
/// a message is delivered live while B is connected, and a message published
/// after B has gone away is reported as queued and handed to B when it
/// connects again with the same identity.
#[tokio::test]
async fn client_connector_fanout_and_mailbox() {
// `dir` is declared before `_server`, so the server guard is dropped first
// and the temp directory holding its database is removed afterwards.
let dir = tempfile::tempdir().unwrap();
let db = dir.path().join("c.db");
// Fixed port; each test in this file uses a different one.
let port = 18799;
let _server = spawn_server(port, db.to_str().unwrap());
wait_listening(port).await;
let url = format!("ws://127.0.0.1:{port}/ws");
let a_id = Arc::new(Identity::generate().unwrap());
let b_id = Arc::new(Identity::generate().unwrap());
// NOTE(review): the last argument is presumably the list of rooms the
// client joins on connect — confirm against `ServerClient::connect`.
let (a, mut a_rx) = ServerClient::connect(&url, &DialMode::Direct, a_id.clone(), vec!["ROOMX".into()])
.await
.unwrap();
let (b, mut b_rx) = ServerClient::connect(&url, &DialMode::Direct, b_id.clone(), vec!["ROOMX".into()])
.await
.unwrap();
// NOTE(review): presumably lets both connections finish their handshake
// with the server before the first publish — confirm.
tokio::time::sleep(Duration::from_millis(200)).await;
// Live path: the payload (including non-UTF-8 bytes) reaches B unchanged
// and A's receipt counts one delivery and nothing queued.
a.publish("ROOMX", "msg-1", b"\x00\x01\x02hello").unwrap();
let (id, payload) = next_message(&mut b_rx).await;
assert_eq!(id, "msg-1");
assert_eq!(payload, b"\x00\x01\x02hello");
let (sid, delivered, queued) = next_sent(&mut a_rx).await;
assert_eq!(sid, "msg-1");
assert_eq!((delivered, queued), (1, 0));
// Take B offline by dropping both its client handle and its receiver.
drop(b);
drop(b_rx);
// NOTE(review): presumably gives the server time to notice that B is
// gone — confirm.
tokio::time::sleep(Duration::from_millis(300)).await;
// Offline path: the receipt now reports nothing delivered and one queued.
a.publish("ROOMX", "msg-2", b"queued-while-offline").unwrap();
let (sid2, delivered2, queued2) = next_sent(&mut a_rx).await;
assert_eq!(sid2, "msg-2");
assert_eq!((delivered2, queued2), (0, 1));
// Reconnecting with the same identity must yield the queued message.
let (_b2, mut b2_rx) = ServerClient::connect(&url, &DialMode::Direct, b_id.clone(), vec!["ROOMX".into()])
.await
.unwrap();
let (id2, payload2) = next_message(&mut b2_rx).await;
assert_eq!(id2, "msg-2");
assert_eq!(payload2, b"queued-while-offline");
}
/// End-to-end check of `send_direct` against a real server process: both
/// clients connect with an empty list as the last `connect` argument, A
/// addresses B by fingerprint, the message is delivered live while B is
/// connected, and a message sent after B has gone away is reported as queued
/// and handed to B when it connects again with the same identity.
#[tokio::test]
async fn send_direct_delivers_by_fingerprint_without_membership() {
// `dir` is declared before `_server`, so the server guard is dropped first
// and the temp directory holding its database is removed afterwards.
let dir = tempfile::tempdir().unwrap();
let db = dir.path().join("d.db");
// Fixed port; each test in this file uses a different one.
let port = 18801;
let _server = spawn_server(port, db.to_str().unwrap());
wait_listening(port).await;
let url = format!("ws://127.0.0.1:{port}/ws");
let a_id = Arc::new(Identity::generate().unwrap());
let b_id = Arc::new(Identity::generate().unwrap());
let b_fp = b_id.fingerprint().to_string();
let (a, mut a_rx) = ServerClient::connect(&url, &DialMode::Direct, a_id.clone(), vec![])
.await
.unwrap();
let (_b, mut b_rx) = ServerClient::connect(&url, &DialMode::Direct, b_id.clone(), vec![])
.await
.unwrap();
// NOTE(review): presumably lets both connections finish their handshake
// with the server before the first send — confirm.
tokio::time::sleep(Duration::from_millis(200)).await;
// Live path: B receives the payload unchanged and A's receipt counts one
// delivery and nothing queued.
a.send_direct(&b_fp, "dm-tag-1", "d1", b"\x00direct-hi").unwrap();
let (id, payload) = next_message(&mut b_rx).await;
assert_eq!(id, "d1");
assert_eq!(payload, b"\x00direct-hi");
let (sid, delivered, queued) = next_sent(&mut a_rx).await;
assert_eq!(sid, "d1");
assert_eq!((delivered, queued), (1, 0));
// Take B offline by dropping both its client handle and its receiver.
drop(_b);
drop(b_rx);
// NOTE(review): presumably gives the server time to notice that B is
// gone — confirm.
tokio::time::sleep(Duration::from_millis(300)).await;
// Offline path: the receipt now reports nothing delivered and one queued.
a.send_direct(&b_fp, "dm-tag-1", "d2", b"direct-while-offline").unwrap();
let (sid2, delivered2, queued2) = next_sent(&mut a_rx).await;
assert_eq!(sid2, "d2");
assert_eq!((delivered2, queued2), (0, 1));
// Reconnecting with the same identity must yield the queued message.
let (_b2, mut b2_rx) = ServerClient::connect(&url, &DialMode::Direct, b_id.clone(), vec![])
.await
.unwrap();
let (id2, payload2) = next_message(&mut b2_rx).await;
assert_eq!(id2, "d2");
assert_eq!(payload2, b"direct-while-offline");
}