use expect_test::expect;
use futures::{AsyncRead, AsyncWrite};
use sacp::{
ConnectionTo, JsonRpcMessage, JsonRpcRequest, JsonRpcResponse, Responder, SentRequest,
role::UntypedRole,
};
use serde::{Deserialize, Serialize};
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
async fn recv<T: JsonRpcResponse + Send>(response: SentRequest<T>) -> Result<T, sacp::Error> {
let (tx, rx) = tokio::sync::oneshot::channel();
response.on_receiving_result(async move |result| {
tx.send(result).map_err(|_| sacp::Error::internal_error())
})?;
rx.await.map_err(|_| sacp::Error::internal_error())?
}
fn setup_test_streams() -> (
impl AsyncRead,
impl AsyncWrite,
impl AsyncRead,
impl AsyncWrite,
) {
let (client_writer, server_reader) = tokio::io::duplex(1024);
let (server_writer, client_reader) = tokio::io::duplex(1024);
let server_reader = server_reader.compat();
let server_writer = server_writer.compat_write();
let client_reader = client_reader.compat();
let client_writer = client_writer.compat_write();
(server_reader, server_writer, client_reader, client_writer)
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SimpleRequest {
message: String,
}
impl JsonRpcMessage for SimpleRequest {
fn matches_method(method: &str) -> bool {
method == "simple_method"
}
fn method(&self) -> &str {
"simple_method"
}
fn to_untyped_message(&self) -> Result<sacp::UntypedMessage, sacp::Error> {
sacp::UntypedMessage::new(self.method(), self)
}
fn parse_message(method: &str, params: &impl serde::Serialize) -> Result<Self, sacp::Error> {
if !Self::matches_method(method) {
return Err(sacp::Error::method_not_found());
}
sacp::util::json_cast(params)
}
}
impl JsonRpcRequest for SimpleRequest {
type Response = SimpleResponse;
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SimpleResponse {
result: String,
}
impl JsonRpcResponse for SimpleResponse {
fn into_json(self, _method: &str) -> Result<serde_json::Value, sacp::Error> {
serde_json::to_value(self).map_err(sacp::Error::into_internal_error)
}
fn from_value(_method: &str, value: serde_json::Value) -> Result<Self, sacp::Error> {
sacp::util::json_cast(&value)
}
}
#[tokio::test(flavor = "current_thread")]
async fn test_invalid_json() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::task::LocalSet;
let local = LocalSet::new();
local
.run_until(async {
let (mut client_writer, server_reader) = tokio::io::duplex(1024);
let (server_writer, mut client_reader) = tokio::io::duplex(1024);
let server_reader = server_reader.compat();
let server_writer = server_writer.compat_write();
let server_transport = sacp::ByteStreams::new(server_writer, server_reader);
let server = UntypedRole.builder();
tokio::task::spawn_local(async move {
let _ = server.connect_to(server_transport).await;
});
let invalid_json = b"{\"method\": \"test\", \"id\": 1, INVALID}\n";
client_writer.write_all(invalid_json).await.unwrap();
client_writer.flush().await.unwrap();
let mut buffer = vec![0u8; 1024];
let n = client_reader.read(&mut buffer).await.unwrap();
let response_str = String::from_utf8_lossy(&buffer[..n]);
let response: serde_json::Value =
serde_json::from_str(response_str.trim()).expect("Response should be valid JSON");
expect![[r#"
{
"error": {
"code": -32700,
"data": {
"line": "{\"method\": \"test\", \"id\": 1, INVALID}"
},
"message": "Parse error"
},
"jsonrpc": "2.0"
}"#]]
.assert_eq(&serde_json::to_string_pretty(&response).unwrap());
})
.await;
}
#[tokio::test]
#[ignore = "hangs indefinitely - see https://github.com/symposium-dev/symposium-acp/issues/64"]
async fn test_incomplete_line() {
use futures::io::Cursor;
let incomplete_json = b"{\"method\": \"test\", \"id\": 1";
let input = Cursor::new(incomplete_json.to_vec());
let output = Cursor::new(Vec::new());
let transport = sacp::ByteStreams::new(output, input);
let connection = UntypedRole.builder();
let result = connection.connect_to(transport).await;
assert!(result.is_ok() || result.is_err());
}
#[tokio::test(flavor = "current_thread")]
async fn test_unknown_method() {
use tokio::task::LocalSet;
let local = LocalSet::new();
local
.run_until(async {
let (server_reader, server_writer, client_reader, client_writer) = setup_test_streams();
let server_transport = sacp::ByteStreams::new(server_writer, server_reader);
let server = UntypedRole.builder();
let client_transport = sacp::ByteStreams::new(client_writer, client_reader);
let client = UntypedRole.builder();
tokio::task::spawn_local(async move {
server.connect_to(server_transport).await.ok();
});
let result = client
.connect_with(client_transport, async |cx| -> Result<(), sacp::Error> {
let request = SimpleRequest {
message: "test".to_string(),
};
let result: Result<SimpleResponse, _> = recv(cx.send_request(request)).await;
assert!(result.is_err());
if let Err(err) = result {
assert!(matches!(err.code, sacp::ErrorCode::MethodNotFound));
}
Ok(())
})
.await;
assert!(result.is_ok(), "Test failed: {:?}", result);
})
.await;
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ErrorRequest {
value: String,
}
impl JsonRpcMessage for ErrorRequest {
fn matches_method(method: &str) -> bool {
method == "error_method"
}
fn method(&self) -> &str {
"error_method"
}
fn to_untyped_message(&self) -> Result<sacp::UntypedMessage, sacp::Error> {
sacp::UntypedMessage::new(self.method(), self)
}
fn parse_message(method: &str, params: &impl serde::Serialize) -> Result<Self, sacp::Error> {
if !Self::matches_method(method) {
return Err(sacp::Error::method_not_found());
}
sacp::util::json_cast(params)
}
}
impl JsonRpcRequest for ErrorRequest {
type Response = SimpleResponse;
}
#[tokio::test(flavor = "current_thread")]
async fn test_handler_returns_error() {
use tokio::task::LocalSet;
let local = LocalSet::new();
local
.run_until(async {
let (server_reader, server_writer, client_reader, client_writer) = setup_test_streams();
let server_transport = sacp::ByteStreams::new(server_writer, server_reader);
let server = UntypedRole.builder().on_receive_request(
async |_request: ErrorRequest,
responder: Responder<SimpleResponse>,
_connection: ConnectionTo<UntypedRole>| {
responder.respond_with_error(sacp::Error::internal_error())
},
sacp::on_receive_request!(),
);
let client_transport = sacp::ByteStreams::new(client_writer, client_reader);
let client = UntypedRole.builder();
tokio::task::spawn_local(async move {
server.connect_to(server_transport).await.ok();
});
let result = client
.connect_with(client_transport, async |cx| -> Result<(), sacp::Error> {
let request = ErrorRequest {
value: "trigger error".to_string(),
};
let result: Result<SimpleResponse, _> = recv(cx.send_request(request)).await;
assert!(result.is_err());
if let Err(err) = result {
assert!(matches!(err.code, sacp::ErrorCode::InternalError));
}
Ok(())
})
.await;
assert!(result.is_ok(), "Test failed: {:?}", result);
})
.await;
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct EmptyRequest;
impl JsonRpcMessage for EmptyRequest {
fn matches_method(method: &str) -> bool {
method == "strict_method"
}
fn method(&self) -> &str {
"strict_method"
}
fn to_untyped_message(&self) -> Result<sacp::UntypedMessage, sacp::Error> {
sacp::UntypedMessage::new(self.method(), self)
}
fn parse_message(method: &str, _params: &impl serde::Serialize) -> Result<Self, sacp::Error> {
if !Self::matches_method(method) {
return Err(sacp::Error::method_not_found());
}
Ok(EmptyRequest)
}
}
impl JsonRpcRequest for EmptyRequest {
type Response = SimpleResponse;
}
#[tokio::test(flavor = "current_thread")]
async fn test_missing_required_params() {
use tokio::task::LocalSet;
let local = LocalSet::new();
local
.run_until(async {
let (server_reader, server_writer, client_reader, client_writer) = setup_test_streams();
let server_transport = sacp::ByteStreams::new(server_writer, server_reader);
let server = UntypedRole.builder().on_receive_request(
async |_request: EmptyRequest,
responder: Responder<SimpleResponse>,
_connection: ConnectionTo<UntypedRole>| {
responder.respond_with_error(sacp::Error::invalid_params())
},
sacp::on_receive_request!(),
);
let client_transport = sacp::ByteStreams::new(client_writer, client_reader);
let client = UntypedRole.builder();
tokio::task::spawn_local(async move {
server.connect_to(server_transport).await.ok();
});
let result = client
.connect_with(client_transport, async |cx| -> Result<(), sacp::Error> {
let request = EmptyRequest;
let result: Result<SimpleResponse, _> = recv(cx.send_request(request)).await;
assert!(result.is_err());
if let Err(err) = result {
assert!(matches!(err.code, sacp::ErrorCode::InvalidParams)); }
Ok(())
})
.await;
assert!(result.is_ok(), "Test failed: {:?}", result);
})
.await;
}