mod support;
use std::time::Duration;
use sqry_daemon::ipc::framing::{MAX_FRAME_BYTES, read_frame, write_frame_json};
use support::ipc::TestServer;
use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;
/// Opens a raw connection to the daemon socket at `path` and performs the hello exchange.
///
/// Sends a `DaemonHello` frame (client version `"test/0"`, protocol version 1, no logical
/// workspace), then reads one frame back and discards it. That frame is presumably the
/// daemon's hello response; its contents are not inspected here.
///
/// # Panics
///
/// Panics if connecting, writing the hello, or reading the reply fails, or if `read_frame`
/// yields no frame.
async fn hello_raw(path: &std::path::Path) -> UnixStream {
    let hello = sqry_daemon::DaemonHello {
        client_version: "test/0".into(),
        protocol_version: 1,
        logical_workspace: None,
    };

    let mut conn = UnixStream::connect(path).await.unwrap();
    write_frame_json(&mut conn, &hello).await.unwrap();

    // Only the presence of a reply matters; callers receive a post-handshake stream.
    let _reply = read_frame(&mut conn).await.unwrap().unwrap();
    conn
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn round_trip_daemon_status_over_real_socket() {
let server = TestServer::new().await;
let mut stream = hello_raw(&server.path).await;
let req = serde_json::json!({
"jsonrpc":"2.0","id":1,"method":"daemon/status",
});
let body = req.to_string();
let len = (body.len() as u32).to_le_bytes();
stream.write_all(&len).await.unwrap();
stream.write_all(body.as_bytes()).await.unwrap();
stream.flush().await.unwrap();
let resp_bytes = read_frame(&mut stream).await.unwrap().unwrap();
let resp: serde_json::Value = serde_json::from_slice(&resp_bytes).unwrap();
assert_eq!(resp["id"], serde_json::json!(1));
assert!(resp["result"].is_object());
drop(stream);
server.stop().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn oversize_frame_claim_closes_connection() {
let server = TestServer::new().await;
let mut stream = hello_raw(&server.path).await;
let bad_len = (MAX_FRAME_BYTES as u32 + 1).to_le_bytes();
stream.write_all(&bad_len).await.unwrap();
stream.flush().await.unwrap();
let result = tokio::time::timeout(Duration::from_secs(2), read_frame(&mut stream)).await;
match result {
Ok(Ok(None)) => {}
Ok(Err(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {}
other => panic!("expected EOF or UnexpectedEof, got {other:?}"),
}
server.stop().await;
}
/// Writes two bytes (fewer than the 4-byte length prefix used elsewhere in this file) and
/// then drops the client stream.
///
/// There is no explicit assertion: the test passes when nothing panics and
/// `server.stop()` completes after the short pause.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn truncated_prefix_closes_connection() {
    let server = TestServer::new().await;
    let mut stream = hello_raw(&server.path).await;

    let partial_prefix: [u8; 2] = [0x10, 0x00];
    stream.write_all(&partial_prefix).await.unwrap();
    stream.flush().await.unwrap();
    drop(stream);

    // Presumably gives the server time to notice the closed stream before shutdown.
    let settle = Duration::from_millis(100);
    tokio::time::sleep(settle).await;

    server.stop().await;
}
/// Declares a 16-byte frame, sends only the 5-byte payload `short`, and then drops the
/// client stream.
///
/// There is no explicit assertion: the test passes when nothing panics and
/// `server.stop()` completes after the short pause.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn truncated_body_closes_connection() {
    let server = TestServer::new().await;
    let mut stream = hello_raw(&server.path).await;

    let declared_len: u32 = 16;
    let partial_body: &[u8] = b"short";

    // Prefix and payload go out as two separate writes, then the client disappears.
    stream.write_all(&declared_len.to_le_bytes()).await.unwrap();
    stream.write_all(partial_body).await.unwrap();
    stream.flush().await.unwrap();
    drop(stream);

    // Presumably gives the server time to notice the closed stream before shutdown.
    let settle = Duration::from_millis(100);
    tokio::time::sleep(settle).await;

    server.stop().await;
}
/// Completes the hello exchange and drops the stream right away, so the client goes away
/// without sending any further frame.
///
/// There is no explicit assertion: the test passes when nothing panics and
/// `server.stop()` completes after the short pause.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn clean_eof_at_frame_boundary_shuts_connection_gracefully() {
    let server = TestServer::new().await;

    // Connect, handshake, and close immediately.
    drop(hello_raw(&server.path).await);

    let settle = Duration::from_millis(50);
    tokio::time::sleep(settle).await;

    server.stop().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn mid_sized_frame_round_trips() {
let server = TestServer::new().await;
let mut stream = hello_raw(&server.path).await;
let pad = "x".repeat(1_000_000);
let req = serde_json::json!({
"jsonrpc":"2.0","id":42,"method":"not-a-real-method",
"params": { "pad": pad },
});
let body = serde_json::to_vec(&req).unwrap();
let len = (body.len() as u32).to_le_bytes();
stream.write_all(&len).await.unwrap();
stream.write_all(&body).await.unwrap();
stream.flush().await.unwrap();
let resp_bytes = read_frame(&mut stream).await.unwrap().unwrap();
let resp: serde_json::Value = serde_json::from_slice(&resp_bytes).unwrap();
assert_eq!(resp["id"], serde_json::json!(42));
assert_eq!(resp["error"]["code"], serde_json::json!(-32601));
drop(stream);
server.stop().await;
}