huddle-server 1.0.1

Centralized E2E relay + offline mailbox for huddle, designed to run behind a Tor v3 onion service. Treats huddle's wire bytes as opaque ciphertext — it never decrypts.
//! End-to-end test of the real client connector
//! (`huddle_core::network::server::ServerClient`) against the spawned
//! `huddle-server` binary, over a direct (non-Tor) WebSocket.

use std::process::{Child, Command};
use std::time::Duration;

use huddle_core::network::server::{ServerClient, ServerEvent};
use huddle_core::network::transport::DialMode;
use tokio::net::TcpStream;

/// Owns the spawned server process. On drop it kills the process and then
/// reaps it — so even a failed assertion (which unwinds) can't leave the
/// server running or a zombie process entry behind.
struct Server(Child);
impl Drop for Server {
    fn drop(&mut self) {
        // Best effort: there is nothing useful to do with an error inside a destructor.
        let _ = self.0.kill();
        // `kill` only delivers the signal; the child must also be waited on,
        // otherwise it stays a zombie until the test binary itself exits.
        let _ = self.0.wait();
    }
}

/// Launches the `huddle-server` binary built by Cargo for this test run.
///
/// The server is configured purely through environment variables: it is told
/// to bind `127.0.0.1:{port}`, to use `db` as its database path, and to log at
/// `warn` level. The returned [`Server`] guard owns the child process.
///
/// # Panics
/// Panics if the process cannot be spawned.
fn spawn_server(port: u16, db: &str) -> Server {
    let bind_addr = format!("127.0.0.1:{port}");

    let mut command = Command::new(env!("CARGO_BIN_EXE_huddle-server"));
    command.env("HUDDLE_SERVER_BIND", bind_addr);
    command.env("HUDDLE_SERVER_DB", db);
    command.env("RUST_LOG", "warn");

    let process = command.spawn().expect("spawn huddle-server");
    Server(process)
}

/// Blocks (asynchronously) until something accepts TCP connections on
/// `127.0.0.1:{port}`.
///
/// Makes up to 100 connection attempts, sleeping 50 ms after each failure
/// (roughly five seconds in total).
///
/// # Panics
/// Panics if no attempt succeeds.
async fn wait_listening(port: u16) {
    let mut attempts_left: u32 = 100;
    while attempts_left > 0 {
        let probe = TcpStream::connect(("127.0.0.1", port)).await;
        if probe.is_ok() {
            return;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
        attempts_left -= 1;
    }
    panic!("server never started listening");
}

/// Shorthand for the unbounded tokio mpsc receiver of [`ServerEvent`]s that
/// the helpers in this file read from.
type Rx = tokio::sync::mpsc::UnboundedReceiver<ServerEvent>;

/// Waits for the next `ServerEvent::Message` on `rx` and returns its
/// `(id, payload)`; `Ready` and `Sent` events seen along the way are discarded.
///
/// # Panics
/// Panics if the channel closes, a `Disconnected` event arrives, or no
/// message shows up within five seconds.
async fn next_message(rx: &mut Rx) -> (String, Vec<u8>) {
    let wait_for_message = async {
        loop {
            let event = rx.recv().await.expect("event stream open");
            match event {
                // Not what we are looking for — keep reading.
                ServerEvent::Ready | ServerEvent::Sent { .. } => {}
                ServerEvent::Disconnected => panic!("disconnected"),
                ServerEvent::Message { id, payload, .. } => break (id, payload),
            }
        }
    };

    let outcome = tokio::time::timeout(Duration::from_secs(5), wait_for_message).await;
    outcome.expect("timed out waiting for message")
}

/// Waits for the next `ServerEvent::Sent` receipt on `rx` and returns its
/// `(id, delivered, queued)`; `Ready` and `Message` events seen along the way
/// are discarded.
///
/// # Panics
/// Panics if the channel closes, a `Disconnected` event arrives, or no
/// receipt shows up within five seconds.
async fn next_sent(rx: &mut Rx) -> (String, usize, usize) {
    let wait_for_receipt = async {
        loop {
            let event = rx.recv().await.expect("event stream open");
            match event {
                // Not what we are looking for — keep reading.
                ServerEvent::Ready | ServerEvent::Message { .. } => {}
                ServerEvent::Disconnected => panic!("disconnected"),
                ServerEvent::Sent { id, delivered, queued } => break (id, delivered, queued),
            }
        }
    };

    let outcome = tokio::time::timeout(Duration::from_secs(5), wait_for_receipt).await;
    outcome.expect("timed out waiting for sent receipt")
}

/// Drives two `ServerClient`s against a freshly spawned server and checks:
/// 1. live fan-out — a publish from A reaches online member B byte-for-byte,
///    and A's receipt reports `(delivered, queued) == (1, 0)`;
/// 2. offline mailbox — a publish while B is disconnected is reported as
///    `(0, 1)` and is handed to B when it reconnects.
#[tokio::test]
async fn client_connector_fanout_and_mailbox() {
    let dir = tempfile::tempdir().unwrap();
    let db = dir.path().join("c.db");
    // Ask the OS for a currently-free loopback port instead of hard-coding one,
    // so a busy port or an overlapping test run doesn't break this test. The
    // probe listener is dropped inside the closure, freeing the port for the
    // server to bind.
    let port = std::net::TcpListener::bind(("127.0.0.1", 0))
        .and_then(|probe| probe.local_addr())
        .expect("reserve an ephemeral port")
        .port();
    let _server = spawn_server(port, db.to_str().unwrap());
    wait_listening(port).await;
    let url = format!("ws://127.0.0.1:{port}/ws");

    // A and B both members of ROOMX, both online (direct ws, no Tor).
    let (a, mut a_rx) = ServerClient::connect(&url, &DialMode::Direct, "AAAA".into(), vec!["ROOMX".into()])
        .await
        .unwrap();
    let (b, mut b_rx) = ServerClient::connect(&url, &DialMode::Direct, "BBBB".into(), vec!["ROOMX".into()])
        .await
        .unwrap();

    // Give both hellos time to register membership server-side.
    tokio::time::sleep(Duration::from_millis(200)).await;

    a.publish("ROOMX", "msg-1", b"\x00\x01\x02hello").unwrap();
    // B (online) receives the exact bytes back through base64.
    let (id, payload) = next_message(&mut b_rx).await;
    assert_eq!(id, "msg-1");
    assert_eq!(payload, b"\x00\x01\x02hello");
    // A gets a delivery receipt: 1 delivered live, 0 queued.
    let (sid, delivered, queued) = next_sent(&mut a_rx).await;
    assert_eq!(sid, "msg-1");
    assert_eq!((delivered, queued), (1, 0));

    // Offline mailbox: drop B (closes its socket → server marks it offline),
    // publish, then reconnect B and expect the queued message.
    drop(b);
    drop(b_rx);
    tokio::time::sleep(Duration::from_millis(300)).await;
    a.publish("ROOMX", "msg-2", b"queued-while-offline").unwrap();
    // The receipt now reports it queued (B offline), not delivered.
    let (sid2, delivered2, queued2) = next_sent(&mut a_rx).await;
    assert_eq!(sid2, "msg-2");
    assert_eq!((delivered2, queued2), (0, 1));

    // `_b2` is kept alive so the reconnected client stays open while we read.
    let (_b2, mut b2_rx) = ServerClient::connect(&url, &DialMode::Direct, "BBBB".into(), vec!["ROOMX".into()])
        .await
        .unwrap();
    let (id2, payload2) = next_message(&mut b2_rx).await;
    assert_eq!(id2, "msg-2");
    assert_eq!(payload2, b"queued-while-offline");
}