#![cfg(unix)]
#![allow(clippy::too_many_lines)]
use std::io::{BufRead, BufReader, Write as _};
use std::path::{Path, PathBuf};
use std::process::{Child, ChildStdin, Command, Stdio};
use std::sync::Arc;
use std::time::{Duration, Instant};
fn find_binary(name: &str) -> Option<PathBuf> {
let env_key = format!("CARGO_BIN_EXE_{name}");
if let Ok(path) = std::env::var(&env_key) {
let p = PathBuf::from(path);
if p.is_file() {
return Some(p);
}
}
let binary_name = format!("{name}{}", std::env::consts::EXE_SUFFIX);
let exe = std::env::current_exe().ok()?;
let parent = exe.parent()?; let candidate = parent.join(&binary_name);
if candidate.is_file() {
return Some(candidate);
}
let grandparent = parent.parent()?; let candidate = grandparent.join(&binary_name);
if candidate.is_file() {
return Some(candidate);
}
None
}
/// Owns a spawned child process and guarantees cleanup on drop.
///
/// The optional stdin handle is stored so the test can temporarily `take()`
/// it to write requests; on drop the stdin is closed first (signalling EOF to
/// the child), then the process is killed and reaped so no zombie remains.
struct ChildGuard {
    child: Child,
    _stdin: Option<ChildStdin>,
}

impl ChildGuard {
    /// Wrap `child` without capturing its stdin.
    fn new(child: Child) -> Self {
        ChildGuard { child, _stdin: None }
    }

    /// Builder-style variant that also stores the piped stdin handle.
    fn with_stdin(mut self, stdin: ChildStdin) -> Self {
        self._stdin = Some(stdin);
        self
    }
}

impl Drop for ChildGuard {
    fn drop(&mut self) {
        // Close stdin first so the child sees EOF before being signalled.
        self._stdin.take();
        // Best-effort kill + reap; errors (e.g. already exited) are ignored.
        let _ = self.child.kill();
        let _ = self.child.wait();
    }
}
/// Tracks the PID of a daemon that detached from our process tree.
///
/// The daemon is not our direct child, so we cannot `wait()` on it; cleanup
/// is driven with raw signals instead: SIGTERM first, then SIGKILL if the
/// process is still alive once the timeout elapses. Liveness is probed with
/// `kill(pid, 0)` (signal 0 = existence check, no signal delivered).
struct DetachedDaemonGuard {
    /// PID to terminate on drop; `None` means nothing to clean up.
    pid: Option<u32>,
}

impl DetachedDaemonGuard {
    /// Create a guard that currently protects nothing.
    fn unarmed() -> Self {
        Self { pid: None }
    }

    /// Start guarding `pid`.
    fn arm(&mut self, pid: u32) {
        self.pid = Some(pid);
    }

    /// Forget the tracked PID without signalling it.
    fn disarm(&mut self) {
        self.pid = None;
    }

    /// Terminate the tracked process: SIGTERM, poll for exit until `timeout`
    /// elapses, then escalate to SIGKILL. No-op when unarmed. The PID is
    /// `take()`n up front so the `Drop` impl cannot signal it a second time.
    fn kill_and_wait(&mut self, timeout: Duration) {
        let Some(pid) = self.pid.take() else {
            return;
        };
        let pid = pid as libc::pid_t;
        // SAFETY: signalling an arbitrary PID is racy (the PID may have been
        // recycled) but memory-safe; acceptable for test teardown.
        unsafe {
            libc::kill(pid, libc::SIGTERM);
        }
        let deadline = Instant::now() + timeout;
        loop {
            // kill(pid, 0) delivers nothing; rc != 0 means the PID is gone.
            let rc = unsafe { libc::kill(pid, 0) };
            if rc != 0 {
                break;
            }
            if Instant::now() >= deadline {
                unsafe {
                    libc::kill(pid, libc::SIGKILL);
                }
                // Brief settle so the kernel can reap before we return.
                std::thread::sleep(Duration::from_millis(100));
                break;
            }
            std::thread::sleep(Duration::from_millis(50));
        }
    }
}

impl Drop for DetachedDaemonGuard {
    fn drop(&mut self) {
        // Previously this duplicated the whole SIGTERM->poll->SIGKILL loop
        // (and had already drifted from kill_and_wait). Delegate instead so
        // the escalation logic lives in exactly one place; 500ms keeps
        // implicit teardown snappy.
        self.kill_and_wait(Duration::from_millis(500));
    }
}
/// Poll until a Unix socket at `socket_path` accepts a connection.
///
/// Returns `true` as soon as a connect succeeds, `false` once `timeout`
/// elapses without one. Polls every 50ms with a blocking sleep (this helper
/// is intentionally synchronous).
fn wait_for_socket_sync(socket_path: &Path, timeout: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    while std::os::unix::net::UnixStream::connect(socket_path).is_err() {
        if Instant::now() >= deadline {
            return false;
        }
        std::thread::sleep(Duration::from_millis(50));
    }
    true
}
/// Poll `pidfile_path` until it contains a plausible daemon PID.
///
/// A PID is accepted when the file's trimmed contents parse as a `u32`
/// strictly greater than 1 (0 and 1 can never be a freshly spawned daemon).
/// Returns `None` once `timeout` elapses first. Polls every 50ms.
fn poll_pidfile_for_valid_pid(pidfile_path: &Path, timeout: Duration) -> Option<u32> {
    let deadline = Instant::now() + timeout;
    loop {
        let candidate = std::fs::read_to_string(pidfile_path)
            .ok()
            .and_then(|s| s.trim().parse::<u32>().ok())
            .filter(|&pid| pid > 1);
        if candidate.is_some() {
            return candidate;
        }
        if Instant::now() >= deadline {
            return None;
        }
        std::thread::sleep(Duration::from_millis(50));
    }
}
/// End-to-end: the `sqry-mcp` shim must auto-start a detached `sqryd` daemon
/// when none is running, and complete the MCP `initialize` handshake over
/// stdio against it.
///
/// Skips (rather than fails) when the workspace binaries are not built.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn auto_start_from_mcp_shim() {
    let sqry_mcp_bin = match find_binary("sqry-mcp") {
        Some(p) => p,
        None => {
            eprintln!(
                "SKIP auto_start_from_mcp_shim: sqry-mcp binary not found \
                 (run `cargo build --workspace` first)"
            );
            return;
        }
    };
    let sqryd_bin = match find_binary("sqryd") {
        Some(p) => p,
        None => {
            eprintln!(
                "SKIP auto_start_from_mcp_shim: sqryd binary not found \
                 (run `cargo build --workspace` first)"
            );
            return;
        }
    };
    // Isolated runtime dir: socket, pidfile and config all live in the tempdir
    // so the test can never collide with a real user daemon.
    let tmp = tempfile::TempDir::new().expect("create tempdir for auto-start e2e test");
    let sqry_dir = tmp.path().join("sqry");
    std::fs::create_dir_all(&sqry_dir).expect("create sqry subdirectory inside tempdir");
    let socket_path = sqry_dir.join("sqryd.sock");
    let pidfile_path = sqry_dir.join("sqryd.pid");
    let config_path = tmp.path().join("daemon.toml");
    std::fs::write(&config_path, b"").expect("write empty daemon config");
    assert!(
        !socket_path.exists(),
        "test precondition: no socket at {} before test starts",
        socket_path.display()
    );
    // Spawn the shim with auto-start enabled (SQRY_DAEMON_NO_AUTO_START is
    // explicitly removed) and pointed at our private socket/config/binary.
    let mut cmd = Command::new(&sqry_mcp_bin);
    cmd.args(["--daemon", "--daemon-socket"])
        .arg(&socket_path)
        .env("XDG_RUNTIME_DIR", tmp.path())
        .env("SQRYD_PATH", &sqryd_bin)
        .env("SQRY_DAEMON_CONFIG", &config_path)
        .env("SQRY_DAEMON_SOCKET", &socket_path)
        .env_remove("SQRY_DAEMON_NO_AUTO_START")
        .env("SQRY_DAEMON_LOG_LEVEL", "warn")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::inherit());
    let mut child = cmd
        .spawn()
        .unwrap_or_else(|e| panic!("failed to spawn sqry-mcp --daemon: {e}"));
    let child_stdin = child.stdin.take().expect("sqry-mcp stdin is piped");
    let child_stdout = child.stdout.take().expect("sqry-mcp stdout is piped");
    // Two guards: one for the shim child itself, one for the detached daemon
    // grandchild (identified later via the pidfile).
    let mut grandchild_guard = DetachedDaemonGuard::unarmed();
    let mut child_guard = ChildGuard::new(child).with_stdin(child_stdin);
    let initialize_request = serde_json::json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {
                "name": "sqry-daemon-e2e-test",
                "version": "0.0.1"
            }
        }
    });
    let mut req_bytes =
        serde_json::to_vec(&initialize_request).expect("serialize initialize request");
    req_bytes.push(b'\n');
    {
        // Blocking pipe write is moved off the async runtime; the stdin handle
        // is threaded through the closure so the guard gets it back afterwards.
        let stdin = child_guard._stdin.take().expect("stdin is present");
        let req_clone = req_bytes.clone();
        let mut stdin_moved = stdin;
        let written = tokio::task::spawn_blocking(move || {
            stdin_moved.write_all(&req_clone).map(|_| stdin_moved)
        })
        .await
        .expect("spawn_blocking did not panic");
        match written {
            Ok(stdin_back) => {
                child_guard._stdin = Some(stdin_back);
            }
            Err(e) => {
                panic!(
                    "failed to write MCP initialize request to sqry-mcp stdin: {e}; \
                     socket={}",
                    socket_path.display()
                );
            }
        }
    }
    // Read the shim's stdout on a dedicated thread, forwarding non-empty lines
    // over a channel so the async side can apply an overall deadline.
    let response_timeout = Duration::from_secs(30);
    let (tx, rx) = std::sync::mpsc::channel::<String>();
    let child_stdout_for_thread = child_stdout;
    std::thread::spawn(move || {
        let reader = BufReader::new(child_stdout_for_thread);
        for line in reader.lines() {
            match line {
                Ok(l) if !l.trim().is_empty() => {
                    // Receiver gone => test finished; stop reading.
                    if tx.send(l).is_err() {
                        break;
                    }
                }
                Ok(_) => {}
                Err(e) => {
                    eprintln!("sqry-mcp stdout read error: {e}");
                    break;
                }
            }
        }
    });
    let deadline = Instant::now() + response_timeout;
    let mut initialize_response: Option<serde_json::Value> = None;
    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            break;
        }
        // Short recv timeout keeps the loop responsive to the overall deadline
        // even when the shim emits unrelated lines in between.
        match rx.recv_timeout(remaining.min(Duration::from_millis(500))) {
            Ok(line) => {
                if let Ok(v) = serde_json::from_str::<serde_json::Value>(&line)
                    && v.get("id") == Some(&serde_json::json!(1))
                {
                    initialize_response = Some(v);
                    break;
                }
            }
            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
            Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
                break;
            }
        }
    }
    let response = initialize_response.unwrap_or_else(|| {
        panic!(
            "auto_start_from_mcp_shim: did not receive MCP initialize response \
             with id=1 within {}s;\n\
             sqry-mcp binary: {}\n\
             sqryd binary: {}\n\
             socket: {}",
            response_timeout.as_secs(),
            sqry_mcp_bin.display(),
            sqryd_bin.display(),
            socket_path.display(),
        )
    });
    assert!(
        response.get("error").is_none(),
        "MCP initialize returned an error: {:?}",
        response.get("error")
    );
    let result = response
        .get("result")
        .unwrap_or_else(|| panic!("MCP initialize response has no `result` field: {response:?}"));
    let capabilities = result
        .get("capabilities")
        .unwrap_or_else(|| panic!("MCP initialize result has no `capabilities` field: {result:?}"));
    assert!(
        capabilities.is_object(),
        "MCP initialize result.capabilities must be a JSON object; got: {capabilities:?}"
    );
    // Complete the MCP handshake with the client-side initialized notification.
    let initialized_notification = serde_json::json!({
        "jsonrpc": "2.0",
        "method": "notifications/initialized",
        "params": {}
    });
    let mut notif_bytes =
        serde_json::to_vec(&initialized_notification).expect("serialize notifications/initialized");
    notif_bytes.push(b'\n');
    {
        let stdin = child_guard
            ._stdin
            .take()
            .expect("stdin is present for initialized notif");
        let bytes = notif_bytes;
        let mut stdin_moved = stdin;
        let written =
            tokio::task::spawn_blocking(move || stdin_moved.write_all(&bytes).map(|_| stdin_moved))
                .await
                .expect("spawn_blocking for notifications/initialized did not panic");
        match written {
            Ok(stdin_back) => {
                child_guard._stdin = Some(stdin_back);
            }
            Err(e) => {
                panic!("failed to write notifications/initialized: {e}");
            }
        }
    }
    // NOTE(review): this asserts the socket file already exists *before* the
    // connectability wait below — that is only race-free if the shim
    // guarantees the socket is created before it answers initialize; confirm.
    assert!(
        socket_path.exists(),
        "daemon socket must exist after MCP initialize: {}",
        socket_path.display()
    );
    assert!(
        wait_for_socket_sync(&socket_path, Duration::from_secs(2)),
        "daemon socket must be connectable after MCP initialize: {}",
        socket_path.display()
    );
    let grandchild_pid = poll_pidfile_for_valid_pid(&pidfile_path, Duration::from_secs(5));
    assert!(
        grandchild_pid.is_some(),
        "daemon pidfile must contain a valid PID within 5s after successful auto-start: {}",
        pidfile_path.display()
    );
    if let Some(pid) = grandchild_pid {
        grandchild_guard.arm(pid);
    }
    // Explicit teardown: kill the detached daemon first, then drop the shim
    // guard (which closes stdin and reaps the shim process).
    grandchild_guard.kill_and_wait(Duration::from_secs(7));
    grandchild_guard.disarm();
    drop(child_guard);
}
mod support;
use std::pin::Pin;
use sqry_core::project::{ProjectRootMode, canonicalize_path};
use sqry_daemon::{DaemonConfig, JSONRPC_WORKSPACE_EVICTED, RealWorkspaceBuilder, WorkspaceKey};
use sqry_daemon_client::{AsyncReadWrite, DaemonClient, ShimProtocol, connect_shim_with_timeouts};
use support::init_git_repo;
use support::ipc::{TestIpcClient, TestServer, expect_error, expect_success};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
/// End-to-end: editing a source file in a watched workspace must trigger a
/// debounced re-index, making the new symbol searchable and removing the old
/// one from the index.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn file_change_triggers_rebuild() {
    // Fresh git workspace containing a single Rust file with one function.
    let workspace_tmp =
        tempfile::TempDir::new().expect("create workspace tempdir for file-change e2e test");
    let workspace_root = workspace_tmp.path();
    std::fs::create_dir_all(workspace_root.join("src"))
        .expect("create src/ directory in workspace tempdir");
    std::fs::write(
        workspace_root.join("src/lib.rs"),
        b"pub fn original_function() {}\n",
    )
    .expect("write initial src/lib.rs");
    init_git_repo(workspace_root);
    let canon_root = canonicalize_path(workspace_root).expect("canonicalize workspace root");
    // In-process daemon with a short (200ms) debounce so the rebuild fires fast.
    let plugins = Arc::new(sqry_plugin_registry::create_plugin_manager());
    let builder = Arc::new(RealWorkspaceBuilder::new(Arc::clone(&plugins)))
        as Arc<dyn sqry_daemon::WorkspaceBuilder>;
    let server = TestServer::with_builder_and_config(
        builder,
        DaemonConfig {
            debounce_ms: 200,
            ..DaemonConfig::default()
        },
    )
    .await;
    let mut mgmt = TestIpcClient::connect(&server.path).await;
    let hello = mgmt.hello(1).await;
    assert!(
        hello.compatible,
        "management hello must be compatible: {hello:?}"
    );
    let load_resp = mgmt
        .request(
            "daemon/load",
            serde_json::json!({ "index_root": canon_root.to_string_lossy() }),
        )
        .await;
    expect_success(&load_resp);
    // Start the file watcher for the freshly loaded workspace.
    let key = WorkspaceKey::new(canon_root.clone(), ProjectRootMode::GitRoot, 0);
    let ws = server.manager.lookup(&key).unwrap_or_else(|| {
        panic!(
            "workspace must be registered after daemon/load; \
             key.source_root={}",
            key.source_root.display()
        )
    });
    server
        .dispatcher
        .ensure_watching(&key, ws, canon_root.clone())
        .await
        .expect("ensure_watching must succeed on a freshly-loaded workspace");
    // Baseline: the original symbol is indexed before any change.
    {
        let search_resp = mgmt
            .request(
                "semantic_search",
                serde_json::json!({
                    "query": "name:original_function",
                    "path": canon_root.to_string_lossy(),
                    "max_results": 10,
                    "context_lines": 0,
                    "include_classpath": false,
                }),
            )
            .await;
        let result = expect_success(&search_resp);
        assert!(
            result["result"]["data"]["results"]
                .as_array()
                .is_some_and(|items| {
                    items
                        .iter()
                        .any(|hit| hit["name"].as_str() == Some("original_function"))
                }),
            "original_function must be found before file change; result={result}"
        );
    }
    // Rename the function on disk; the watcher should pick this up.
    std::fs::write(
        workspace_root.join("src/lib.rs"),
        b"pub fn renamed_function() {}\n",
    )
    .expect("overwrite src/lib.rs with renamed_function");
    // Poll repeatedly (rather than sleeping once) because debounce + rebuild
    // latency varies across machines.
    let poll_timeout = Duration::from_secs(20);
    let poll_interval = Duration::from_millis(500);
    let deadline = Instant::now() + poll_timeout;
    let mut renamed_found = false;
    while Instant::now() < deadline {
        let search_resp = mgmt
            .request(
                "semantic_search",
                serde_json::json!({
                    "query": "name:renamed_function",
                    "path": canon_root.to_string_lossy(),
                    "max_results": 10,
                    "context_lines": 0,
                    "include_classpath": false,
                }),
            )
            .await;
        // Errors during the rebuild window are tolerated; keep polling.
        if let sqry_daemon::ipc::protocol::JsonRpcPayload::Success { result } = &search_resp.payload
        {
            if result["result"]["data"]["results"]
                .as_array()
                .is_some_and(|items| {
                    items
                        .iter()
                        .any(|hit| hit["name"].as_str() == Some("renamed_function"))
                })
            {
                renamed_found = true;
                break;
            }
        }
        tokio::time::sleep(poll_interval).await;
    }
    assert!(
        renamed_found,
        "renamed_function must appear in semantic_search within {}s after file change; \
         workspace={}",
        poll_timeout.as_secs(),
        canon_root.display()
    );
    // The old symbol must be gone from the rebuilt index.
    {
        let search_resp = mgmt
            .request(
                "semantic_search",
                serde_json::json!({
                    "query": "name:original_function",
                    "path": canon_root.to_string_lossy(),
                    "max_results": 10,
                    "context_lines": 0,
                    "include_classpath": false,
                }),
            )
            .await;
        let result = expect_success(&search_resp);
        assert!(
            !result["result"]["data"]["results"]
                .as_array()
                .is_some_and(|items| {
                    items
                        .iter()
                        .any(|hit| hit["name"].as_str() == Some("original_function"))
                }),
            "original_function must NOT be found after file rename; result={result}"
        );
    }
    drop(mgmt);
    server.stop().await;
}
/// End-to-end: LRU eviction under a tiny memory budget.
///
/// Loading workspace B under a 3 MB cap must evict workspace A; A then
/// surfaces as an `Evicted` tombstone in daemon/status, rejects queries with
/// `JSONRPC_WORKSPACE_EVICTED`, and becomes searchable again after an
/// explicit reload.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn lru_eviction_under_memory_pressure() {
    // In-process daemon with a 3 MB budget so two workspaces cannot coexist.
    let plugins = Arc::new(sqry_plugin_registry::create_plugin_manager());
    let builder = Arc::new(RealWorkspaceBuilder::new(Arc::clone(&plugins)))
        as Arc<dyn sqry_daemon::WorkspaceBuilder>;
    let server = TestServer::with_builder_and_config(
        builder,
        DaemonConfig {
            memory_limit_mb: 3,
            ..DaemonConfig::default()
        },
    )
    .await;
    // Workspace A: a single file defining `func_alpha`.
    let tmp_a = tempfile::TempDir::new().expect("create workspace A tempdir");
    let root_a = tmp_a.path();
    std::fs::create_dir_all(root_a.join("src")).expect("create src/ dir for workspace A");
    std::fs::write(root_a.join("src/lib.rs"), b"pub fn func_alpha() {}\n")
        .expect("write src/lib.rs for workspace A");
    init_git_repo(root_a);
    let canon_a = canonicalize_path(root_a).expect("canonicalize workspace A");
    {
        let mut client = TestIpcClient::connect(&server.path).await;
        client.hello(1).await;
        let resp = client
            .request(
                "daemon/load",
                serde_json::json!({ "index_root": canon_a.to_string_lossy() }),
            )
            .await;
        expect_success(&resp);
    }
    // Baseline: A is searchable before eviction pressure exists.
    {
        let mut client = TestIpcClient::connect(&server.path).await;
        client.hello(1).await;
        let resp = client
            .request(
                "semantic_search",
                serde_json::json!({
                    "query": "name:func_alpha",
                    "path": canon_a.to_string_lossy(),
                    "max_results": 5,
                    "context_lines": 0,
                    "include_classpath": false,
                }),
            )
            .await;
        let result = expect_success(&resp);
        let symbols_str = result.to_string();
        assert!(
            symbols_str.contains("func_alpha"),
            "semantic_search for 'func_alpha' in workspace A must find the symbol \
             before eviction; result={symbols_str:.500}"
        );
    }
    // Workspace B: loading it should push memory over the cap and evict A.
    let tmp_b = tempfile::TempDir::new().expect("create workspace B tempdir");
    let root_b = tmp_b.path();
    std::fs::create_dir_all(root_b.join("src")).expect("create src/ dir for workspace B");
    std::fs::write(root_b.join("src/lib.rs"), b"pub fn func_beta() {}\n")
        .expect("write src/lib.rs for workspace B");
    init_git_repo(root_b);
    let canon_b = canonicalize_path(root_b).expect("canonicalize workspace B");
    {
        let mut client = TestIpcClient::connect(&server.path).await;
        client.hello(1).await;
        let resp = client
            .request(
                "daemon/load",
                serde_json::json!({ "index_root": canon_b.to_string_lossy() }),
            )
            .await;
        expect_success(&resp);
    }
    // daemon/status must show B Loaded and A as an Evicted tombstone with
    // zero resident bytes (the STEP_6 iter-2 contract).
    {
        let mut client = TestIpcClient::connect(&server.path).await;
        client.hello(1).await;
        let resp = client.request("daemon/status", serde_json::json!({})).await;
        let result = expect_success(&resp);
        let workspaces = result["result"]["workspaces"]
            .as_array()
            .unwrap_or_else(|| {
                panic!("daemon/status result.workspaces must be an array; result={result}")
            });
        let b_path_str = canon_b.to_string_lossy().to_string();
        let b_entry = workspaces
            .iter()
            .find(|ws| ws["index_root"].as_str() == Some(&b_path_str))
            .unwrap_or_else(|| {
                panic!(
                    "daemon/status must show workspace B loaded after LRU eviction of A; \
                     workspaces={workspaces:?}"
                )
            });
        assert_eq!(
            b_entry["state"].as_str(),
            Some("Loaded"),
            "workspace B must remain Loaded; entry={b_entry}",
        );
        let a_path_str = canon_a.to_string_lossy().to_string();
        let a_entry = workspaces
            .iter()
            .find(|ws| ws["index_root"].as_str() == Some(&a_path_str))
            .unwrap_or_else(|| {
                panic!(
                    "daemon/status must surface the LRU-evicted workspace A as an Evicted \
                     tombstone (STEP_6 iter-2 contract); workspaces={workspaces:?}"
                )
            });
        assert_eq!(
            a_entry["state"].as_str(),
            Some("Evicted"),
            "workspace A must surface as Evicted, not absent; entry={a_entry}",
        );
        assert_eq!(
            a_entry["current_bytes"].as_u64(),
            Some(0),
            "evicted tombstone must report zero resident bytes; entry={a_entry}",
        );
    }
    // B must stay fully queryable after A's eviction.
    {
        let mut client = TestIpcClient::connect(&server.path).await;
        client.hello(1).await;
        let resp = client
            .request(
                "semantic_search",
                serde_json::json!({
                    "query": "name:func_beta",
                    "path": canon_b.to_string_lossy(),
                    "max_results": 5,
                    "context_lines": 0,
                    "include_classpath": false,
                }),
            )
            .await;
        let result = expect_success(&resp);
        let symbols_str = result.to_string();
        assert!(
            symbols_str.contains("func_beta"),
            "semantic_search for 'func_beta' in workspace B must find the symbol; \
             result={symbols_str:.500}"
        );
    }
    // Queries against the evicted A must fail with the dedicated error code.
    {
        let mut client = TestIpcClient::connect(&server.path).await;
        client.hello(1).await;
        let resp = client
            .request(
                "semantic_search",
                serde_json::json!({
                    "query": "name:func_alpha",
                    "path": canon_a.to_string_lossy(),
                    "max_results": 5,
                    "context_lines": 0,
                    "include_classpath": false,
                }),
            )
            .await;
        let err = expect_error(&resp);
        assert_eq!(
            err.code, JSONRPC_WORKSPACE_EVICTED,
            "semantic_search against evicted workspace A must return \
             JSONRPC_WORKSPACE_EVICTED ({}); got code={}: message={}",
            JSONRPC_WORKSPACE_EVICTED, err.code, err.message,
        );
    }
    // Explicit reload resurrects A...
    {
        let mut client = TestIpcClient::connect(&server.path).await;
        client.hello(1).await;
        let resp = client
            .request(
                "daemon/load",
                serde_json::json!({ "index_root": canon_a.to_string_lossy() }),
            )
            .await;
        expect_success(&resp);
    }
    // ...and its symbols are searchable again.
    {
        let mut client = TestIpcClient::connect(&server.path).await;
        client.hello(1).await;
        let resp = client
            .request(
                "semantic_search",
                serde_json::json!({
                    "query": "name:func_alpha",
                    "path": canon_a.to_string_lossy(),
                    "max_results": 5,
                    "context_lines": 0,
                    "include_classpath": false,
                }),
            )
            .await;
        let result = expect_success(&resp);
        let symbols_str = result.to_string();
        assert!(
            symbols_str.contains("func_alpha"),
            "semantic_search for 'func_alpha' must find the symbol after \
             workspace A is reloaded; result={symbols_str:.500}"
        );
    }
    server.stop().await;
}
/// Frame `body` with an LSP `Content-Length` header and write the whole frame
/// in a single `write_all`. Panics on write failure (test helper).
async fn write_lsp_frame_bytes<W: AsyncWriteExt + Unpin>(writer: &mut W, body: &str) {
    let mut frame = format!("Content-Length: {}\r\n\r\n", body.len()).into_bytes();
    frame.extend_from_slice(body.as_bytes());
    writer.write_all(&frame).await.unwrap();
}
/// Read one chunk of LSP response bytes (up to 64 KiB) with a 10s timeout.
///
/// Panics on timeout, I/O error, or EOF. Note this performs a single `read`,
/// so the returned bytes may be less than a full LSP frame — callers in this
/// file only do substring checks on the result.
async fn read_lsp_bytes<R: AsyncReadExt + Unpin>(reader: &mut R) -> Vec<u8> {
    let mut buf = vec![0u8; 65536];
    let n = tokio::time::timeout(Duration::from_secs(10), reader.read(&mut buf))
        .await
        .expect("LSP read timeout within 10s")
        .expect("LSP read I/O error");
    assert!(n > 0, "expected LSP response bytes, got EOF");
    buf.truncate(n);
    buf
}
/// End-to-end: one daemon must serve an LSP shim, an MCP shim, and a CLI
/// management client concurrently, then drain its shim registry to zero once
/// all clients disconnect.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn concurrent_lsp_mcp_cli_against_one_daemon() {
    // Workspace with a single symbol used by the MCP search at the end.
    let workspace_tmp =
        tempfile::TempDir::new().expect("create workspace tempdir for concurrent e2e test");
    let workspace_root = workspace_tmp.path();
    std::fs::create_dir_all(workspace_root.join("src")).expect("create src/ directory");
    std::fs::write(
        workspace_root.join("src/lib.rs"),
        b"pub fn concurrent_target() {}\n",
    )
    .expect("write src/lib.rs");
    init_git_repo(workspace_root);
    let canon_root = canonicalize_path(workspace_root).expect("canonicalize workspace root");
    let plugins = Arc::new(sqry_plugin_registry::create_plugin_manager());
    let builder = Arc::new(RealWorkspaceBuilder::new(Arc::clone(&plugins)))
        as Arc<dyn sqry_daemon::WorkspaceBuilder>;
    let server = TestServer::with_builder_and_config(builder, DaemonConfig::default()).await;
    let socket_path = server.path.clone();
    let shim_registry = server.shim_registry();
    // Pre-load the workspace over the management channel.
    {
        let mut mgmt = TestIpcClient::connect(&socket_path).await;
        let hello_resp = mgmt.hello(1).await;
        assert!(
            hello_resp.compatible,
            "management hello must be compatible: {hello_resp:?}"
        );
        let load_resp = mgmt
            .request(
                "daemon/load",
                serde_json::json!({ "index_root": canon_root.to_string_lossy() }),
            )
            .await;
        match &load_resp.payload {
            sqry_daemon::ipc::protocol::JsonRpcPayload::Success { result } => {
                let _ = result;
            }
            sqry_daemon::ipc::protocol::JsonRpcPayload::Error { error } => {
                panic!(
                    "daemon/load failed for {}: code={} message={}",
                    canon_root.display(),
                    error.code,
                    error.message
                );
            }
        }
    }
    // Task 1: LSP shim — Content-Length framed initialize round-trip.
    let socket_path_lsp = socket_path.clone();
    let lsp_task = tokio::spawn(async move {
        let conn = connect_shim_with_timeouts(
            &socket_path_lsp,
            ShimProtocol::Lsp,
            std::process::id(),
            Duration::from_secs(5),
            Duration::from_secs(5),
        )
        .await
        .expect("LSP shim connect+handshake must succeed");
        let mut stream = conn.into_stream();
        let body = serde_json::json!({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {
                "processId": null,
                "rootUri": null,
                "capabilities": {}
            }
        })
        .to_string();
        write_lsp_frame_bytes(&mut stream, &body).await;
        let response_bytes = read_lsp_bytes(&mut stream).await;
        let response_str = String::from_utf8_lossy(&response_bytes);
        assert!(
            response_str.contains("Content-Length:"),
            "LSP response must be Content-Length framed; got: {response_str:.200}"
        );
        // Split headers from body at the CRLFCRLF separator.
        let json_body = response_str
            .find("\r\n\r\n")
            .map(|idx| &response_str[idx + 4..])
            .unwrap_or_else(|| {
                panic!(
                    "LSP response missing CRLFCRLF header/body separator; \
                     got: {response_str:.300}"
                )
            });
        let response: serde_json::Value = serde_json::from_str(json_body).unwrap_or_else(|e| {
            panic!(
                "LSP initialize response JSON body failed to parse: {e}; \
                 body: {json_body:.300}"
            )
        });
        assert!(
            response.get("error").is_none(),
            "LSP initialize must not return error; got: {response:?}"
        );
        assert!(
            response.get("id") == Some(&serde_json::json!(1)),
            "LSP initialize response must echo id=1; got: {response:?}"
        );
        let result = response.get("result").unwrap_or_else(|| {
            panic!("LSP initialize response has no `result`; got: {response:?}")
        });
        assert!(
            result.get("capabilities").is_some(),
            "LSP initialize result must have `capabilities`; got: {result:?}"
        );
        // Returned (not dropped) so the registry-drain check below is only
        // triggered once the main test explicitly drops it.
        stream
    });
    // Task 2: MCP shim — newline-delimited JSON initialize handshake.
    let socket_path_mcp = socket_path.clone();
    let mcp_task: tokio::task::JoinHandle<Pin<Box<dyn AsyncReadWrite + Send>>> =
        tokio::spawn(async move {
            let conn = connect_shim_with_timeouts(
                &socket_path_mcp,
                ShimProtocol::Mcp,
                std::process::id(),
                Duration::from_secs(5),
                Duration::from_secs(5),
            )
            .await
            .expect("MCP shim connect+handshake must succeed");
            let mut stream = conn.into_stream();
            let initialize = serde_json::json!({
                "jsonrpc": "2.0",
                "id": 1,
                "method": "initialize",
                "params": {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {},
                    "clientInfo": {
                        "name": "sqry-daemon-e2e-concurrent-test",
                        "version": "0.0.1"
                    }
                }
            });
            let mut req_bytes = serde_json::to_vec(&initialize).expect("serialize MCP initialize");
            req_bytes.push(b'\n');
            stream
                .write_all(&req_bytes)
                .await
                .expect("write MCP initialize");
            // Byte-at-a-time read up to the newline terminator, with the
            // remaining-time budget re-applied to every read.
            let mut buf = Vec::with_capacity(4096);
            let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
            loop {
                let mut byte = [0u8; 1];
                let n = tokio::time::timeout(
                    deadline
                        .saturating_duration_since(tokio::time::Instant::now())
                        .max(Duration::from_millis(1)),
                    stream.read(&mut byte),
                )
                .await
                .expect("MCP initialize response read timeout")
                .expect("MCP initialize response read I/O error");
                assert!(n > 0, "MCP stream closed before initialize response");
                buf.push(byte[0]);
                if byte[0] == b'\n' {
                    break;
                }
                assert!(
                    tokio::time::Instant::now() < deadline,
                    "timed out reading MCP initialize response"
                );
            }
            let response_str = String::from_utf8_lossy(&buf);
            let response: serde_json::Value = serde_json::from_slice(&buf).unwrap_or_else(|e| {
                panic!(
                    "MCP initialize response is not valid JSON: {e}; \
                     got: {response_str:.200}"
                )
            });
            assert!(
                response.get("error").is_none(),
                "MCP initialize must not return error; got: {response:?}"
            );
            let result = response.get("result").unwrap_or_else(|| {
                panic!("MCP initialize response has no `result`; got: {response:?}")
            });
            assert!(
                result.get("capabilities").is_some(),
                "MCP initialize result must have `capabilities`; got: {result:?}"
            );
            let initialized_notif = serde_json::json!({
                "jsonrpc": "2.0",
                "method": "notifications/initialized",
                "params": {}
            });
            let mut notif_bytes = serde_json::to_vec(&initialized_notif)
                .expect("serialize notifications/initialized");
            notif_bytes.push(b'\n');
            stream
                .write_all(&notif_bytes)
                .await
                .expect("write notifications/initialized");
            // Returned so the main test can issue tools/call on this stream.
            stream
        });
    // Task 3: CLI-style management client checking daemon/status metadata.
    let socket_path_cli = socket_path.clone();
    let cli_task = tokio::spawn(async move {
        let mut client = DaemonClient::connect(&socket_path_cli)
            .await
            .expect("DaemonClient::connect must succeed");
        let status = client.status().await.expect("daemon/status must succeed");
        let version_str = status
            .get("meta")
            .and_then(|m| m.get("daemon_version"))
            .and_then(|v| v.as_str())
            .unwrap_or_else(|| {
                panic!(
                    "daemon/status response must include `meta.daemon_version`; \
                     got: {status:?}"
                )
            });
        assert!(
            !version_str.is_empty(),
            "daemon/status `meta.daemon_version` must be a non-empty string; \
             got: {status:?}"
        );
    });
    // All three protocols must complete concurrently against the one daemon.
    let (lsp_stream, mut mcp_stream, ()) =
        tokio::try_join!(lsp_task, mcp_task, cli_task).expect("all concurrent tasks must succeed");
    // Follow-up on the surviving MCP stream: a real tools/call search.
    let search_request = serde_json::json!({
        "jsonrpc": "2.0",
        "id": 2,
        "method": "tools/call",
        "params": {
            "name": "semantic_search",
            "arguments": {
                "query": "concurrent_target",
                "path": canon_root.to_string_lossy(),
                "max_results": 5,
                "context_lines": 0,
                "include_classpath": false
            }
        }
    });
    let mut search_bytes =
        serde_json::to_vec(&search_request).expect("serialize semantic_search tools/call");
    search_bytes.push(b'\n');
    mcp_stream
        .write_all(&search_bytes)
        .await
        .expect("write tools/call semantic_search");
    // Read newline-delimited JSON lines until the id=2 response appears,
    // skipping blank lines, all bounded by a single 30s deadline.
    let search_response = 'find_response: {
        let deadline = tokio::time::Instant::now() + Duration::from_secs(30);
        loop {
            let mut line_buf = Vec::with_capacity(4096);
            loop {
                let remaining = deadline
                    .saturating_duration_since(tokio::time::Instant::now())
                    .max(Duration::from_millis(1));
                let mut byte = [0u8; 1];
                let n = tokio::time::timeout(remaining, mcp_stream.read(&mut byte))
                    .await
                    .expect("tools/call response read timeout")
                    .expect("tools/call response read I/O error");
                assert!(n > 0, "MCP stream closed before tools/call response");
                line_buf.push(byte[0]);
                if byte[0] == b'\n' {
                    break;
                }
                assert!(
                    tokio::time::Instant::now() < deadline,
                    "timed out reading tools/call response line"
                );
            }
            // Ignore lines that are pure line-ending noise.
            if line_buf.iter().all(|&b| b == b'\n' || b == b'\r') {
                continue;
            }
            // Other ids (e.g. server-initiated notifications) are skipped.
            if let Ok(v) = serde_json::from_slice::<serde_json::Value>(&line_buf)
                && v.get("id") == Some(&serde_json::json!(2))
            {
                break 'find_response v;
            }
            assert!(
                tokio::time::Instant::now() < deadline,
                "timed out waiting for tools/call id=2 response"
            );
        }
    };
    assert!(
        search_response.get("error").is_none(),
        "tools/call semantic_search must not return JSON-RPC error; got: {search_response:?}"
    );
    let tool_result = search_response.get("result").unwrap_or_else(|| {
        panic!("tools/call response must have `result`; got: {search_response:?}")
    });
    // MCP tool results wrap the payload as JSON text inside content[0].text.
    let content_json_str = tool_result
        .get("content")
        .and_then(|c| c.get(0))
        .and_then(|item| item.get("text"))
        .and_then(|t| t.as_str())
        .unwrap_or_else(|| {
            panic!("tools/call result must have content[0].text; got: {tool_result:?}")
        });
    let content_val: serde_json::Value =
        serde_json::from_str(content_json_str).unwrap_or_else(|e| {
            panic!(
                "tools/call content[0].text is not valid JSON: {e}; \
                 got: {content_json_str:.300}"
            )
        });
    let found = content_val["data"]["results"]
        .as_array()
        .map(|hits| {
            hits.iter()
                .any(|hit| hit["name"].as_str() == Some("concurrent_target"))
        })
        .unwrap_or(false);
    assert!(
        found,
        "semantic_search for 'concurrent_target' must return a SearchHit with \
         name == 'concurrent_target'; \
         data.results: {:?}",
        content_val["data"]["results"]
    );
    // Dropping both shim streams must drain the registry to zero.
    drop(lsp_stream);
    drop(mcp_stream);
    let drain_deadline = tokio::time::Instant::now() + Duration::from_secs(5);
    loop {
        if shim_registry.is_empty() {
            break;
        }
        assert!(
            tokio::time::Instant::now() < drain_deadline,
            "shim_registry must drain to 0 within 5s after all clients disconnect; \
             len={}",
            shim_registry.len()
        );
        tokio::time::sleep(Duration::from_millis(25)).await;
    }
    assert!(
        shim_registry.is_empty(),
        "shim_registry must be empty after all client connections drop"
    );
    server.stop().await;
}