#![cfg(not(target_arch = "wasm32"))]
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use pmcp::__test_support::{ServerRequest, ServerRequestDispatcher};
#[tokio::test]
async fn test_single_request_response_roundtrip() {
let (tx, mut rx) = mpsc::channel::<(String, ServerRequest)>(4);
let dispatcher = Arc::new(ServerRequestDispatcher::new_with_channel(tx));
let dispatch_fut = {
let d = dispatcher.clone();
tokio::spawn(async move { d.dispatch(ServerRequest::ListRoots).await })
};
let (correlation_id, request) = tokio::time::timeout(Duration::from_millis(200), rx.recv())
.await
.expect("outbound deadline")
.expect("outbound channel closed");
assert!(matches!(request, ServerRequest::ListRoots));
assert!(
!correlation_id.is_empty(),
"correlation id must be non-empty"
);
let response_payload = serde_json::json!({"roots": []});
dispatcher
.handle_response(&correlation_id, response_payload.clone())
.await
.expect("handle_response must succeed");
let result = dispatch_fut
.await
.expect("task panic")
.expect("dispatch must succeed");
assert_eq!(result, response_payload);
assert_eq!(dispatcher.pending_count().await, 0);
}
#[tokio::test]
async fn test_concurrent_multiplex_out_of_order() {
let (tx, mut rx) = mpsc::channel::<(String, ServerRequest)>(8);
let dispatcher = Arc::new(
ServerRequestDispatcher::new_with_channel(tx).with_timeout(Duration::from_secs(2)),
);
let d1 = dispatcher.clone();
let fut_a = tokio::spawn(async move { d1.dispatch(ServerRequest::ListRoots).await });
let d2 = dispatcher.clone();
let fut_b = tokio::spawn(async move { d2.dispatch(ServerRequest::ListRoots).await });
let d3 = dispatcher.clone();
let fut_c = tokio::spawn(async move { d3.dispatch(ServerRequest::ListRoots).await });
let (id_a, _) = rx.recv().await.expect("a");
let (id_b, _) = rx.recv().await.expect("b");
let (id_c, _) = rx.recv().await.expect("c");
assert_ne!(id_a, id_b);
assert_ne!(id_b, id_c);
assert_ne!(id_a, id_c);
let resp_a = serde_json::json!({"roots": [{"uri":"file:///a"}]});
let resp_b = serde_json::json!({"roots": [{"uri":"file:///b"}]});
let resp_c = serde_json::json!({"roots": [{"uri":"file:///c"}]});
dispatcher
.handle_response(&id_c, resp_c.clone())
.await
.expect("fulfill c");
dispatcher
.handle_response(&id_b, resp_b.clone())
.await
.expect("fulfill b");
dispatcher
.handle_response(&id_a, resp_a.clone())
.await
.expect("fulfill a");
assert_eq!(fut_a.await.unwrap().unwrap(), resp_a);
assert_eq!(fut_b.await.unwrap().unwrap(), resp_b);
assert_eq!(fut_c.await.unwrap().unwrap(), resp_c);
assert_eq!(dispatcher.pending_count().await, 0);
}