pub mod error_map;
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use rmcp::ErrorData as McpError;
use rmcp::model::{
CallToolRequestParams, CallToolResult, Content, Implementation, InitializeResult,
ListToolsResult, PaginatedRequestParams, ProtocolVersion, ServerCapabilities,
};
use rmcp::service::RequestContext;
use rmcp::{RoleServer, ServerHandler};
use serde_json::{Value, json};
use sqry_core::project::ProjectRootMode;
use sqry_core::query::executor::QueryExecutor;
use sqry_mcp::daemon_adapter::WorkspaceContext;
use sqry_mcp::daemon_adapter::dispatch::{dispatch_by_name, dispatch_sqry_ask};
use sqry_mcp::daemon_params::params_to_sqry_ask_args;
use sqry_mcp::execution::build_translator_config_for_path;
use sqry_mcp::tool_args::SqryAskParams;
use sqry_mcp::tools_schema;
use sqry_nl::{Translator, TranslatorConfig};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::sync::CancellationToken;
use crate::error::DaemonError;
use crate::ipc::tool_core::{self, ExecuteVerdict};
use crate::workspace::{LoadedWorkspace, WorkspaceBuilder, WorkspaceKey, WorkspaceManager};
use error_map::{daemon_err_to_mcp, daemon_err_to_mcp_with_tool, try_onnx_runtime_missing_to_mcp};
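/// Base working-set estimate (2 MiB) used when first loading a workspace, before its real graph size is known.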
const INITIAL_WORKING_SET_BYTES: u64 = 2 * 1024 * 1024;
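/// rmcp `ServerHandler` hosted inside the daemon: serves sqry MCP tool calls from preloaded workspace state.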
pub struct DaemonMcpHandler {
manager: Arc<WorkspaceManager>,
workspace_builder: Arc<dyn WorkspaceBuilder>,
tool_executor: Arc<QueryExecutor>,
tool_timeout: Duration,
daemon_version: &'static str,
tools: Vec<rmcp::model::Tool>,
enabled_tool_names: HashSet<String>,
}
impl DaemonMcpHandler {
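    /// Creates a handler advertising the full default set of daemon-supported tools.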
#[must_use]
pub fn new(
manager: Arc<WorkspaceManager>,
workspace_builder: Arc<dyn WorkspaceBuilder>,
tool_executor: Arc<QueryExecutor>,
tool_timeout: Duration,
daemon_version: &'static str,
) -> Self {
Self::with_tools(
manager,
workspace_builder,
tool_executor,
tool_timeout,
daemon_version,
tools_schema::daemon_supported_tools(),
)
}
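    /// Creates a handler advertising exactly `tools`; the set used to authorise
    /// `call_tool` requests is derived from this same list.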
#[must_use]
pub fn with_tools(
manager: Arc<WorkspaceManager>,
workspace_builder: Arc<dyn WorkspaceBuilder>,
tool_executor: Arc<QueryExecutor>,
tool_timeout: Duration,
daemon_version: &'static str,
tools: Vec<rmcp::model::Tool>,
) -> Self {
let enabled_tool_names: HashSet<String> =
tools.iter().map(|t| t.name.as_ref().to_owned()).collect();
Self {
manager,
workspace_builder,
tool_executor,
tool_timeout,
daemon_version,
tools,
enabled_tool_names,
}
}
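    /// Tool names this handler accepts in `call_tool`.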
#[must_use]
pub fn enabled_tool_names(&self) -> &HashSet<String> {
&self.enabled_tool_names
}
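    /// Tools advertised via `list_tools`.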
#[must_use]
pub fn advertised_tools(&self) -> &[rmcp::model::Tool] {
&self.tools
}
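    /// Graph provider backed by this handler's workspace manager and builder.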
    #[allow(dead_code)]
    pub(crate) fn daemon_graph_provider(&self) -> crate::workspace::acquirer::DaemonGraphProvider {
tool_core::daemon_graph_provider(
Arc::clone(&self.manager),
Arc::clone(&self.workspace_builder),
)
}
}
impl ServerHandler for DaemonMcpHandler {
fn get_info(&self) -> InitializeResult {
InitializeResult::new(ServerCapabilities::builder().enable_tools().build())
.with_protocol_version(ProtocolVersion::LATEST)
.with_server_info(Implementation::new(
"sqry-daemon-mcp",
self.daemon_version.to_owned(),
))
            .with_instructions(
                "sqry MCP server (daemon-hosted). Tool calls are served from \
                 the daemon's preloaded workspace state, giving the same behaviour \
                 as sqry-mcp's standalone mode with zero graph-rebuild cost.",
            )
}
async fn list_tools(
&self,
_req: Option<PaginatedRequestParams>,
_ctx: RequestContext<RoleServer>,
) -> Result<ListToolsResult, McpError> {
Ok(ListToolsResult {
meta: None,
next_cursor: None,
tools: self.tools.clone(),
})
}
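    /// Routes a tool call: rejects disabled or unknown tool names, special-cases
    /// `rebuild_index` and `sqry_ask`, and dispatches everything else through the
    /// shared acquire-and-execute path, tagging stale results with `_stale_warning`.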
async fn call_tool(
&self,
req: CallToolRequestParams,
_ctx: RequestContext<RoleServer>,
) -> Result<CallToolResult, McpError> {
let name = req.name.to_string();
let args_value = req.arguments.map_or(Value::Null, Value::Object);
if !self.enabled_tool_names.contains(&name) {
let reason = if tools_schema::DAEMON_SUPPORTED_TOOL_NAMES.contains(&name.as_str()) {
format!(
"tool {name} is disabled by the daemon's active feature flags \
(see SQRY_MCP_ENABLE_* environment variables)"
)
} else {
format!("unknown tool name {name}: not in DAEMON_SUPPORTED_TOOL_NAMES")
};
return Err(daemon_err_to_mcp(DaemonError::InvalidArgument { reason }));
}
if name == "rebuild_index" {
let path = match args_value.as_object().and_then(|m| m.get("path")) {
Some(raw) => raw.as_str().map(String::from).ok_or_else(|| {
daemon_err_to_mcp(DaemonError::InvalidArgument {
reason: format!("rebuild_index: `path` must be a string, got: {raw}"),
})
})?,
None => ".".to_string(),
};
return self.handle_rebuild_index(&path, &args_value).await;
}
if name == "sqry_ask" {
return self.handle_sqry_ask(&args_value).await;
}
let path = extract_path_arg(&args_value).ok_or_else(|| {
daemon_err_to_mcp(DaemonError::InvalidArgument {
reason: format!("{name}: missing or non-string `path` argument"),
})
})?;
let name_clone = name.clone();
let args_clone = args_value.clone();
let run = move |wctx: &WorkspaceContext,
cancel: &sqry_core::query::cancellation::CancellationToken|
-> anyhow::Result<Value> {
dispatch_by_name(&name_clone, wctx, &args_clone, cancel)
};
let static_tool_name: Option<&'static str> = tools_schema::DAEMON_SUPPORTED_TOOL_NAMES
.iter()
.copied()
.find(|&n| n == name.as_str());
let verdict = tool_core::acquire_and_execute(
Arc::clone(&self.manager),
Arc::clone(&self.workspace_builder),
Arc::clone(&self.tool_executor),
self.tool_timeout,
&path,
static_tool_name,
run,
)
.await
.map_err(|e| daemon_err_to_mcp_with_tool(e, &name))?;
let payload = match verdict {
ExecuteVerdict::Fresh { inner, .. } => inner,
ExecuteVerdict::Stale {
mut inner,
stale_warning,
..
} => {
if let Value::Object(ref mut map) = inner {
map.insert("_stale_warning".into(), Value::String(stale_warning));
}
inner
}
};
let text_payload =
serde_json::to_string_pretty(&payload).unwrap_or_else(|_| payload.to_string());
Ok(call_tool_result_with_text_and_structured(
text_payload,
payload,
))
}
}
impl DaemonMcpHandler {
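    /// Handles `sqry_ask`: loads the workspace plus its lazily initialised translator,
    /// forwards the request with `execute` forced to `false`, then, if execution was
    /// originally requested, re-runs the translated command through a daemon graph tool.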
async fn handle_sqry_ask(&self, args_value: &Value) -> Result<CallToolResult, McpError> {
let ask_args = params_to_sqry_ask_args(args_value.clone()).map_err(|e| {
daemon_err_to_mcp_with_tool(
DaemonError::InvalidArgument {
reason: format!("sqry_ask: invalid arguments: {e}"),
},
"sqry_ask",
)
})?;
let path = ask_args.path.clone();
let original_execute = ask_args.execute;
let canonical_root = std::fs::canonicalize(&path).map_err(|e| {
daemon_err_to_mcp_with_tool(
DaemonError::InvalidArgument {
reason: format!("sqry_ask: cannot canonicalize path {path:?}: {e}"),
},
"sqry_ask",
)
})?;
let key = WorkspaceKey::new(canonical_root.clone(), ProjectRootMode::default(), 0);
let (loaded, translator) = self
.load_workspace_and_translator_for_sqry_ask(key, canonical_root.clone(), ask_args)
.await?;
let wctx = WorkspaceContext {
workspace_root: canonical_root.clone(),
graph: loaded.graph.load_full(),
executor: Arc::clone(&self.tool_executor),
};
let translator_only_args = SqryAskParams {
execute: false,
..params_to_sqry_ask_args(args_value.clone()).map_err(|e| {
daemon_err_to_mcp_with_tool(
DaemonError::InvalidArgument {
reason: format!("sqry_ask: invalid arguments: {e}"),
},
"sqry_ask",
)
})?
};
let translator_only_args_value =
serde_json::to_value(&translator_only_args).map_err(|e| {
daemon_err_to_mcp_with_tool(
DaemonError::Internal(anyhow::anyhow!(
"sqry_ask: re-serialize translator args: {e}"
)),
"sqry_ask",
)
})?;
let mut payload = Self::dispatch_sqry_ask_blocking(
wctx,
Arc::clone(&translator),
translator_only_args_value,
)
.await?;
if original_execute {
self.maybe_run_translated_graph_command(&mut payload, &canonical_root, path.as_str())
.await?;
}
let text_payload =
serde_json::to_string_pretty(&payload).unwrap_or_else(|_| payload.to_string());
Ok(call_tool_result_with_text_and_structured(
text_payload,
payload,
))
}
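    /// If the translated command maps onto a daemon-supported graph tool, runs it and
    /// splices its output into the payload; otherwise records an explanatory marker.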
async fn maybe_run_translated_graph_command(
&self,
payload: &mut Value,
canonical_root: &Path,
original_path: &str,
) -> Result<(), McpError> {
let command = payload
.get("data")
.and_then(|d| d.get("command"))
.and_then(|c| c.as_str())
.map(str::to_owned);
let Some(command) = command else {
return Ok(());
};
match parse_translated_graph_command(&command, original_path) {
Some((tool_name, mut tool_args)) => {
ensure_path_arg(&mut tool_args, original_path);
let exec_output = self
.dispatch_translated_graph_tool(tool_name, tool_args, original_path)
.await?;
splice_execution_output(payload, exec_output);
Ok(())
}
None => {
let marker = format!(
"[daemon] translated command {command:?} is not a daemon-supported \
graph-backed tool; daemon-hosted sqry_ask does not shell out for \
out-of-scope commands. Route via the daemon's MCP tools (e.g. \
semantic_search) or run the CLI from your client. \
workspace_root={}",
canonical_root.display()
);
splice_execution_output(payload, marker);
Ok(())
}
}
}
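    /// Executes a translated command via a daemon graph-backed read-only tool,
    /// rejecting anything outside that set (including `rebuild_index` and `sqry_ask`).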
#[doc(hidden)]
pub async fn dispatch_translated_graph_tool(
&self,
tool_name: &'static str,
tool_args: Value,
path: &str,
) -> Result<String, McpError> {
if !tools_schema::DAEMON_SUPPORTED_TOOL_NAMES.contains(&tool_name)
|| tool_name == "rebuild_index"
|| tool_name == "sqry_ask"
{
return Err(daemon_err_to_mcp_with_tool(
DaemonError::InvalidArgument {
reason: format!(
"sqry_ask translated dispatch: tool {tool_name} is not a \
daemon-supported graph-backed read-only tool"
),
},
"sqry_ask",
));
}
let name_clone: &'static str = tool_name;
let args_clone = tool_args;
let run = move |wctx: &WorkspaceContext,
cancel: &sqry_core::query::cancellation::CancellationToken|
-> anyhow::Result<Value> {
dispatch_by_name(name_clone, wctx, &args_clone, cancel)
};
let verdict = tool_core::acquire_and_execute(
Arc::clone(&self.manager),
Arc::clone(&self.workspace_builder),
Arc::clone(&self.tool_executor),
self.tool_timeout,
path,
Some(name_clone),
run,
)
.await
.map_err(|e| daemon_err_to_mcp_with_tool(e, "sqry_ask"))?;
let payload = match verdict {
ExecuteVerdict::Fresh { inner, .. } => inner,
ExecuteVerdict::Stale {
mut inner,
stale_warning,
..
} => {
if let Value::Object(ref mut map) = inner {
map.insert("_stale_warning".into(), Value::String(stale_warning));
}
inner
}
};
Ok(serde_json::to_string_pretty(&payload).unwrap_or_else(|_| payload.to_string()))
}
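    /// Loads (or reuses) the workspace for `sqry_ask` on a blocking thread and returns
    /// it together with its cached NL translator.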
async fn load_workspace_and_translator_for_sqry_ask(
&self,
key: WorkspaceKey,
canonical_root: PathBuf,
ask_args: SqryAskParams,
) -> Result<(Arc<LoadedWorkspace>, Arc<Translator>), McpError> {
let manager = Arc::clone(&self.manager);
let builder = Arc::clone(&self.workspace_builder);
tokio::task::spawn_blocking(move || {
load_workspace_and_translator_for_sqry_ask_blocking(
&manager,
builder.as_ref(),
&key,
canonical_root.as_path(),
&ask_args,
)
})
.await
.map_err(|join_err| {
daemon_err_to_mcp_with_tool(
DaemonError::Internal(anyhow::anyhow!("spawn_blocking join: {join_err}")),
"sqry_ask",
)
})?
.map_err(map_sqry_ask_translator_init_error)
}
async fn dispatch_sqry_ask_blocking(
wctx: WorkspaceContext,
translator: Arc<Translator>,
args_value: Value,
) -> Result<Value, McpError> {
tokio::task::spawn_blocking(move || dispatch_sqry_ask(&wctx, &translator, &args_value))
.await
.map_err(|join_err| {
daemon_err_to_mcp_with_tool(
DaemonError::Internal(anyhow::anyhow!(
"sqry_ask: spawn_blocking join: {join_err}"
)),
"sqry_ask",
)
})?
.map_err(map_sqry_ask_dispatch_error)
}
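    /// Handles `rebuild_index`: returns a cache-hit response when an index already
    /// exists and `force` is false, otherwise (re)loads the workspace and reports
    /// node, edge, and indexed-file counts.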
async fn handle_rebuild_index(
&self,
path: &str,
args_value: &Value,
) -> Result<CallToolResult, McpError> {
use sqry_mcp::execution::{RebuildIndexData, ToolExecution};
let start = std::time::Instant::now();
let force = match args_value.as_object().and_then(|m| m.get("force")) {
Some(raw) => raw.as_bool().ok_or_else(|| {
daemon_err_to_mcp(DaemonError::InvalidArgument {
reason: format!("rebuild_index: `force` must be a boolean, got: {raw}"),
})
})?,
None => true,
};
let canonical_target = std::fs::canonicalize(path).map_err(|e| {
daemon_err_to_mcp(DaemonError::InvalidArgument {
reason: format!("rebuild_index: cannot canonicalize path {path:?}: {e}"),
})
})?;
let canonical_root: std::path::PathBuf = if canonical_target.is_dir() {
canonical_target.clone()
} else if let Some(parent) = canonical_target.parent() {
parent.to_path_buf()
} else {
return Err(daemon_err_to_mcp(DaemonError::InvalidArgument {
reason: format!(
"rebuild_index: cannot derive workspace root from {} (no parent directory)",
canonical_target.display()
),
}));
};
let root_display = path_to_forward_slash(&canonical_root);
let storage = sqry_core::graph::unified::persistence::GraphStorage::new(&canonical_root);
if storage.exists() && !force {
return build_rebuild_index_cache_hit_response(
&canonical_root,
&root_display,
&storage,
start,
);
}
let key = WorkspaceKey::new(canonical_root.clone(), ProjectRootMode::default(), 0);
if force {
self.manager.unload(&key);
}
let working_set_estimate = initial_working_set_estimate();
let manager = Arc::clone(&self.manager);
let builder = Arc::clone(&self.workspace_builder);
let key_for_task = key.clone();
let graph = tokio::task::spawn_blocking(move || {
manager.get_or_load(&key_for_task, &*builder, working_set_estimate)
})
.await
.map_err(|join_err| {
daemon_err_to_mcp_with_tool(
DaemonError::WorkspaceBuildFailed {
root: canonical_root.clone(),
reason: format!("rebuild_index: task join error: {join_err}"),
},
"rebuild_index",
)
})?
.map_err(|e| daemon_err_to_mcp_with_tool(e, "rebuild_index"))?;
#[allow(clippy::cast_possible_truncation)]
let elapsed_ms = start.elapsed().as_millis() as u64;
let node_count = graph.node_count() as u64;
let edge_count = graph.edge_count() as u64;
let files_indexed = graph.indexed_files().count() as u64;
let data = RebuildIndexData {
success: true,
root_path: root_display.clone(),
node_count,
edge_count,
files_indexed,
built_at: chrono::Utc::now().to_rfc3339(),
message: Some(if force {
"Index rebuilt successfully.".to_string()
} else {
"Index built successfully.".to_string()
}),
};
let execution = ToolExecution {
data,
used_index: false,
used_graph: true,
graph_metadata: None,
execution_ms: elapsed_ms,
next_page_token: None,
total: Some(1),
truncated: Some(false),
candidates_scanned: None,
workspace_path: root_display,
};
finalize_rebuild_index_response(execution)
}
}
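/// Initial working-set estimate scaled by the daemon's configured multiplier.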
#[allow(
clippy::cast_possible_truncation,
clippy::cast_sign_loss,
clippy::cast_precision_loss
)]
#[must_use]
fn initial_working_set_estimate() -> u64 {
(INITIAL_WORKING_SET_BYTES as f64 * crate::config::WORKING_SET_MULTIPLIER) as u64
}
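/// Blocking half of `sqry_ask` setup: ensures the workspace is loaded in the manager
/// and lazily initialises its NL translator.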
fn load_workspace_and_translator_for_sqry_ask_blocking(
manager: &Arc<WorkspaceManager>,
builder: &dyn WorkspaceBuilder,
key: &WorkspaceKey,
canonical_root: &Path,
ask_args: &SqryAskParams,
) -> Result<(Arc<LoadedWorkspace>, Arc<Translator>), DaemonError> {
let working_set_estimate = initial_working_set_estimate();
let _graph = manager.get_or_load(key, builder, working_set_estimate)?;
let loaded = manager
.lookup(key)
.ok_or_else(|| DaemonError::WorkspaceBuildFailed {
root: canonical_root.to_path_buf(),
reason: "sqry_ask: workspace was loaded but disappeared from manager".to_string(),
})?;
let translator = loaded
.nl_translator
.get_or_try_init(|| build_translator_for_sqry_ask(canonical_root, ask_args))?
.clone();
Ok((loaded, translator))
}
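/// Builds the `sqry_ask` translator, wrapping `OnnxRuntimeMissing` as an internal error
/// so it stays downcastable for the dedicated MCP error mapping.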
fn build_translator_for_sqry_ask(
scoped_path: &Path,
args: &SqryAskParams,
) -> Result<Arc<Translator>, DaemonError> {
let cfg = build_daemon_sqry_ask_translator_config(scoped_path, args);
match Translator::new(cfg) {
Ok(translator) => Ok(Arc::new(translator)),
Err(nl_err @ sqry_nl::NlError::OnnxRuntimeMissing { .. }) => {
Err(DaemonError::Internal(anyhow::Error::new(nl_err)))
}
Err(err) => Err(DaemonError::WorkspaceBuildFailed {
root: scoped_path.to_path_buf(),
reason: format!("sqry_ask: translator init failed: {err}"),
}),
}
}
fn map_sqry_ask_translator_init_error(err: DaemonError) -> McpError {
if let DaemonError::Internal(ref any_err) = err
&& let Some(nl_err) = any_err.downcast_ref::<sqry_nl::NlError>()
&& let Some(mcp_err) = try_onnx_runtime_missing_to_mcp(nl_err)
{
return mcp_err;
}
daemon_err_to_mcp_with_tool(err, "sqry_ask")
}
fn map_sqry_ask_dispatch_error(err: anyhow::Error) -> McpError {
if let Some(nl_err) = err.downcast_ref::<sqry_nl::NlError>()
&& let Some(mcp_err) = try_onnx_runtime_missing_to_mcp(nl_err)
{
return mcp_err;
}
daemon_err_to_mcp_with_tool(DaemonError::Internal(err), "sqry_ask")
}
fn extract_path_arg(args: &Value) -> Option<String> {
args.as_object()?.get("path")?.as_str().map(String::from)
}
#[must_use]
fn build_daemon_sqry_ask_translator_config(
scoped_path: &Path,
args: &SqryAskParams,
) -> TranslatorConfig {
build_translator_config_for_path(scoped_path, args)
}
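/// Parses a translated `sqry query ...` or `sqry graph ...` command line into a daemon
/// tool name plus JSON arguments; returns `None` for anything out of scope.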
fn parse_translated_graph_command(command: &str, path_arg: &str) -> Option<(&'static str, Value)> {
let tokens = tokenize_translated_command(command)?;
let mut iter = tokens.iter().map(String::as_str);
let bin = iter.next()?;
if bin != "sqry" {
return None;
}
let sub = iter.next()?;
let remaining: Vec<&str> = iter.collect();
match sub {
"query" => parse_translated_query(&remaining, path_arg),
"graph" => parse_translated_graph(&remaining, path_arg),
_ => None,
}
}
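/// Maps a translated `sqry query` invocation onto the `semantic_search` tool, honouring
/// `--limit` (clamped to 1..=5000) and skipping `--language`/`--path` values.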
fn parse_translated_query(rest: &[&str], path_arg: &str) -> Option<(&'static str, Value)> {
    let mut iter = rest.iter().copied();
let query_expr = iter.next()?;
let mut max_results: i64 = 100;
while let Some(tok) = iter.next() {
match tok {
"--limit" => {
if let Some(v) = iter.next()
&& let Ok(n) = v.parse::<i64>()
{
max_results = n.clamp(1, 5000);
}
}
"--language" | "--path" => {
let _ = iter.next();
}
_ => {}
}
}
Some((
"semantic_search",
json!({
"query": query_expr,
"path": path_arg,
"max_results": max_results,
"context_lines": 0,
"include_classpath": false,
}),
))
}
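/// Maps translated `sqry graph direct-callers`, `direct-callees`, and `trace-path`
/// invocations onto the matching daemon graph tools.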
fn parse_translated_graph(rest: &[&str], path_arg: &str) -> Option<(&'static str, Value)> {
let mut iter = rest.iter().copied();
let cmd = iter.next()?;
match cmd {
"direct-callers" => {
let symbol = iter.next()?;
Some((
"direct_callers",
json!({
"symbol": symbol,
"path": path_arg,
"max_results": 100,
}),
))
}
"direct-callees" => {
let symbol = iter.next()?;
Some((
"direct_callees",
json!({
"symbol": symbol,
"path": path_arg,
"max_results": 100,
}),
))
}
"trace-path" => {
let from = iter.next()?;
let to = iter.next()?;
let mut max_hops: i64 = 10;
while let Some(tok) = iter.next() {
if tok == "--max-depth"
&& let Some(v) = iter.next()
&& let Ok(n) = v.parse::<i64>()
{
max_hops = n.clamp(1, 50);
}
}
Some((
"trace_path",
json!({
"from_symbol": from,
"to_symbol": to,
"path": path_arg,
"max_hops": max_hops,
"max_paths": 5,
}),
))
}
_ => None,
}
}
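/// Minimal shell-like tokenizer: honours double and single quotes plus backslash escapes
/// inside quotes; returns `None` on unbalanced quotes or an empty command.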
fn tokenize_translated_command(command: &str) -> Option<Vec<String>> {
let mut out: Vec<String> = Vec::new();
let mut buf = String::new();
let mut in_double = false;
let mut in_single = false;
let mut prev_escape = false;
let mut started = false;
for c in command.chars() {
if prev_escape {
buf.push(c);
prev_escape = false;
started = true;
continue;
}
if c == '\\' && (in_double || in_single) {
prev_escape = true;
continue;
}
if c == '"' && !in_single {
in_double = !in_double;
started = true;
continue;
}
if c == '\'' && !in_double {
in_single = !in_single;
started = true;
continue;
}
if c.is_whitespace() && !in_double && !in_single {
if started {
out.push(std::mem::take(&mut buf));
started = false;
}
continue;
}
buf.push(c);
started = true;
}
if in_double || in_single {
return None;
}
if started {
out.push(buf);
}
if out.is_empty() { None } else { Some(out) }
}
fn ensure_path_arg(args: &mut Value, path_arg: &str) {
if let Value::Object(map) = args {
map.insert("path".into(), Value::String(path_arg.to_string()));
}
}
fn splice_execution_output(payload: &mut Value, output: String) {
if let Some(Value::Object(data)) = payload.get_mut("data") {
data.insert("executionOutput".into(), Value::String(output));
}
}
fn path_to_forward_slash(p: &std::path::Path) -> String {
p.to_string_lossy().replace('\\', "/")
}
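/// Builds the `rebuild_index` response for an existing on-disk index (`force == false`),
/// reading counts from the persisted snapshot header or manifest.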
fn build_rebuild_index_cache_hit_response(
canonical_root: &std::path::Path,
root_display: &str,
storage: &sqry_core::graph::unified::persistence::GraphStorage,
start: std::time::Instant,
) -> Result<CallToolResult, McpError> {
use sqry_core::graph::unified::persistence::load_header_from_path;
use sqry_mcp::execution::{RebuildIndexData, ToolExecution};
let manifest = storage.load_manifest().map_err(|e| {
daemon_err_to_mcp_with_tool(
DaemonError::WorkspaceBuildFailed {
root: canonical_root.to_path_buf(),
reason: format!(
"rebuild_index: index exists at {} but manifest is unreadable: {e}",
canonical_root.display()
),
},
"rebuild_index",
)
})?;
let files_indexed: u64 = if let Ok(header) = load_header_from_path(storage.snapshot_path()) {
u64::try_from(header.file_count).unwrap_or(0)
} else if !manifest.file_count.is_empty() {
u64::try_from(manifest.file_count.values().sum::<usize>()).unwrap_or(0)
} else {
0
};
let data = RebuildIndexData {
success: true,
root_path: root_display.to_string(),
node_count: u64::try_from(manifest.node_count).unwrap_or(0),
edge_count: u64::try_from(manifest.edge_count).unwrap_or(0),
files_indexed,
built_at: manifest.built_at,
message: Some("Index already exists. Use force=true to rebuild.".to_string()),
};
#[allow(clippy::cast_possible_truncation)]
let elapsed_ms = start.elapsed().as_millis() as u64;
let execution = ToolExecution {
data,
used_index: false,
used_graph: true,
graph_metadata: None,
execution_ms: elapsed_ms,
next_page_token: None,
total: Some(1),
truncated: Some(false),
candidates_scanned: None,
workspace_path: root_display.to_string(),
};
finalize_rebuild_index_response(execution)
}
fn finalize_rebuild_index_response(
execution: sqry_mcp::execution::ToolExecution<sqry_mcp::execution::RebuildIndexData>,
) -> Result<CallToolResult, McpError> {
let payload = sqry_mcp::daemon_adapter::tool_response_json(execution)?;
let text_payload =
serde_json::to_string_pretty(&payload).unwrap_or_else(|_| payload.to_string());
Ok(call_tool_result_with_text_and_structured(
text_payload,
payload,
))
}
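/// Wraps a payload as structured content plus a pretty-printed text block, leaving
/// `is_error` unset for successful results.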
fn call_tool_result_with_text_and_structured(
text_payload: String,
payload: Value,
) -> CallToolResult {
let mut result = CallToolResult::structured(payload);
debug_assert_eq!(result.is_error, Some(false));
result.content = vec![Content::text(text_payload)];
result.is_error = None;
result
}
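/// Hosts the daemon MCP handler over the given byte streams, forwarding `shutdown`
/// cancellation to the running rmcp service.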
pub async fn host_mcp_on_streams<R, W>(
reader: R,
writer: W,
manager: Arc<WorkspaceManager>,
workspace_builder: Arc<dyn WorkspaceBuilder>,
tool_executor: Arc<QueryExecutor>,
tool_timeout: Duration,
daemon_version: &'static str,
shutdown: CancellationToken,
) -> anyhow::Result<()>
where
R: AsyncRead + Send + Unpin + 'static,
W: AsyncWrite + Send + Unpin + 'static,
{
use rmcp::ServiceExt;
let handler = DaemonMcpHandler::new(
manager,
workspace_builder,
tool_executor,
tool_timeout,
daemon_version,
);
let service = handler.serve((reader, writer)).await?;
let service_ct = service.cancellation_token();
let shutdown_fwd = shutdown.clone();
let forwarder = tokio::spawn(async move {
shutdown_fwd.cancelled().await;
service_ct.cancel();
});
let wait_result = service.waiting().await;
forwarder.abort();
wait_result.map(|_| ()).map_err(anyhow::Error::from)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::workspace::builder::EmptyGraphBuilder;
fn test_builder() -> Arc<dyn WorkspaceBuilder> {
Arc::new(EmptyGraphBuilder)
}
#[test]
fn extract_path_arg_returns_path_when_present() {
let v = serde_json::json!({"path": "/tmp/ws", "other": 42});
assert_eq!(extract_path_arg(&v), Some("/tmp/ws".into()));
}
#[test]
fn extract_path_arg_returns_none_when_missing() {
let v = serde_json::json!({"other": 42});
assert_eq!(extract_path_arg(&v), None);
}
#[test]
fn extract_path_arg_returns_none_when_not_string() {
let v = serde_json::json!({"path": 42});
assert_eq!(extract_path_arg(&v), None);
}
#[test]
fn extract_path_arg_returns_none_on_non_object() {
let v = serde_json::Value::Null;
assert_eq!(extract_path_arg(&v), None);
let v = serde_json::json!([1, 2, 3]);
assert_eq!(extract_path_arg(&v), None);
}
#[test]
fn daemon_sqry_ask_translator_config_preserves_model_and_trust_params() {
let args = SqryAskParams {
query: "find call sites".to_string(),
path: ".".to_string(),
execute: false,
model_dir: Some("sqry-nl-models".to_string()),
allow_unverified_model: true,
allow_model_download: true,
};
let cfg = build_daemon_sqry_ask_translator_config(std::path::Path::new("workspace"), &args);
assert_eq!(
cfg.model_dir_override.as_deref(),
Some(std::path::Path::new("sqry-nl-models")),
"daemon sqry_ask must thread request model_dir into TranslatorConfig"
);
assert!(
cfg.allow_unverified_model,
"daemon sqry_ask must thread allow_unverified_model into TranslatorConfig"
);
assert!(
cfg.allow_model_download,
"daemon sqry_ask must thread allow_model_download into TranslatorConfig"
);
assert_eq!(
cfg.working_directory.as_deref(),
Some("workspace"),
"daemon sqry_ask must scope TranslatorConfig to the canonical request path"
);
}
#[test]
fn get_info_advertises_daemon_identity_and_tool_capability() {
use crate::config::DaemonConfig;
let manager = WorkspaceManager::new_without_reaper(Arc::new(DaemonConfig::default()));
let executor = Arc::new(QueryExecutor::new());
let handler = DaemonMcpHandler::new(
manager,
test_builder(),
executor,
Duration::from_secs(60),
"0.0.0-test",
);
let info = handler.get_info();
assert_eq!(info.server_info.name, "sqry-daemon-mcp");
assert_eq!(info.server_info.version, "0.0.0-test");
assert!(info.capabilities.tools.is_some());
assert!(
info.instructions
.as_deref()
.unwrap_or_default()
.contains("daemon-hosted"),
"instructions must mention daemon-hosted mode"
);
}
#[test]
fn handler_tools_list_is_subset_of_daemon_supported_names() {
use crate::config::DaemonConfig;
let manager = WorkspaceManager::new_without_reaper(Arc::new(DaemonConfig::default()));
let executor = Arc::new(QueryExecutor::new());
let handler = DaemonMcpHandler::new(
manager,
test_builder(),
executor,
Duration::from_secs(60),
"0.0.0-test",
);
for tool in &handler.tools {
assert!(
tools_schema::DAEMON_SUPPORTED_TOOL_NAMES.contains(&tool.name.as_ref()),
"tool {:?} must be in DAEMON_SUPPORTED_TOOL_NAMES",
tool.name
);
}
}
#[test]
fn unknown_tool_and_missing_path_envelopes_have_identical_top_level_keys() {
use std::collections::BTreeSet;
let err_unknown = daemon_err_to_mcp(DaemonError::InvalidArgument {
reason: "unknown tool name bogus_tool: not in DAEMON_SUPPORTED_TOOL_NAMES".into(),
});
let err_missing = daemon_err_to_mcp(DaemonError::InvalidArgument {
reason: "semantic_search: missing or non-string `path` argument".into(),
});
let keys_unknown: BTreeSet<String> = err_unknown
.data
.as_ref()
.unwrap()
.as_object()
.unwrap()
.keys()
.cloned()
.collect();
let keys_missing: BTreeSet<String> = err_missing
.data
.as_ref()
.unwrap()
.as_object()
.unwrap()
.keys()
.cloned()
.collect();
assert_eq!(
keys_unknown, keys_missing,
"unknown-tool and missing-path envelopes must share the \
canonical 4-key top-level shape"
);
let expected: BTreeSet<String> = ["kind", "retryable", "retry_after_ms", "details"]
.iter()
.map(|s| (*s).to_string())
.collect();
assert_eq!(keys_unknown, expected);
}
#[test]
fn with_tools_derives_enabled_set_from_filtered_list_not_constant() {
use crate::config::DaemonConfig;
let manager = WorkspaceManager::new_without_reaper(Arc::new(DaemonConfig::default()));
let executor = Arc::new(QueryExecutor::new());
let full = tools_schema::daemon_supported_tools();
assert!(
full.len() >= 2,
"test prerequisite: default daemon_supported_tools must yield >= 2 tools"
);
let filtered: Vec<rmcp::model::Tool> = full
.iter()
.filter(|t| {
let n: &str = t.name.as_ref();
n == "semantic_search" || n == "find_unused"
})
.cloned()
.collect();
assert_eq!(filtered.len(), 2, "synthetic filter must yield exactly 2");
let handler = DaemonMcpHandler::with_tools(
manager,
test_builder(),
executor,
Duration::from_secs(60),
"0.0.0-test",
filtered,
);
let enabled = handler.enabled_tool_names();
assert_eq!(
enabled.len(),
2,
"enabled_tool_names must equal the filtered list size, not 15"
);
assert!(enabled.contains("semantic_search"));
assert!(enabled.contains("find_unused"));
assert!(
!enabled.contains("trace_path"),
"trace_path is in DAEMON_SUPPORTED_TOOL_NAMES but was excluded \
from the synthetic filter; enabled_tool_names must reflect the \
filter, not the unfiltered constant"
);
assert!(
!enabled.contains("export_graph"),
"export_graph excluded from synthetic filter — enabled set must \
not contain it"
);
assert!(
!enabled.contains("semantic_diff"),
"semantic_diff excluded from synthetic filter — enabled set must \
not contain it"
);
assert!(
!enabled.contains("dependency_impact"),
"dependency_impact excluded from synthetic filter — enabled set \
must not contain it"
);
}
#[test]
fn advertised_and_enabled_sets_are_bit_identical() {
use crate::config::DaemonConfig;
let manager = WorkspaceManager::new_without_reaper(Arc::new(DaemonConfig::default()));
let executor = Arc::new(QueryExecutor::new());
let handler = DaemonMcpHandler::new(
manager,
test_builder(),
executor,
Duration::from_secs(60),
"0.0.0-test",
);
let advertised: HashSet<String> = handler
.advertised_tools()
.iter()
.map(|t| t.name.as_ref().to_owned())
.collect();
let enabled = handler.enabled_tool_names();
assert_eq!(
&advertised, enabled,
"list_tools advertised set and call_tool authorization set MUST be bit-identical \
— any divergence breaks the advertised-vs-callable contract (Codex iter-0 MAJOR-1)"
);
}
#[test]
fn disabled_tool_rejection_distinguishes_disabled_from_unknown() {
use crate::config::DaemonConfig;
let manager = WorkspaceManager::new_without_reaper(Arc::new(DaemonConfig::default()));
let executor = Arc::new(QueryExecutor::new());
let full = tools_schema::daemon_supported_tools();
let only_semantic_search: Vec<rmcp::model::Tool> = full
.iter()
.filter(|t| {
let n: &str = t.name.as_ref();
n == "semantic_search"
})
.cloned()
.collect();
assert_eq!(only_semantic_search.len(), 1);
let handler = DaemonMcpHandler::with_tools(
manager,
test_builder(),
executor,
Duration::from_secs(60),
"0.0.0-test",
only_semantic_search,
);
assert_eq!(handler.enabled_tool_names().len(), 1);
assert!(handler.enabled_tool_names().contains("semantic_search"));
        let disabled_name = "trace_path";
        let unknown_name = "this_tool_does_not_exist_anywhere";
assert!(
!handler.enabled_tool_names().contains(disabled_name),
"trace_path must be classified as disabled, not enabled"
);
assert!(
tools_schema::DAEMON_SUPPORTED_TOOL_NAMES.contains(&disabled_name),
"trace_path must remain in DAEMON_SUPPORTED_TOOL_NAMES — if not, \
this test must be updated to pick a different gated tool"
);
assert!(
!handler.enabled_tool_names().contains(unknown_name),
"synthetic unknown name must not be in the enabled set"
);
assert!(
!tools_schema::DAEMON_SUPPORTED_TOOL_NAMES.contains(&unknown_name),
"synthetic unknown name must not be in DAEMON_SUPPORTED_TOOL_NAMES"
);
}
}