use tokio::io::{AsyncRead, AsyncWrite, DuplexStream};
use tokio_util::codec::Framed;
use crate::LspCodec;
pub type Transport<T> = Framed<T, LspCodec>;
pub fn transport<T>(io: T) -> Transport<T>
where
T: AsyncRead + AsyncWrite,
{
Framed::new(io, LspCodec::new())
}
#[must_use]
pub fn duplex_transport(buffer_size: usize) -> (Transport<DuplexStream>, Transport<DuplexStream>) {
let (a, b) = tokio::io::duplex(buffer_size);
(transport(a), transport(b))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{Message, Notification, Request, Response};
use futures::{SinkExt, StreamExt};
use serde_json::json;
#[tokio::test]
async fn duplex_request_response_test() {
let (mut client, mut server) = duplex_transport(4096);
let request = Message::Request(Request::new(
1,
"textDocument/hover",
Some(json!({
"textDocument": {"uri": "file:///test.rs"},
"position": {"line": 10, "character": 5}
})),
));
client.send(request).await.expect("send request");
let received = server.next().await.expect("receive").expect("decode");
assert!(received.is_request());
if let Message::Request(req) = &received {
assert_eq!(req.method, "textDocument/hover");
assert_eq!(req.id, 1.into());
}
let response = Message::Response(Response::ok(
1,
json!({
"contents": "fn main()"
}),
));
server.send(response).await.expect("send response");
let received = client.next().await.expect("receive").expect("decode");
assert!(received.is_response());
if let Message::Response(resp) = &received {
assert_eq!(resp.id, Some(1.into()));
assert!(resp.result().is_some());
}
}
#[tokio::test]
async fn duplex_notification_test() {
let (mut client, mut server) = duplex_transport(4096);
let notification = Message::Notification(Notification::new(
"textDocument/didOpen",
Some(json!({
"textDocument": {
"uri": "file:///test.rs",
"languageId": "rust",
"version": 1,
"text": "fn main() {}"
}
})),
));
client.send(notification).await.expect("send notification");
let received = server.next().await.expect("receive").expect("decode");
assert!(received.is_notification());
if let Message::Notification(notif) = &received {
assert_eq!(notif.method, "textDocument/didOpen");
}
}
#[tokio::test]
async fn duplex_multiple_messages_test() {
let (mut client, mut server) = duplex_transport(4096);
let msg1 = Message::Request(Request::new(1, "first", None));
let msg2 = Message::Request(Request::new(2, "second", None));
let msg3 = Message::Request(Request::new(3, "third", None));
client.send(msg1).await.expect("send 1");
client.send(msg2).await.expect("send 2");
client.send(msg3).await.expect("send 3");
let recv1 = server.next().await.expect("receive 1").expect("decode 1");
let recv2 = server.next().await.expect("receive 2").expect("decode 2");
let recv3 = server.next().await.expect("receive 3").expect("decode 3");
if let Message::Request(r) = recv1 {
assert_eq!(r.method, "first");
assert_eq!(r.id, 1.into());
} else {
panic!("Expected request");
}
if let Message::Request(r) = recv2 {
assert_eq!(r.method, "second");
assert_eq!(r.id, 2.into());
} else {
panic!("Expected request");
}
if let Message::Request(r) = recv3 {
assert_eq!(r.method, "third");
assert_eq!(r.id, 3.into());
} else {
panic!("Expected request");
}
}
#[tokio::test]
async fn duplex_bidirectional_concurrent_test() {
let (mut client, mut server) = duplex_transport(4096);
let client_msg = Message::Request(Request::new(1, "client/message", None));
let server_msg = Message::Notification(Notification::new("server/notification", None));
let (client_send, server_send) =
tokio::join!(client.send(client_msg), server.send(server_msg));
client_send.expect("client send");
server_send.expect("server send");
let (from_client, from_server) = tokio::join!(server.next(), client.next());
let from_client = from_client.expect("receive from client").expect("decode");
assert!(from_client.is_request());
if let Message::Request(r) = from_client {
assert_eq!(r.method, "client/message");
}
let from_server = from_server.expect("receive from server").expect("decode");
assert!(from_server.is_notification());
if let Message::Notification(n) = from_server {
assert_eq!(n.method, "server/notification");
}
}
#[tokio::test]
async fn large_message_test() {
let (mut client, mut server) = duplex_transport(16384);
let large_data: String = "x".repeat(10_000);
let request = Message::Request(Request::new(
42,
"large/data",
Some(json!({
"content": large_data.clone()
})),
));
client.send(request).await.expect("send large message");
let received = server.next().await.expect("receive").expect("decode");
assert!(received.is_request());
if let Message::Request(req) = received {
assert_eq!(req.method, "large/data");
let params = req.params.expect("has params");
let content = params["content"].as_str().expect("content is string");
assert_eq!(content.len(), 10_000);
assert_eq!(content, large_data);
}
}
#[tokio::test]
async fn response_with_error_test() {
use crate::{ErrorCode, ResponseError};
let (mut client, mut server) = duplex_transport(4096);
let request = Message::Request(Request::new(1, "unknown/method", None));
client.send(request).await.expect("send request");
let _received = server.next().await.expect("receive").expect("decode");
let error = ResponseError::new(ErrorCode::MethodNotFound, "Method not found");
let response = Message::Response(Response::err(1, error));
server.send(response).await.expect("send error response");
let received = client.next().await.expect("receive").expect("decode");
assert!(received.is_response());
if let Message::Response(resp) = received {
assert_eq!(resp.id, Some(1.into()));
assert!(resp.error().is_some());
let err = resp.into_error().unwrap();
assert_eq!(err.code, -32601); assert_eq!(err.message, "Method not found");
}
}
}