mod support;
use serde_json::json;
use sqry_daemon::{
DaemonConfig,
ipc::protocol::{JsonRpcPayload, ShimProtocol, ShimRegister, ShimRegisterAck},
};
use support::ipc::{TestIpcClient, TestServer};
async fn send_shim_register_and_read_ack(
server: &TestServer,
protocol: ShimProtocol,
pid: u32,
) -> ShimRegisterAck {
let mut client = TestIpcClient::connect(&server.path).await;
let req = ShimRegister { protocol, pid };
client.send_raw(&req).await;
client.read_typed::<ShimRegisterAck>().await
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn hello_first_frame_routed_to_jsonrpc() {
let server = TestServer::new().await;
let mut client = TestIpcClient::connect(&server.path).await;
let hello_resp = client.hello(1).await;
assert!(hello_resp.compatible, "hello must be compatible");
let resp = client.request("daemon/status", json!({})).await;
match &resp.payload {
JsonRpcPayload::Success { .. } => {}
JsonRpcPayload::Error { error } => {
panic!("expected success from daemon/status, got error: {error:?}");
}
}
drop(client);
server.stop().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn shim_register_first_frame_routed_to_shim_path() {
let server = TestServer::new().await;
let ack = send_shim_register_and_read_ack(&server, ShimProtocol::Mcp, 12345).await;
assert!(ack.accepted, "shim registration should be accepted");
assert!(
!ack.daemon_version.is_empty(),
"daemon_version must be populated"
);
assert_eq!(
ack.envelope_version,
sqry_daemon::ENVELOPE_VERSION,
"envelope_version mismatch"
);
assert!(ack.reason.is_none(), "accepted ack must have no reason");
server.stop().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn malformed_first_frame_rejected_with_invalid_request() {
let server = TestServer::new().await;
let mut client = TestIpcClient::connect(&server.path).await;
let bogus = json!({"completely": "unknown", "shape": true});
client.send_raw(&bogus).await;
let resp = client.read_response().await;
match &resp.payload {
JsonRpcPayload::Error { error } => {
assert_eq!(
error.code, -32600,
"expected -32600 Invalid Request, got: {}",
error.code
);
assert!(resp.id.is_none(), "invalid-request id must be null");
}
JsonRpcPayload::Success { .. } => panic!("expected error, got success"),
}
drop(client);
server.stop().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn first_frame_mixed_keys_rejected() {
let server = TestServer::new().await;
let mut client = TestIpcClient::connect(&server.path).await;
let mixed = json!({
"client_version": "test/0.0.1",
"protocol_version": 1,
"protocol": "lsp",
"pid": 42,
});
client.send_raw(&mixed).await;
let ack = client.read_typed::<ShimRegisterAck>().await;
assert!(!ack.accepted, "mixed-keys frame must be rejected");
assert!(ack.reason.is_some(), "rejected ack must carry a reason");
drop(client);
server.stop().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unknown_protocol_variant_rejected_on_shim_register() {
let server = TestServer::new().await;
let mut client = TestIpcClient::connect(&server.path).await;
let bad = json!({"protocol": "grpc", "pid": 99});
client.send_raw(&bad).await;
let ack = client.read_typed::<ShimRegisterAck>().await;
assert!(!ack.accepted, "unknown protocol variant must be rejected");
assert!(ack.reason.is_some(), "rejected ack must carry a reason");
drop(client);
server.stop().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn concurrent_hello_and_shim_connections_isolated() {
let server = TestServer::new().await;
let hello_tasks: Vec<_> = (0u32..3)
.map(|_| {
let path = server.path.clone();
tokio::spawn(async move {
let mut client = TestIpcClient::connect(&path).await;
let hello = client.hello(1).await;
assert!(hello.compatible);
let resp = client.request("daemon/status", json!({})).await;
match resp.payload {
JsonRpcPayload::Success { .. } => {}
JsonRpcPayload::Error { error } => {
panic!("hello path got error: {error:?}");
}
}
})
})
.collect();
let shim_tasks: Vec<_> = (0u32..3)
.map(|i| {
let path = server.path.clone();
tokio::spawn(async move {
let mut client = TestIpcClient::connect(&path).await;
let req = ShimRegister {
protocol: ShimProtocol::Mcp,
pid: 2000 + i,
};
client.send_raw(&req).await;
let ack = client.read_typed::<ShimRegisterAck>().await;
assert!(ack.accepted, "shim conn {i} must be accepted");
})
})
.collect();
for task in hello_tasks {
task.await.expect("hello task panicked");
}
for task in shim_tasks {
task.await.expect("shim task panicked");
}
server.stop().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn shim_register_admission_denied_when_cap_reached() {
let server = TestServer::with_config(DaemonConfig {
max_shim_connections: 1,
..DaemonConfig::default()
})
.await;
let mut client1 = TestIpcClient::connect(&server.path).await;
let req1 = ShimRegister {
protocol: ShimProtocol::Lsp,
pid: 100,
};
client1.send_raw(&req1).await;
let ack1 = client1.read_typed::<ShimRegisterAck>().await;
assert!(ack1.accepted, "first shim must be accepted");
let ack2 = send_shim_register_and_read_ack(&server, ShimProtocol::Mcp, 200).await;
assert!(!ack2.accepted, "second shim must be denied at cap 1");
let reason = ack2.reason.expect("rejected ack must have a reason");
assert!(
reason.contains("full") || reason.contains("cap") || reason.contains("registry"),
"rejection reason should mention capacity: {reason}"
);
drop(client1);
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let ack3 = send_shim_register_and_read_ack(&server, ShimProtocol::Mcp, 300).await;
assert!(
ack3.accepted,
"after first client disconnects, new shim must be accepted"
);
server.stop().await;
}