use std::time::Duration;
use futures_util::{SinkExt, StreamExt};
use socketioxide::{SocketIo, extract::SocketRef, socket::DisconnectReason};
use socketioxide_core::adapter::Room;
use tokio::sync::mpsc;
mod fixture;
use fixture::{create_server, send_req};
use tokio_tungstenite::tungstenite::Message;
use crate::fixture::{create_polling_connection, create_ws_connection};
fn attach_handler(io: &SocketIo, chan_size: usize) -> mpsc::Receiver<DisconnectReason> {
let (tx, rx) = mpsc::channel::<DisconnectReason>(chan_size);
io.ns("/", async move |s: SocketRef| {
s.on_disconnect(async move |r: DisconnectReason| tx.try_send(r).unwrap())
});
rx
}
#[tokio::test]
pub async fn polling_heartbeat_timeout() {
let (svc, io) = create_server().await;
let mut rx = attach_handler(&io, 1);
create_polling_connection(&svc).await;
let data = tokio::time::timeout(Duration::from_millis(500), rx.recv())
.await
.expect("timeout waiting for DisconnectReason::HeartbeatTimeout")
.unwrap();
assert_eq!(data, DisconnectReason::HeartbeatTimeout);
}
#[tokio::test]
pub async fn ws_heartbeat_timeout() {
let (svc, io) = create_server().await;
let mut rx = attach_handler(&io, 1);
let _stream = create_ws_connection(&svc).await;
let data = tokio::time::timeout(Duration::from_millis(500), rx.recv())
.await
.expect("timeout waiting for DisconnectReason::HeartbeatTimeout")
.unwrap();
assert_eq!(data, DisconnectReason::HeartbeatTimeout);
}
#[tokio::test]
pub async fn polling_transport_closed() {
let (svc, io) = create_server().await;
let mut rx = attach_handler(&io, 1);
let sid = create_polling_connection(&svc).await;
send_req(
&svc,
format!("transport=polling&sid={sid}"),
http::Method::POST,
Some("1".into()),
)
.await;
let data = tokio::time::timeout(Duration::from_millis(100), rx.recv())
.await
.expect("timeout waiting for DisconnectReason::TransportClose")
.unwrap();
assert_eq!(data, DisconnectReason::TransportClose);
}
#[tokio::test]
pub async fn ws_transport_closed() {
let (svc, io) = create_server().await;
let mut rx = attach_handler(&io, 1);
let mut stream = create_ws_connection(&svc).await;
stream.send(Message::Text("1".into())).await.unwrap();
let data = tokio::time::timeout(Duration::from_millis(100), rx.recv())
.await
.expect("timeout waiting for DisconnectReason::TransportClose")
.unwrap();
assert_eq!(data, DisconnectReason::TransportClose);
}
#[tokio::test]
pub async fn multiple_http_polling() {
let (svc, io) = create_server().await;
let mut rx = attach_handler(&io, 1);
let sid = create_polling_connection(&svc).await;
send_req(
&svc,
format!("transport=polling&sid={sid}"),
http::Method::GET,
None,
)
.await;
let (_, _, data) = futures_util::future::join3(
send_req(
&svc,
format!("transport=polling&sid={sid}"),
http::Method::GET,
None,
),
send_req(
&svc,
format!("transport=polling&sid={sid}"),
http::Method::GET,
None,
),
async move {
tokio::time::timeout(Duration::from_millis(100), rx.recv())
.await
.expect(
"timeout waiting for DisconnectReason::DisconnectError::MultipleHttpPolling",
)
.unwrap()
},
)
.await;
assert_eq!(data, DisconnectReason::MultipleHttpPollingError);
}
#[tokio::test]
pub async fn polling_packet_parsing() {
let (svc, io) = create_server().await;
let mut rx = attach_handler(&io, 1);
let sid = create_polling_connection(&svc).await;
send_req(
&svc,
format!("transport=polling&sid={sid}"),
http::Method::POST,
Some("aizdunazidaubdiz".into()),
)
.await;
let data = tokio::time::timeout(Duration::from_millis(100), rx.recv())
.await
.expect("timeout waiting for DisconnectReason::PacketParsingError")
.unwrap();
assert_eq!(data, DisconnectReason::PacketParsingError);
}
#[tokio::test]
pub async fn ws_packet_parsing() {
let (svc, io) = create_server().await;
let mut rx = attach_handler(&io, 1);
let mut stream = create_ws_connection(&svc).await;
stream
.send(Message::Text("aizdunazidaubdiz".into()))
.await
.unwrap();
let data = tokio::time::timeout(Duration::from_millis(100), rx.recv())
.await
.expect("timeout waiting for DisconnectReason::PacketParsingError")
.unwrap();
assert_eq!(data, DisconnectReason::PacketParsingError);
}
#[tokio::test]
pub async fn client_ns_disconnect() {
let (svc, io) = create_server().await;
let mut rx = attach_handler(&io, 1);
let mut stream = create_ws_connection(&svc).await;
stream.send(Message::Text("41".into())).await.unwrap();
let data = tokio::time::timeout(Duration::from_millis(100), rx.recv())
.await
.expect("timeout waiting for DisconnectReason::ClientNSDisconnect")
.unwrap();
assert_eq!(data, DisconnectReason::ClientNSDisconnect);
}
#[tokio::test]
pub async fn server_ns_disconnect() {
let (tx, mut rx) = mpsc::channel::<(DisconnectReason, Vec<Room>)>(1);
let (svc, io) = create_server().await;
io.ns("/", async move |socket: SocketRef, io: SocketIo| {
socket.join("testRoom1");
socket.join("testRoom2");
socket.join("testRoom3");
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(100)).await;
let s = io.sockets().into_iter().next().unwrap();
s.disconnect().unwrap();
});
socket.on_disconnect(async move |s: SocketRef, reason: DisconnectReason| {
tx.try_send((reason, s.rooms())).unwrap();
});
});
let _stream = create_ws_connection(&svc).await;
let (disconnect_reason, mut rooms) =
tokio::time::timeout(Duration::from_millis(100), rx.recv())
.await
.expect("timeout waiting for DisconnectReason::ServerNSDisconnect")
.unwrap();
assert_eq!(disconnect_reason, DisconnectReason::ServerNSDisconnect);
rooms.sort();
assert_eq!(rooms, vec!["testRoom1", "testRoom2", "testRoom3"]);
}
#[tokio::test]
pub async fn server_ns_close() {
let (tx, mut rx) = mpsc::channel::<DisconnectReason>(1);
let (svc, io) = create_server().await;
io.ns("/test", async move |socket: SocketRef, io: SocketIo| {
socket.on_disconnect(async move |reason: DisconnectReason| tx.try_send(reason).unwrap());
io.delete_ns("/test");
});
let mut ws = create_ws_connection(&svc).await;
ws.send(Message::Text("40/test,{}".into())).await.unwrap();
let data = tokio::time::timeout(Duration::from_millis(20), rx.recv())
.await
.expect("timeout waiting for DisconnectReason::ServerNSDisconnect")
.unwrap();
assert_eq!(data, DisconnectReason::ServerNSDisconnect);
}
#[tokio::test]
pub async fn server_ws_closing() {
let (svc, io) = create_server().await;
let _rx = attach_handler(&io, 100);
let mut streams =
futures_util::future::join_all((0..100).map(|_| create_ws_connection(&svc))).await;
futures_util::future::join_all(streams.iter_mut().map(async |s| {
s.next().await; s.next().await; s.next().await; }))
.await;
tokio::time::timeout(Duration::from_millis(100), io.close())
.await
.expect("timeout waiting for server closing");
let packets = futures_util::future::join_all(streams.iter_mut().map(async |s| {
(s.next().await, s.next().await) }))
.await;
for packet in packets {
assert!(matches!(packet.0.unwrap().unwrap(), Message::Close(_)));
assert!(packet.1.is_none());
}
}
#[tokio::test]
pub async fn server_http_closing() {
let (svc, io) = create_server().await;
let _rx = attach_handler(&io, 100);
let mut sids =
futures_util::future::join_all((0..100).map(|_| create_polling_connection(&svc))).await;
futures_util::future::join_all(sids.iter_mut().map(|s| {
send_req(
&svc,
format!("transport=polling&sid={s}"),
http::Method::GET,
None,
)
}))
.await;
tokio::time::timeout(Duration::from_millis(20), io.close())
.await
.expect("timeout waiting for server closing");
let packets = futures_util::future::join_all(sids.iter_mut().map(|s| {
send_req(
&svc,
format!("transport=polling&sid={s}"),
http::Method::GET,
None,
)
}))
.await;
for packet in packets {
assert_eq!(
packet,
"{\"code\":\"1\",\"message\":\"Session ID unknown\"}"
);
}
}