use std::process::{Child, Command};
use std::time::Duration;
use huddle_core::app::{AppEvent, AppHandle};
use huddle_core::network::NetworkMode;
use huddle_core::storage::{self, repo};
use huddle_core::storage::repo::{RoomKind, StoredRoom};
/// Guard around a spawned child process: dropping the guard terminates the
/// child and reaps it.
struct Server(Child);

impl Drop for Server {
    /// Kills the wrapped child and then waits for it to exit.
    ///
    /// Both steps are best effort and their errors are ignored: `kill` can
    /// fail when the child has already exited, and there is nothing useful a
    /// destructor could do about a failed `wait`.
    fn drop(&mut self) {
        let _ = self.0.kill();
        // `Child` has no `Drop` impl of its own, so without an explicit `wait`
        // the killed process is never reaped and lingers as a zombie until the
        // parent exits. Waiting also means the child is really gone before
        // whatever is dropped after this guard gets cleaned up.
        let _ = self.0.wait();
    }
}
/// Starts the `huddle-server` binary as a child process and returns it wrapped
/// in a [`Server`] guard.
///
/// The binary path comes from the compile-time `CARGO_BIN_EXE_huddle-server`
/// variable. The child is configured purely through environment variables:
/// `HUDDLE_SERVER_BIND` is set to `127.0.0.1:<port>`, `HUDDLE_SERVER_DB` to
/// `db`, and `RUST_LOG` to `warn`.
///
/// # Panics
///
/// Panics if the process cannot be spawned.
fn spawn_server(port: u16, db: &str) -> Server {
    let bind_addr = format!("127.0.0.1:{port}");

    let mut cmd = Command::new(env!("CARGO_BIN_EXE_huddle-server"));
    cmd.env("HUDDLE_SERVER_BIND", bind_addr);
    cmd.env("HUDDLE_SERVER_DB", db);
    cmd.env("RUST_LOG", "warn");

    Server(cmd.spawn().expect("spawn huddle-server"))
}
/// Polls `127.0.0.1:<port>` until a TCP connection succeeds.
///
/// Makes up to 100 connection attempts, sleeping 50 ms after each failed one.
///
/// # Panics
///
/// Panics if none of the attempts succeeds.
async fn wait_listening(port: u16) {
    let retry_delay = Duration::from_millis(50);
    let mut attempts_left = 100;

    while attempts_left > 0 {
        let reachable = tokio::net::TcpStream::connect(("127.0.0.1", port))
            .await
            .is_ok();
        if reachable {
            return;
        }
        tokio::time::sleep(retry_delay).await;
        attempts_left -= 1;
    }

    panic!("server never started listening");
}
/// Polls `h.server_connected()` until it returns `true`.
///
/// Checks up to 100 times, sleeping 50 ms after each negative check.
///
/// # Panics
///
/// Panics if the handle still reports "not connected" after the last check.
async fn wait_server_connected(h: &AppHandle) {
    let retry_delay = Duration::from_millis(50);
    let mut checks_left = 100;

    while checks_left > 0 {
        if h.server_connected() {
            return;
        }
        tokio::time::sleep(retry_delay).await;
        checks_left -= 1;
    }

    panic!("AppHandle never connected to the server");
}
async fn next_message(rx: &mut tokio::sync::broadcast::Receiver<AppEvent>) -> String {
let fut = async {
loop {
match rx.recv().await {
Ok(AppEvent::MessageReceived { body, .. }) => return body,
Ok(_) => {}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
Err(_) => panic!("event channel closed"),
}
}
};
tokio::time::timeout(Duration::from_secs(10), fut)
.await
.expect("timed out waiting for MessageReceived")
}
async fn next_contact_request(rx: &mut tokio::sync::broadcast::Receiver<AppEvent>) -> String {
let fut = async {
loop {
match rx.recv().await {
Ok(AppEvent::ContactRequestReceived { fingerprint, .. }) => return fingerprint,
Ok(_) => {}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
Err(_) => panic!("event channel closed"),
}
}
};
tokio::time::timeout(Duration::from_secs(10), fut)
.await
.expect("timed out waiting for ContactRequestReceived")
}
/// Starts an `AppHandle` configured with
/// `TransportConfig::onion_only(DEFAULT_SERVER_URL)` and asserts that it
/// reports a server connection within 90 seconds.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are requested
/// explicitly.
#[tokio::test]
#[ignore]
async fn connects_to_live_onion_over_tor() {
    let db = storage::open_db_in_memory().unwrap();
    let transport = huddle_core::app::TransportConfig::onion_only(
        huddle_core::app::DEFAULT_SERVER_URL.to_string(),
    );
    let handle = AppHandle::start_with_db_and_options(
        db,
        NetworkMode::Server,
        0,
        [0u8; 32],
        Vec::new(),
        transport,
    )
    .await
    .unwrap();

    // Check the connection flag every 500 ms; the surrounding timeout bounds
    // the otherwise endless loop.
    let poll_until_connected = async {
        while !handle.server_connected() {
            tokio::time::sleep(Duration::from_millis(500)).await;
        }
    };
    let connected = tokio::time::timeout(Duration::from_secs(90), poll_until_connected)
        .await
        .is_ok();

    // Shut down before asserting so the handle is stopped on failure too.
    handle.shutdown().await;
    assert!(connected, "did not connect to the live onion over Tor");
}
/// End-to-end check that two `AppHandle`s pointed at the same locally spawned
/// `huddle-server` can exchange room messages in both directions.
///
/// A creates the room; B has the same room record inserted into its database
/// before it starts, then joins. A -> B and B -> A deliveries are each
/// verified through the receiving side's event stream.
#[tokio::test]
async fn two_apps_exchange_messages_over_the_server() {
// The temp dir only hosts the server's database file.
let dir = tempfile::tempdir().unwrap();
// Hard-coded port. NOTE(review): presumably must stay distinct from the ports
// of other tests that may run in parallel — confirm.
let port = 18811;
// Bound to a name so the guard lives until the end of the test; dropping it
// kills the child process.
let _server = spawn_server(port, dir.path().join("srv.db").to_str().unwrap());
wait_listening(port).await;
let url = format!("ws://127.0.0.1:{port}/ws");
// --- Client A: fresh database from `open_db_in_memory`, pointed at the local server.
let db_a = storage::open_db_in_memory().unwrap();
let handle_a = AppHandle::start_with_db_and_options(
db_a,
NetworkMode::Server,
0,
[0u8; 32],
Vec::new(),
huddle_core::app::TransportConfig::onion_only(url.clone()),
)
.await
.unwrap();
// Subscribe before any traffic is generated; `events_a` is read at the end.
let mut events_a = handle_a.subscribe();
let room_id = handle_a
.start_room("server-test", false, None, RoomKind::Group)
.await
.unwrap();
// --- Client B: its database is seeded with a record of A's room before the
// handle is started, mirroring the id, name, creator and kind used by A.
let db_b = storage::open_db_in_memory().unwrap();
let info = StoredRoom {
id: room_id.clone(),
name: "server-test".into(),
creator_fingerprint: handle_a.fingerprint().to_string(),
encrypted: false,
passphrase_salt: None,
created_at: 0,
last_active: Some(0),
kind: RoomKind::Group,
};
repo::insert_room(&db_b, &info).unwrap();
let handle_b = AppHandle::start_with_db_and_options(
db_b,
NetworkMode::Server,
0,
[0u8; 32],
Vec::new(),
huddle_core::app::TransportConfig::onion_only(url.clone()),
)
.await
.unwrap();
let mut events_b = handle_b.subscribe();
// Both sides must report a server connection before B joins.
wait_server_connected(&handle_a).await;
wait_server_connected(&handle_b).await;
handle_b.join_room(&room_id, None).await.unwrap();
// Fixed pause after the join. NOTE(review): presumably lets the join settle
// before A sends — confirm; a fixed sleep can be flaky on slow machines.
tokio::time::sleep(Duration::from_millis(400)).await;
// A -> B
handle_a
.send_room_message(&room_id, "hello over the onion relay")
.await
.unwrap();
assert_eq!(next_message(&mut events_b).await, "hello over the onion relay");
// B -> A
handle_b.send_room_message(&room_id, "got it").await.unwrap();
assert_eq!(next_message(&mut events_a).await, "got it");
handle_a.shutdown().await;
handle_b.shutdown().await;
}
/// End-to-end check of the contact-request flow through a locally spawned
/// `huddle-server`: A sends B a contact request, B accepts it, and the two
/// then exchange messages in both directions in the room identified by
/// `canonical_dm_room_id(a_fp, b_fp)`.
#[tokio::test]
async fn contact_request_over_server_opens_dm() {
// The temp dir only hosts the server's database file.
let dir = tempfile::tempdir().unwrap();
// Hard-coded port. NOTE(review): presumably must stay distinct from the ports
// of other tests that may run in parallel — confirm.
let port = 18812;
// Bound to a name so the guard lives until the end of the test; dropping it
// kills the child process.
let _server = spawn_server(port, dir.path().join("srv.db").to_str().unwrap());
wait_listening(port).await;
let url = format!("ws://127.0.0.1:{port}/ws");
// --- Client A
let db_a = storage::open_db_in_memory().unwrap();
let handle_a = AppHandle::start_with_db_and_options(
db_a,
NetworkMode::Server,
0,
[0u8; 32],
Vec::new(),
huddle_core::app::TransportConfig::onion_only(url.clone()),
)
.await
.unwrap();
// --- Client B
let db_b = storage::open_db_in_memory().unwrap();
let handle_b = AppHandle::start_with_db_and_options(
db_b,
NetworkMode::Server,
0,
[0u8; 32],
Vec::new(),
huddle_core::app::TransportConfig::onion_only(url.clone()),
)
.await
.unwrap();
// Subscribe on both sides before the contact request is sent.
let mut events_a = handle_a.subscribe();
let mut events_b = handle_b.subscribe();
wait_server_connected(&handle_a).await;
wait_server_connected(&handle_b).await;
let a_fp = handle_a.fingerprint().to_string();
let b_fp = handle_b.fingerprint().to_string();
// A asks B to become a contact; B must see a request carrying A's fingerprint.
handle_a
.send_contact_request(&b_fp, Some("hi from A"))
.await
.unwrap();
assert_eq!(next_contact_request(&mut events_b).await, a_fp);
handle_b.accept_contact_request(&a_fp).await.unwrap();
// Fixed pause after the accept. NOTE(review): presumably gives the acceptance
// time to reach A before A sends into the DM room — confirm.
tokio::time::sleep(Duration::from_secs(3)).await;
let dm_room = huddle_core::app::canonical_dm_room_id(&a_fp, &b_fp);
// A -> B
handle_a
.send_room_message(&dm_room, "yo over the inbox")
.await
.unwrap();
assert_eq!(next_message(&mut events_b).await, "yo over the inbox");
// B -> A
handle_b.send_room_message(&dm_room, "hey back").await.unwrap();
assert_eq!(next_message(&mut events_a).await, "hey back");
handle_a.shutdown().await;
handle_b.shutdown().await;
}
/// End-to-end check that a direct-message room keeps working after one side
/// restarts.
///
/// A runs on a file-backed database so it can be shut down and reopened from
/// the same file; B stays up the whole time. After the restart the test
/// asserts that A reports the same fingerprint, that the DM room id is among
/// `active_room_ids()`, and that a message sent by B is delivered to the
/// restarted A.
#[tokio::test]
async fn dm_stays_live_across_restart_over_server() {
// The temp dir hosts both the server's database and A's database file.
let dir = tempfile::tempdir().unwrap();
// Hard-coded port. NOTE(review): presumably must stay distinct from the ports
// of other tests that may run in parallel — confirm.
let port = 18813;
// Bound to a name so the guard lives until the end of the test; dropping it
// kills the child process.
let _server = spawn_server(port, dir.path().join("srv.db").to_str().unwrap());
wait_listening(port).await;
let url = format!("ws://127.0.0.1:{port}/ws");
// A's database lives on disk because it is opened a second time further down.
let a_db_path = dir.path().join("a.db");
// --- Client B: started once and kept running across A's restart.
let db_b = storage::open_db_in_memory().unwrap();
let handle_b = AppHandle::start_with_db_and_options(
db_b,
NetworkMode::Server,
0,
[0u8; 32],
Vec::new(),
huddle_core::app::TransportConfig::onion_only(url.clone()),
)
.await
.unwrap();
let mut events_b = handle_b.subscribe();
let b_fp = handle_b.fingerprint().to_string();
// Declared outside the block so A's fingerprint outlives the first session.
let a_fp;
// --- First session of A, scoped so `db_a` and `handle_a` are gone before the
// database file is reopened.
{
let db_a = storage::open_db(&a_db_path, None).unwrap();
let handle_a = AppHandle::start_with_db_and_options(
db_a,
NetworkMode::Server,
0,
[0u8; 32],
Vec::new(),
huddle_core::app::TransportConfig::onion_only(url.clone()),
)
.await
.unwrap();
a_fp = handle_a.fingerprint().to_string();
wait_server_connected(&handle_a).await;
wait_server_connected(&handle_b).await;
// Each side opens the direct conversation with the other.
handle_a.start_direct(&b_fp).await.unwrap();
handle_b.start_direct(&a_fp).await.unwrap();
// Fixed pause. NOTE(review): presumably lets the DM setup settle on both
// sides before the first message — confirm.
tokio::time::sleep(Duration::from_secs(2)).await;
let dm = huddle_core::app::canonical_dm_room_id(&a_fp, &b_fp);
// Baseline: the DM works before the restart.
handle_a.send_room_message(&dm, "before restart").await.unwrap();
assert_eq!(next_message(&mut events_b).await, "before restart");
handle_a.shutdown().await;
}
// Fixed pause between shutdown and restart. NOTE(review): purpose not visible
// here; presumably lets the first session finish tearing down — confirm.
tokio::time::sleep(Duration::from_millis(500)).await;
// --- Second session of A, reopened from the same database file.
let db_a2 = storage::open_db(&a_db_path, None).unwrap();
let handle_a2 = AppHandle::start_with_db_and_options(
db_a2,
NetworkMode::Server,
0,
[0u8; 32],
Vec::new(),
huddle_core::app::TransportConfig::onion_only(url.clone()),
)
.await
.unwrap();
assert_eq!(handle_a2.fingerprint(), a_fp, "same identity across restart");
let mut events_a2 = handle_a2.subscribe();
wait_server_connected(&handle_a2).await;
let dm = huddle_core::app::canonical_dm_room_id(&a_fp, &b_fp);
// The restarted A never calls `start_direct` or `join_room`, so the room must
// already be active purely from the reopened state.
assert!(
handle_a2.active_room_ids().contains(&dm),
"DM must be auto-activated on restart, not parked as restorable"
);
// Fixed pause before B sends. NOTE(review): presumably gives the restarted
// session time to become reachable for the room — confirm.
tokio::time::sleep(Duration::from_millis(800)).await;
// B -> restarted A
handle_b.send_room_message(&dm, "after restart").await.unwrap();
assert_eq!(next_message(&mut events_a2).await, "after restart");
handle_a2.shutdown().await;
handle_b.shutdown().await;
}