use std::time::Duration;
use huddle_core::app::events::AppEvent;
use huddle_core::app::AppHandle;
use huddle_core::network::NetworkMode;
use huddle_core::storage;
use huddle_core::storage::repo::RoomKind;
use tokio::sync::broadcast;
/// Upper bound, in seconds, on waiting for a `RoomDiscovered` event in the tests that start
/// their nodes with `AppHandle::start_with_db`; those tests skip (rather than fail) when the
/// wait produces nothing.
const DISCOVERY_TIMEOUT_SECS: u64 = 30;
/// Upper bound, in seconds, on each wait for a follow-up event once nodes are expected to be
/// connected (`MessageReceived`, `RotationRequested`, and room discovery after a direct dial).
const MESSAGE_TIMEOUT_SECS: u64 = 15;
/// Upper bound, in seconds, on waiting for an `InboundDial` event after a direct dial.
const DIRECT_DIAL_TIMEOUT_SECS: u64 = 15;
#[tokio::test]
async fn two_node_unencrypted_room_message_exchange() {
let _ = tracing_subscriber::fmt()
.with_test_writer()
.with_env_filter("huddle=debug,warn")
.try_init();
let db_a = storage::open_db_in_memory().unwrap();
let db_b = storage::open_db_in_memory().unwrap();
let handle_a = AppHandle::start_with_db(db_a).await.unwrap();
let handle_b = AppHandle::start_with_db(db_b).await.unwrap();
eprintln!("A fp={} B fp={}", handle_a.fingerprint(), handle_b.fingerprint());
let mut events_a = handle_a.subscribe();
let mut events_b = handle_b.subscribe();
let room_id = handle_a
.start_room("test-room", false, None, RoomKind::Group)
.await
.unwrap();
let target_room_id = room_id.clone();
let discovery = tokio::time::timeout(Duration::from_secs(DISCOVERY_TIMEOUT_SECS), async {
loop {
match events_b.recv().await {
Ok(AppEvent::RoomDiscovered(r)) if r.room_id == target_room_id => return,
Ok(_) => {}
Err(_) => tokio::time::sleep(Duration::from_millis(50)).await,
}
}
})
.await;
if discovery.is_err() {
eprintln!("room discovery timed out (mDNS may be blocked); skipping");
handle_a.shutdown().await;
handle_b.shutdown().await;
return;
}
eprintln!("B discovered room {}", room_id);
handle_b.join_room(&room_id, None).await.unwrap();
tokio::time::sleep(Duration::from_millis(1500)).await;
handle_a
.send_room_message(&room_id, "hello room")
.await
.unwrap();
let msg = tokio::time::timeout(Duration::from_secs(MESSAGE_TIMEOUT_SECS), async {
loop {
match events_b.recv().await {
Ok(AppEvent::MessageReceived { body, .. }) => return body,
Ok(_) => {}
Err(_) => tokio::time::sleep(Duration::from_millis(50)).await,
}
}
})
.await;
assert!(msg.is_ok(), "B never received the message");
assert_eq!(msg.unwrap(), "hello room");
handle_b
.send_room_message(&room_id, "hi back")
.await
.unwrap();
let reply = tokio::time::timeout(Duration::from_secs(MESSAGE_TIMEOUT_SECS), async {
loop {
match events_a.recv().await {
Ok(AppEvent::MessageReceived { body, .. }) => return body,
Ok(_) => {}
Err(_) => tokio::time::sleep(Duration::from_millis(50)).await,
}
}
})
.await;
assert!(reply.is_ok(), "A never received the reply");
assert_eq!(reply.unwrap(), "hi back");
handle_a.shutdown().await;
handle_b.shutdown().await;
}
#[tokio::test]
async fn two_node_encrypted_room_message_exchange() {
let _ = tracing_subscriber::fmt()
.with_test_writer()
.with_env_filter("huddle=debug,warn")
.try_init();
let db_a = storage::open_db_in_memory().unwrap();
let db_b = storage::open_db_in_memory().unwrap();
let handle_a = AppHandle::start_with_db(db_a).await.unwrap();
let handle_b = AppHandle::start_with_db(db_b).await.unwrap();
let mut events_a = handle_a.subscribe();
let mut events_b = handle_b.subscribe();
let room_id = handle_a
.start_room("secret-room", true, Some("hunter2"), RoomKind::Group)
.await
.unwrap();
let target_room_id = room_id.clone();
let discovery = tokio::time::timeout(Duration::from_secs(DISCOVERY_TIMEOUT_SECS), async {
loop {
match events_b.recv().await {
Ok(AppEvent::RoomDiscovered(r)) if r.room_id == target_room_id => return,
Ok(_) => {}
Err(_) => tokio::time::sleep(Duration::from_millis(50)).await,
}
}
})
.await;
if discovery.is_err() {
eprintln!("room discovery timed out; skipping encrypted test");
handle_a.shutdown().await;
handle_b.shutdown().await;
return;
}
handle_b
.join_room(&room_id, Some("hunter2"))
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(2500)).await;
handle_a
.send_room_message(&room_id, "encrypted hello")
.await
.unwrap();
let msg = tokio::time::timeout(Duration::from_secs(MESSAGE_TIMEOUT_SECS), async {
loop {
match events_b.recv().await {
Ok(AppEvent::MessageReceived { body, .. }) => return body,
Ok(_) => {}
Err(_) => tokio::time::sleep(Duration::from_millis(50)).await,
}
}
})
.await;
assert!(msg.is_ok(), "B never decrypted the message");
assert_eq!(msg.unwrap(), "encrypted hello");
handle_b
.send_room_message(&room_id, "encrypted reply")
.await
.unwrap();
let reply = tokio::time::timeout(Duration::from_secs(MESSAGE_TIMEOUT_SECS), async {
loop {
match events_a.recv().await {
Ok(AppEvent::MessageReceived { body, .. }) => return body,
Ok(_) => {}
Err(_) => tokio::time::sleep(Duration::from_millis(50)).await,
}
}
})
.await;
assert!(reply.is_ok());
assert_eq!(reply.unwrap(), "encrypted reply");
handle_a.shutdown().await;
handle_b.shutdown().await;
}
async fn spawn_direct_node() -> (AppHandle, broadcast::Receiver<AppEvent>, String) {
let db = storage::open_db_in_memory().unwrap();
let handle = AppHandle::start_with_db_and_options(
db,
NetworkMode::Direct,
0,
[0u8; 32],
Vec::new(),
)
.await
.unwrap();
let peer_id = handle.peer_id();
let mut rx = handle.subscribe();
let listen = tokio::time::timeout(Duration::from_secs(5), async {
loop {
match rx.recv().await {
Ok(AppEvent::ListeningOn { address }) if address.starts_with("/ip4/127.0.0.1/") => {
return address;
}
Ok(_) => {}
Err(_) => tokio::time::sleep(Duration::from_millis(50)).await,
}
}
})
.await
.expect("listen address materialised within 5 s");
let full = format!("{}/p2p/{}", listen, peer_id);
(handle, rx, full)
}
async fn await_event<F>(
rx: &mut broadcast::Receiver<AppEvent>,
timeout: Duration,
mut predicate: F,
) -> Option<AppEvent>
where
F: FnMut(&AppEvent) -> bool,
{
tokio::time::timeout(timeout, async {
loop {
match rx.recv().await {
Ok(ev) => {
if predicate(&ev) {
return Some(ev);
}
}
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => return None,
}
}
})
.await
.unwrap_or(None)
}
/// Direct-mode scenario: A dials B, B accepts the inbound dial, and B must afterwards
/// discover a room that A opens.
#[tokio::test]
async fn phase_a_inbound_dial_accept_forms_mesh() {
    // The init result is ignored, so a subscriber installed by another test is not an error.
    let _ = tracing_subscriber::fmt()
        .with_test_writer()
        .with_env_filter("huddle=debug,warn")
        .try_init();

    let (handle_a, _events_a, _addr_a) = spawn_direct_node().await;
    let (handle_b, mut events_b, addr_b) = spawn_direct_node().await;
    handle_a.dial(&addr_b).await.unwrap();

    // B should be notified of the dial, tagged with A's fingerprint.
    let dial_event = await_event(
        &mut events_b,
        Duration::from_secs(DIRECT_DIAL_TIMEOUT_SECS),
        |ev| matches!(ev, AppEvent::InboundDial { fingerprint, .. } if fingerprint == handle_a.fingerprint()),
    )
    .await
    .expect("B should see InboundDial from A");
    let (peer_id, addr) = if let AppEvent::InboundDial { peer_id, address, .. } = dial_event {
        (peer_id, address)
    } else {
        // The predicate above only accepts `InboundDial`.
        unreachable!()
    };
    assert_eq!(peer_id, handle_a.peer_id());

    handle_b.accept_inbound(peer_id, &addr).await;
    // Fixed pause after accepting, before A opens the room.
    tokio::time::sleep(Duration::from_millis(1000)).await;

    let room_id = handle_a
        .start_room("phase-a-accept", false, None, RoomKind::Group)
        .await
        .unwrap();
    let wanted = room_id.clone();
    let seen_by_b = await_event(
        &mut events_b,
        Duration::from_secs(MESSAGE_TIMEOUT_SECS),
        |ev| matches!(ev, AppEvent::RoomDiscovered(r) if r.room_id == wanted),
    )
    .await;
    assert!(seen_by_b.is_some(), "B never discovered A's room after accept");

    handle_a.shutdown().await;
    handle_b.shutdown().await;
}
/// Direct-mode scenario: B rejects A's inbound dial, A's fingerprint must then appear in
/// B's blocked-peer list, and a second dial from A must not raise another `InboundDial`
/// event on B within a 3-second observation window.
#[tokio::test]
async fn phase_a_inbound_dial_reject_persists_block() {
    // The init result is ignored, so a subscriber installed by another test is not an error.
    let _ = tracing_subscriber::fmt()
        .with_test_writer()
        .with_env_filter("huddle=debug,warn")
        .try_init();

    let (handle_a, _events_a, _addr_a) = spawn_direct_node().await;
    let (handle_b, mut events_b, addr_b) = spawn_direct_node().await;
    handle_a.dial(&addr_b).await.unwrap();
    let a_fp = handle_a.fingerprint().to_string();

    // First dial: B is notified and rejects it.
    let first_dial = await_event(
        &mut events_b,
        Duration::from_secs(DIRECT_DIAL_TIMEOUT_SECS),
        |ev| matches!(ev, AppEvent::InboundDial { fingerprint, .. } if fingerprint == &a_fp),
    )
    .await
    .expect("B should see InboundDial from A");
    let peer_id = if let AppEvent::InboundDial { peer_id, .. } = first_dial {
        peer_id
    } else {
        // The predicate above only accepts `InboundDial`.
        unreachable!()
    };
    handle_b.reject_inbound(peer_id, &a_fp).await.unwrap();

    let blocked = handle_b.list_blocked_peers();
    assert!(
        blocked.contains(&a_fp),
        "B's blocklist should contain A's fingerprint after reject"
    );

    // Second dial after a short pause: no new notification may reach B.
    tokio::time::sleep(Duration::from_millis(500)).await;
    handle_a.dial(&addr_b).await.unwrap();
    let second_dial = await_event(
        &mut events_b,
        Duration::from_secs(3),
        |ev| matches!(ev, AppEvent::InboundDial { fingerprint, .. } if fingerprint == &a_fp),
    )
    .await;
    assert!(
        second_dial.is_none(),
        "B should NOT raise a second InboundDial — A is blocked"
    );

    handle_a.shutdown().await;
    handle_b.shutdown().await;
}
/// Three-node scenario: after A kicks B from an encrypted room and C accepts the resulting
/// rotation, C still receives A's messages while B no longer does.
///
/// Steps: A opens an encrypted room, B and C discover and join it, a pre-kick message must
/// reach both, A kicks B (which yields a new passphrase), C accepts the rotation using the
/// salt from its `RotationRequested` event, and a post-kick message must reach C but not B.
/// When either B or C fails to discover the room the test logs a notice and returns without
/// failing.
///
/// The paired waits on B and C run concurrently: each receiver is drained while the other is
/// still waiting, so neither falls behind on the bounded event channel, and the worst-case
/// wait is one timeout instead of two.
#[tokio::test]
async fn phase_b_kick_rotates_key_and_excludes_banned() {
    // The init result is ignored, so a subscriber installed by another test is not an error.
    let _ = tracing_subscriber::fmt()
        .with_test_writer()
        .with_env_filter("huddle=debug,warn")
        .try_init();

    let db_a = storage::open_db_in_memory().unwrap();
    let db_b = storage::open_db_in_memory().unwrap();
    let db_c = storage::open_db_in_memory().unwrap();
    let handle_a = AppHandle::start_with_db(db_a).await.unwrap();
    let handle_b = AppHandle::start_with_db(db_b).await.unwrap();
    let handle_c = AppHandle::start_with_db(db_c).await.unwrap();
    eprintln!(
        "A fp={} B fp={} C fp={}",
        handle_a.fingerprint(),
        handle_b.fingerprint(),
        handle_c.fingerprint()
    );

    let mut events_b = handle_b.subscribe();
    let mut events_c = handle_c.subscribe();

    let room_id = handle_a
        .start_room("phase-b", true, Some("first-pass"), RoomKind::Group)
        .await
        .unwrap();

    let discovery_window = Duration::from_secs(DISCOVERY_TIMEOUT_SECS);
    let message_window = Duration::from_secs(MESSAGE_TIMEOUT_SECS);

    // Wait for both joiners to discover the room at the same time.
    let (saw_b, saw_c) = tokio::join!(
        await_event(&mut events_b, discovery_window, |ev| {
            matches!(ev, AppEvent::RoomDiscovered(r) if r.room_id == room_id)
        }),
        await_event(&mut events_c, discovery_window, |ev| {
            matches!(ev, AppEvent::RoomDiscovered(r) if r.room_id == room_id)
        })
    );
    if saw_b.is_none() || saw_c.is_none() {
        // Discovery depends on the host network; treat its absence as "skip", not "fail".
        eprintln!("3-node mDNS discovery timed out; skipping Phase B test");
        handle_a.shutdown().await;
        handle_b.shutdown().await;
        handle_c.shutdown().await;
        return;
    }

    handle_b
        .join_room(&room_id, Some("first-pass"))
        .await
        .unwrap();
    handle_c
        .join_room(&room_id, Some("first-pass"))
        .await
        .unwrap();
    // Fixed grace period between joining and sending.
    tokio::time::sleep(Duration::from_millis(2500)).await;

    // Before the kick, a message from A must reach both B and C.
    let pre_kick = "pre-kick from A";
    handle_a
        .send_room_message(&room_id, pre_kick)
        .await
        .unwrap();
    let (pre_b, pre_c) = tokio::join!(
        await_event(&mut events_b, message_window, |ev| {
            matches!(ev, AppEvent::MessageReceived { body, .. } if body == pre_kick)
        }),
        await_event(&mut events_c, message_window, |ev| {
            matches!(ev, AppEvent::MessageReceived { body, .. } if body == pre_kick)
        })
    );
    assert!(pre_b.is_some() && pre_c.is_some(), "pre-kick fanout failed");

    // Kick B; the call hands back the replacement passphrase.
    let new_pass = handle_a
        .kick_member(&room_id, handle_b.fingerprint())
        .await
        .unwrap();
    assert!(!new_pass.is_empty(), "encrypted room kick must return a new passphrase");

    // C learns about the rotation and adopts the new salt + passphrase.
    let rotation_for_c = await_event(
        &mut events_c,
        message_window,
        |ev| matches!(ev, AppEvent::RotationRequested { room_id: r, .. } if r == &room_id),
    )
    .await
    .expect("C should see RotationRequested");
    let new_salt = match rotation_for_c {
        AppEvent::RotationRequested { new_salt, .. } => new_salt,
        // The predicate above only accepts `RotationRequested`.
        _ => unreachable!(),
    };
    handle_c
        .accept_rotation(&room_id, &new_salt, &new_pass)
        .await
        .unwrap();
    // Fixed grace period after accepting the rotation.
    tokio::time::sleep(Duration::from_millis(2500)).await;

    let post_kick = "post-kick — only C should see this";
    handle_a
        .send_room_message(&room_id, post_kick)
        .await
        .unwrap();

    // These two waits stay sequential on purpose: B's 3-second "must not arrive" window
    // should only start once C has demonstrably received the message.
    let to_c = await_event(
        &mut events_c,
        message_window,
        |ev| matches!(ev, AppEvent::MessageReceived { body, .. } if body == post_kick),
    )
    .await;
    assert!(to_c.is_some(), "C should still decrypt A's post-kick message");
    let to_b = await_event(
        &mut events_b,
        Duration::from_secs(3),
        |ev| matches!(ev, AppEvent::MessageReceived { body, .. } if body == post_kick),
    )
    .await;
    assert!(to_b.is_none(), "B should NOT decrypt the post-kick message");

    handle_a.shutdown().await;
    handle_b.shutdown().await;
    handle_c.shutdown().await;
}
/// Join-code scenario: A opens an encrypted room and generates a join code, B joins with
/// that code, receives a message from A, and must see the room as read-only.
///
/// When B does not discover the room within `DISCOVERY_TIMEOUT_SECS` the test logs a notice
/// and returns without failing.
#[tokio::test]
async fn phase_f_code_join_round_trip() {
    // The init result is ignored, so a subscriber installed by another test is not an error.
    let _ = tracing_subscriber::fmt()
        .with_test_writer()
        .with_env_filter("huddle=debug,warn")
        .try_init();

    let store_a = storage::open_db_in_memory().unwrap();
    let store_b = storage::open_db_in_memory().unwrap();
    let handle_a = AppHandle::start_with_db(store_a).await.unwrap();
    let handle_b = AppHandle::start_with_db(store_b).await.unwrap();
    let mut events_b = handle_b.subscribe();

    let room_id = handle_a
        .start_room("phase-f", true, Some("alice-only"), RoomKind::Group)
        .await
        .unwrap();
    let wanted = room_id.clone();

    let found = await_event(
        &mut events_b,
        Duration::from_secs(DISCOVERY_TIMEOUT_SECS),
        |ev| matches!(ev, AppEvent::RoomDiscovered(r) if r.room_id == wanted),
    )
    .await;
    if found.is_none() {
        // Discovery depends on the host network; treat its absence as "skip", not "fail".
        eprintln!("Phase F: mDNS discovery timed out; skipping");
        handle_a.shutdown().await;
        handle_b.shutdown().await;
        return;
    }

    // A issues a join code; B joins with the code.
    let code = handle_a.generate_join_code(&room_id).unwrap();
    assert_eq!(code.len(), 9, "code is 4-dash-4 = 9 chars: {}", code);
    handle_b.join_room_with_code(&room_id, &code).await.unwrap();
    // Fixed grace period between joining and sending.
    tokio::time::sleep(Duration::from_millis(3000)).await;

    handle_a
        .send_room_message(&room_id, "alice -> code-joined bob")
        .await
        .unwrap();
    let delivered = await_event(
        &mut events_b,
        Duration::from_secs(MESSAGE_TIMEOUT_SECS),
        |ev| matches!(ev, AppEvent::MessageReceived { body, .. } if body == "alice -> code-joined bob"),
    )
    .await;
    assert!(delivered.is_some(), "B should decrypt A's message after code-join");

    let read_only = handle_b.is_room_read_only(&room_id);
    assert!(
        read_only,
        "code-joined room should be read-only on B's side"
    );

    handle_a.shutdown().await;
    handle_b.shutdown().await;
}