use futures::SinkExt as _;
use futures::StreamExt as _;
use jmap_base_client::{ws::connect_ws, WsFrame};
use tokio::net::TcpListener;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn split_enables_concurrent_send_and_receive() {
let listener = TcpListener::bind("127.0.0.1:0")
.await
.expect("must bind 127.0.0.1:0");
let server_addr = listener.local_addr().expect("must report local addr");
let server_handle = tokio::spawn(async move {
let (stream, _) = listener
.accept()
.await
.expect("server must accept one connection");
let mut ws = tokio_tungstenite::accept_async(stream)
.await
.expect("server must complete WS handshake");
let inbound = ws
.next()
.await
.expect("server must observe a client message")
.expect("server message must be Ok");
let inbound_text = match inbound {
tokio_tungstenite::tungstenite::Message::Text(t) => t.to_string(),
other => panic!("server expected Text frame from client, got {other:?}"),
};
let state_change =
r#"{"@type":"StateChange","changed":{"account1":{"Mail":"s-from-server"}}}"#;
ws.send(tokio_tungstenite::tungstenite::Message::Text(
state_change.to_owned().into(),
))
.await
.expect("server must send StateChange");
ws.close(None).await.expect("server close must succeed");
inbound_text
});
let ws_url = format!("ws://{server_addr}/");
let session = connect_ws(&ws_url, None)
.await
.expect("client must complete WS handshake");
let (mut sender, mut receiver) = session.split();
let recv_task = tokio::spawn(async move {
let first = receiver
.next_frame()
.await
.expect("must receive one frame before close");
let frame = first.expect("first frame must parse");
let after_close = receiver.next_frame().await;
(frame, after_close.is_none())
});
sender
.send_text("client-hello".to_owned())
.await
.expect("client send_text must succeed");
let inbound_text = server_handle.await.expect("server task must not panic");
assert_eq!(
inbound_text, "client-hello",
"server must observe the client's exact text frame"
);
let (frame, observed_close) = recv_task.await.expect("recv task must not panic");
match frame {
WsFrame::StateChange(sc) => {
let acct = sc
.changed
.get("account1")
.expect("server StateChange must carry account1");
assert_eq!(
acct.get("Mail").map(|s| s.as_ref()),
Some("s-from-server"),
"server StateChange must carry the expected mail state"
);
}
other => panic!("expected StateChange frame, got {other:?}"),
}
assert!(
observed_close,
"client must observe None after server close"
);
}