#![deny(warnings)]
use futures::{FutureExt, SinkExt, StreamExt};
use warp::ws::Message;
use warp::Filter;
#[tokio::test]
async fn upgrade() {
let _ = pretty_env_logger::try_init();
let route = warp::ws().map(|ws: warp::ws::Ws| ws.on_upgrade(|_| async {}));
let key = "dGhlIHNhbXBsZSBub25jZQ==";
let accept = "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=";
let resp = warp::test::request()
.header("connection", "upgrade")
.header("upgrade", "websocket")
.header("sec-websocket-version", "13")
.header("sec-websocket-key", key)
.reply(&route)
.await;
assert_eq!(resp.status(), 101);
assert_eq!(resp.headers()["connection"], "upgrade");
assert_eq!(resp.headers()["upgrade"], "websocket");
assert_eq!(resp.headers()["sec-websocket-accept"], accept);
let resp = warp::test::request()
.header("connection", "keep-alive, Upgrade")
.header("upgrade", "Websocket")
.header("sec-websocket-version", "13")
.header("sec-websocket-key", key)
.reply(&route)
.await;
assert_eq!(resp.status(), 101);
}
#[tokio::test]
async fn fail() {
let _ = pretty_env_logger::try_init();
let route = warp::any().map(warp::reply);
warp::test::ws()
.handshake(route)
.await
.expect_err("handshake non-websocket route should fail");
}
#[tokio::test]
async fn text() {
let _ = pretty_env_logger::try_init();
let mut client = warp::test::ws()
.handshake(ws_echo())
.await
.expect("handshake");
client.send_text("hello warp").await;
let msg = client.recv().await.expect("recv");
assert_eq!(msg.to_str(), Ok("hello warp"));
}
#[tokio::test]
async fn binary() {
let _ = pretty_env_logger::try_init();
let mut client = warp::test::ws()
.handshake(ws_echo())
.await
.expect("handshake");
client.send(warp::ws::Message::binary(&b"bonk"[..])).await;
let msg = client.recv().await.expect("recv");
assert!(msg.is_binary());
assert_eq!(msg.as_bytes(), &b"bonk"[..]);
}
#[tokio::test]
async fn send_ping() {
let _ = pretty_env_logger::try_init();
let filter = warp::ws().map(|ws: warp::ws::Ws| {
ws.on_upgrade(|mut websocket| {
async move {
websocket.send(Message::ping("srv")).await.unwrap();
let msg = websocket.next().await.expect("item").expect("ok");
assert!(msg.is_pong());
assert_eq!(msg.as_bytes(), &b"srv"[..]);
}
})
});
let mut client = warp::test::ws().handshake(filter).await.expect("handshake");
let msg = client.recv().await.expect("recv");
assert!(msg.is_ping());
assert_eq!(msg.as_bytes(), &b"srv"[..]);
client.recv_closed().await.expect("closed");
}
#[tokio::test]
async fn echo_pings() {
let _ = pretty_env_logger::try_init();
let mut client = warp::test::ws()
.handshake(ws_echo())
.await
.expect("handshake");
client.send(Message::ping("clt")).await;
let msg = client.recv().await.expect("recv");
assert!(msg.is_pong());
assert_eq!(msg.as_bytes(), &b"clt"[..]);
let msg = client.recv().await.expect("recv");
assert!(msg.is_ping());
assert_eq!(msg.as_bytes(), &b"clt"[..]);
let msg = client.recv().await.expect("recv");
assert!(msg.is_pong());
assert_eq!(msg.as_bytes(), &b"clt"[..]);
}
#[tokio::test]
async fn closed() {
let _ = pretty_env_logger::try_init();
let route =
warp::ws().map(|ws: warp::ws::Ws| ws.on_upgrade(|websocket| websocket.close().map(|_| ())));
let mut client = warp::test::ws().handshake(route).await.expect("handshake");
client.recv_closed().await.expect("closed");
}
#[tokio::test]
async fn limit_message_size() {
let _ = pretty_env_logger::try_init();
let echo = warp::ws().map(|ws: warp::ws::Ws| {
ws.max_message_size(1024).on_upgrade(|websocket| {
let (tx, rx) = websocket.split();
rx.forward(tx).map(|result| {
assert!(result.is_err());
assert_eq!(
format!("{}", result.unwrap_err()).as_str(),
"Space limit exceeded: Message too big: 0 + 1025 > 1024"
);
})
})
});
let mut client = warp::test::ws().handshake(echo).await.expect("handshake");
client.send(warp::ws::Message::binary(vec![0; 1025])).await;
client.send_text("hello warp").await;
assert!(client.recv().await.is_err());
}
#[tokio::test]
async fn limit_frame_size() {
let _ = pretty_env_logger::try_init();
let echo = warp::ws().map(|ws: warp::ws::Ws| {
ws.max_frame_size(1024).on_upgrade(|websocket| {
let (tx, rx) = websocket.split();
rx.forward(tx).map(|result| {
assert!(result.is_err());
assert_eq!(
format!("{}", result.unwrap_err()).as_str(),
"Space limit exceeded: Message length too big: 1025 > 1024"
);
})
})
});
let mut client = warp::test::ws().handshake(echo).await.expect("handshake");
client.send(warp::ws::Message::binary(vec![0; 1025])).await;
client.send_text("hello warp").await;
assert!(client.recv().await.is_err());
}
fn ws_echo() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Copy {
warp::ws().map(|ws: warp::ws::Ws| {
ws.on_upgrade(|websocket| {
let (tx, rx) = websocket.split();
rx.inspect(|i| log::debug!("ws recv: {:?}", i))
.forward(tx)
.map(|_| ())
})
})
}