use std::convert::Infallible;
use std::time::Duration;
use serde_json::json;
use socketioxide::ParserError;
use socketioxide::extract::{Data, Event, Extension, MaybeExtension, SocketRef, State, TryData};
use socketioxide::handler::ConnectHandler;
use socketioxide_core::Value;
use socketioxide_core::parser::Parse;
use socketioxide_parser_common::CommonParser;
use tokio::sync::mpsc;
use engineioxide::Packet as EioPacket;
use socketioxide::SocketIo;
use socketioxide_core::packet::Packet;
mod fixture;
mod utils;
async fn timeout_rcv<T: std::fmt::Debug>(srx: &mut tokio::sync::mpsc::Receiver<T>) -> T {
tokio::time::timeout(Duration::from_millis(10), srx.recv())
.await
.unwrap()
.unwrap()
}
async fn timeout_rcv_err<T: std::fmt::Debug>(srx: &mut tokio::sync::mpsc::Receiver<T>) {
tokio::time::timeout(Duration::from_millis(10), srx.recv())
.await
.unwrap_err();
}
fn create_msg(ns: &'static str, event: &str, data: impl serde::Serialize) -> EioPacket {
let val = CommonParser.encode_value(&data, Some(event)).unwrap();
match CommonParser.encode(Packet::event(ns, val)) {
Value::Str(msg, _) => EioPacket::Message(msg),
Value::Bytes(_) => unreachable!(),
}
}
#[tokio::test]
pub async fn state_extractor() {
let state = 1112i32;
let (_, io) = SocketIo::builder().with_state(state).build_svc();
io.ns("/", async |socket: SocketRef, State(state): State<i32>| {
assert_ok!(socket.emit("state", &state));
socket.on(
"test",
async |socket: SocketRef, State(state): State<i32>| {
assert_ok!(socket.emit("state", &state));
},
);
});
let res_packet = create_msg("/", "state", state);
let (stx, mut srx) = io.new_dummy_sock("/", ()).await;
srx.recv().await;
assert_eq!(timeout_rcv(&mut srx).await, res_packet);
assert_ok!(stx.try_send(create_msg("/", "test", "foo")));
assert_eq!(timeout_rcv(&mut srx).await, res_packet);
}
#[tokio::test]
pub async fn data_extractor() {
let (_, io) = SocketIo::new_svc();
let (tx, mut rx) = mpsc::channel::<String>(4);
let tx1 = tx.clone();
io.ns(
"/",
async move |socket: SocketRef, Data(data): Data<String>| {
assert_ok!(tx.try_send(data));
socket.on("test", async move |Data(data): Data<String>| {
assert_ok!(tx.try_send(data));
});
},
);
io.new_dummy_sock("/", ()).await;
assert!(matches!(
rx.try_recv(),
Err(mpsc::error::TryRecvError::Empty)
));
io.new_dummy_sock("/", 1321).await;
assert!(matches!(
rx.try_recv(),
Err(mpsc::error::TryRecvError::Empty)
));
assert_eq!(tx1.capacity(), 4);
let (stx, _rtx) = io.new_dummy_sock("/", "foo").await;
assert_eq!(timeout_rcv(&mut rx).await, "foo");
assert_ok!(stx.try_send(create_msg("/", "test", "oof")));
assert_eq!(timeout_rcv(&mut rx).await, "oof");
assert_ok!(stx.try_send(create_msg("/", "test", json!({ "test": 132 }))));
assert_eq!(tx1.capacity(), 4);
}
#[tokio::test]
pub async fn try_data_extractor() {
let (_, io) = SocketIo::new_svc();
let (tx, mut rx) = mpsc::channel::<Result<String, ParserError>>(4);
io.ns(
"/",
async move |s: SocketRef, TryData(data): TryData<String>| {
assert_ok!(tx.try_send(data));
s.on("test", async move |TryData(data): TryData<String>| {
assert_ok!(tx.try_send(data));
});
},
);
io.new_dummy_sock("/", ()).await;
assert_err!(timeout_rcv(&mut rx).await);
io.new_dummy_sock("/", 1321).await;
assert_err!(timeout_rcv(&mut rx).await);
let (stx, _rtx) = io.new_dummy_sock("/", "foo").await;
let res = assert_ok!(timeout_rcv(&mut rx).await);
assert_eq!(res, "foo");
assert_ok!(stx.try_send(create_msg("/", "test", "oof")));
let res = assert_ok!(timeout_rcv(&mut rx).await);
assert_eq!(res, "oof");
assert_ok!(stx.try_send(create_msg("/", "test", json!({ "test": 132 }))));
assert_err!(timeout_rcv(&mut rx).await);
}
#[tokio::test]
pub async fn extension_extractor() {
let (_, io) = SocketIo::new_svc();
async fn on_test(s: SocketRef, Extension(i): Extension<usize>) {
s.emit("from_ev_test", &i).unwrap();
}
async fn ns_root(s: SocketRef, Extension(i): Extension<usize>) {
s.emit("from_ns", &i).unwrap();
s.on("test", on_test);
}
async fn set_ext(s: SocketRef) -> Result<(), Infallible> {
s.extensions.insert(123usize);
Ok(())
}
io.ns("/", ns_root.with(set_ext));
io.ns("/test", ns_root);
let (tx, mut rx) = io.new_dummy_sock("/", ()).await;
assert!(matches!(timeout_rcv(&mut rx).await, EioPacket::Message(s) if s.starts_with('0')));
assert_eq!(
timeout_rcv(&mut rx).await,
EioPacket::Message("2[\"from_ns\",123]".into())
);
assert_ok!(tx.try_send(create_msg("/", "test", ())));
assert_eq!(
timeout_rcv(&mut rx).await,
EioPacket::Message("2[\"from_ev_test\",123]".into())
);
let (tx, mut rx) = io.new_dummy_sock("/test", ()).await;
assert!(matches!(timeout_rcv(&mut rx).await, EioPacket::Message(s) if s.starts_with('0')));
timeout_rcv_err(&mut rx).await;
assert_ok!(tx.try_send(create_msg("/test", "test", ())));
timeout_rcv_err(&mut rx).await;
}
#[tokio::test]
pub async fn maybe_extension_extractor() {
let (_, io) = SocketIo::new_svc();
async fn on_test(s: SocketRef, MaybeExtension(i): MaybeExtension<usize>) {
s.emit("from_ev_test", &i).unwrap();
}
async fn ns_root(s: SocketRef, MaybeExtension(i): MaybeExtension<usize>) {
s.emit("from_ns", &i).unwrap();
s.on("test", on_test);
}
async fn set_ext(s: SocketRef) -> Result<(), Infallible> {
s.extensions.insert(123usize);
Ok(())
}
io.ns("/", ns_root.with(set_ext));
io.ns("/test", ns_root);
let (tx, mut rx) = io.new_dummy_sock("/", ()).await;
assert!(matches!(timeout_rcv(&mut rx).await, EioPacket::Message(s) if s.starts_with('0')));
assert_eq!(
timeout_rcv(&mut rx).await,
EioPacket::Message("2[\"from_ns\",123]".into())
);
assert_ok!(tx.try_send(create_msg("/", "test", ())));
assert_eq!(
timeout_rcv(&mut rx).await,
EioPacket::Message("2[\"from_ev_test\",123]".into())
);
let (tx, mut rx) = io.new_dummy_sock("/test", ()).await;
assert!(matches!(timeout_rcv(&mut rx).await, EioPacket::Message(s) if s.starts_with('0')));
assert_eq!(
timeout_rcv(&mut rx).await,
EioPacket::Message("2/test,[\"from_ns\",null]".into())
);
assert_ok!(tx.try_send(create_msg("/test", "test", ())));
assert_eq!(
timeout_rcv(&mut rx).await,
EioPacket::Message("2/test,[\"from_ev_test\",null]".into())
);
}
#[tokio::test]
pub async fn event_extractor() {
let (_, io) = SocketIo::new_svc();
async fn fallback_handler(s: SocketRef, Event(event): Event) {
s.emit("fallback", &event).unwrap();
}
io.ns("/", async |socket: SocketRef| {
socket.on_fallback(fallback_handler);
socket.on("defined", async |s: SocketRef, Event(event): Event| {
s.emit("defined", &event).unwrap();
});
});
let (tx, mut rx) = io.new_dummy_sock("/", ()).await;
assert!(matches!(timeout_rcv(&mut rx).await, EioPacket::Message(s) if s.starts_with('0')));
tx.try_send(create_msg("/", "my_event", ())).unwrap();
assert_eq!(
timeout_rcv(&mut rx).await,
EioPacket::Message("2[\"fallback\",\"my_event\"]".into())
);
tx.try_send(create_msg("/", "foo", ())).unwrap();
assert_eq!(
timeout_rcv(&mut rx).await,
EioPacket::Message("2[\"fallback\",\"foo\"]".into())
);
tx.try_send(create_msg("/", "bar", ())).unwrap();
assert_eq!(
timeout_rcv(&mut rx).await,
EioPacket::Message("2[\"fallback\",\"bar\"]".into())
);
tx.try_send(create_msg("/", "defined", ())).unwrap();
assert_eq!(
timeout_rcv(&mut rx).await,
EioPacket::Message("2[\"defined\",\"defined\"]".into())
);
}