use std::{str::FromStr, time::Duration};
use socketioxide::{
SocketIo, adapter::Adapter, extract::SocketRef, operators::BroadcastOperators,
socket::RemoteSocket,
};
use socketioxide_core::{Sid, Str, adapter::RemoteSocketData};
use tokio::time::Instant;
mod fixture;
fn extract_sid(data: &str) -> Sid {
let data = data
.split("\"sid\":\"")
.nth(1)
.and_then(|s| s.split('"').next())
.unwrap();
Sid::from_str(data).unwrap()
}
async fn fetch_sockets_data<A: Adapter>(op: BroadcastOperators<A>) -> Vec<RemoteSocketData> {
let mut sockets = op
.fetch_sockets()
.await
.unwrap()
.into_iter()
.map(RemoteSocket::into_data)
.collect::<Vec<_>>();
sockets.sort_by(|a, b| a.id.cmp(&b.id));
sockets
}
fn create_expected_sockets<const N: usize, A: Adapter>(
ids: [Sid; N],
ios: [&SocketIo<A>; N],
) -> [RemoteSocketData; N] {
let mut i = 0;
let mut sockets = ios.map(|io| {
let id = ids[i];
i += 1;
RemoteSocketData {
id,
server_id: io.config().server_id,
ns: Str::from("/"),
}
});
sockets.sort_by(|a, b| a.id.cmp(&b.id));
sockets
}
#[tokio::test]
pub async fn fetch_sockets() {
let [io1, io2, io3] = fixture::spawn_servers::<3>();
io1.ns("/", async || ()).await.unwrap();
io2.ns("/", async || ()).await.unwrap();
io3.ns("/", async || ()).await.unwrap();
let (_, mut rx1) = io1.new_dummy_sock("/", ()).await;
let (_, mut rx2) = io2.new_dummy_sock("/", ()).await;
let (_, mut rx3) = io3.new_dummy_sock("/", ()).await;
let id1 = extract_sid(&timeout_rcv!(&mut rx1));
let id2 = extract_sid(&timeout_rcv!(&mut rx2));
let id3 = extract_sid(&timeout_rcv!(&mut rx3));
let mut expected_sockets = create_expected_sockets([id1, id2, id3], [&io1, &io2, &io3]);
expected_sockets.sort_by(|a, b| a.id.cmp(&b.id));
let sockets = fetch_sockets_data(io1.broadcast()).await;
assert_eq!(sockets, expected_sockets);
let sockets = fetch_sockets_data(io2.broadcast()).await;
assert_eq!(sockets, expected_sockets);
let sockets = fetch_sockets_data(io3.broadcast()).await;
assert_eq!(sockets, expected_sockets);
}
#[tokio::test]
pub async fn fetch_sockets_with_rooms() {
let [io1, io2, io3] = fixture::spawn_servers::<3>();
let handler =
|rooms: &'static [&'static str]| async move |socket: SocketRef<_>| socket.join(rooms);
io1.ns("/", handler(&["room1", "room2"])).await.unwrap();
io2.ns("/", handler(&["room2", "room3"])).await.unwrap();
io3.ns("/", handler(&["room3", "room1"])).await.unwrap();
let (_, mut rx1) = io1.new_dummy_sock("/", ()).await;
let (_, mut rx2) = io2.new_dummy_sock("/", ()).await;
let (_, mut rx3) = io3.new_dummy_sock("/", ()).await;
let id1 = extract_sid(&timeout_rcv!(&mut rx1));
let id2 = extract_sid(&timeout_rcv!(&mut rx2));
let id3 = extract_sid(&timeout_rcv!(&mut rx3));
let sockets = fetch_sockets_data(io1.to("room1")).await;
assert_eq!(sockets, create_expected_sockets([id1, id3], [&io1, &io3]));
let sockets = fetch_sockets_data(io1.to("room2")).await;
assert_eq!(sockets, create_expected_sockets([id1, id2], [&io1, &io2]));
let sockets = fetch_sockets_data(io1.to("room3")).await;
assert_eq!(sockets, create_expected_sockets([id2, id3], [&io2, &io3]));
}
#[tokio::test]
pub async fn fetch_sockets_timeout() {
const TIMEOUT: Duration = Duration::from_millis(50);
let [io1, io2] = fixture::spawn_buggy_servers(TIMEOUT);
io1.ns("/", async || ()).await.unwrap();
io2.ns("/", async || ()).await.unwrap();
let (_, mut rx1) = io1.new_dummy_sock("/", ()).await;
let (_, mut rx2) = io2.new_dummy_sock("/", ()).await;
timeout_rcv!(&mut rx1); timeout_rcv!(&mut rx2);
let now = Instant::now();
io1.fetch_sockets().await.unwrap();
assert!(now.elapsed() >= TIMEOUT);
}
#[tokio::test]
pub async fn remote_socket_emit() {
let [io1, io2] = fixture::spawn_servers();
io1.ns("/", async || ()).await.unwrap();
io2.ns("/", async || ()).await.unwrap();
let (_, mut rx1) = io1.new_dummy_sock("/", ()).await;
let (_, mut rx2) = io2.new_dummy_sock("/", ()).await;
timeout_rcv!(&mut rx1); timeout_rcv!(&mut rx2);
let sockets = io1.fetch_sockets().await.unwrap();
for socket in sockets {
socket.emit("test", "hello").await.unwrap();
}
assert_eq!(timeout_rcv!(&mut rx1), r#"42["test","hello"]"#);
assert_eq!(timeout_rcv!(&mut rx2), r#"42["test","hello"]"#);
}
#[tokio::test]
pub async fn remote_socket_emit_with_ack() {
let [io1, io2] = fixture::spawn_servers();
io1.ns("/", async || ()).await.unwrap();
io2.ns("/", async || ()).await.unwrap();
let (_, mut rx1) = io1.new_dummy_sock("/", ()).await;
let (_, mut rx2) = io2.new_dummy_sock("/", ()).await;
timeout_rcv!(&mut rx1); timeout_rcv!(&mut rx2);
let sockets = io1.fetch_sockets().await.unwrap();
for socket in sockets {
#[allow(unused_must_use)]
socket
.emit_with_ack::<_, ()>("test", "hello")
.await
.unwrap();
}
assert_eq!(timeout_rcv!(&mut rx1), r#"421["test","hello"]"#);
assert_eq!(timeout_rcv!(&mut rx2), r#"421["test","hello"]"#);
}