//! Daemon-hosted MCP handler.
//!
//! Serves the same tool surface as `sqry-mcp`'s standalone mode, but
//! answers from the daemon's preloaded workspace state so tool calls
//! pay no graph rebuild cost.

pub mod error_map;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use rmcp::ErrorData as McpError;
use rmcp::model::{
CallToolRequestParams, CallToolResult, Content, Implementation, InitializeResult,
ListToolsResult, PaginatedRequestParams, ProtocolVersion, ServerCapabilities, ToolsCapability,
};
use rmcp::service::RequestContext;
use rmcp::{RoleServer, ServerHandler};
use serde_json::Value;
use sqry_core::project::ProjectRootMode;
use sqry_core::query::executor::QueryExecutor;
use sqry_mcp::daemon_adapter::WorkspaceContext;
use sqry_mcp::daemon_adapter::dispatch::dispatch_by_name;
use sqry_mcp::tools_schema;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::sync::CancellationToken;
use crate::error::DaemonError;
use crate::ipc::tool_core::{self, ExecuteVerdict};
use crate::workspace::{WorkspaceBuilder, WorkspaceKey, WorkspaceManager};
use error_map::{daemon_err_to_mcp, daemon_err_to_mcp_with_tool};
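/// MCP `ServerHandler` backed by the daemon's workspace manager.
///
/// Keeps the advertised tool list and a name set derived from it, so
/// the tools advertised by `list_tools` and the tools accepted by
/// `call_tool` cannot diverge.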
pub struct DaemonMcpHandler {
manager: Arc<WorkspaceManager>,
workspace_builder: Arc<dyn WorkspaceBuilder>,
tool_executor: Arc<QueryExecutor>,
tool_timeout: Duration,
daemon_version: &'static str,
tools: Vec<rmcp::model::Tool>,
enabled_tool_names: HashSet<String>,
}
impl DaemonMcpHandler {
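    /// Creates a handler advertising the full default daemon tool set.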
#[must_use]
pub fn new(
manager: Arc<WorkspaceManager>,
workspace_builder: Arc<dyn WorkspaceBuilder>,
tool_executor: Arc<QueryExecutor>,
tool_timeout: Duration,
daemon_version: &'static str,
) -> Self {
Self::with_tools(
manager,
workspace_builder,
tool_executor,
tool_timeout,
daemon_version,
tools_schema::daemon_supported_tools(),
)
}
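    /// Creates a handler advertising exactly `tools`; the enabled-name
    /// set is derived from that list, not from the unfiltered
    /// `DAEMON_SUPPORTED_TOOL_NAMES` constant.
    ///
    /// A sketch of feature-flag filtering (the `flags.allows` predicate
    /// is illustrative, not the daemon's real flag logic):
    ///
    /// ```ignore
    /// let tools: Vec<rmcp::model::Tool> = tools_schema::daemon_supported_tools()
    ///     .into_iter()
    ///     .filter(|t| flags.allows(t.name.as_ref()))
    ///     .collect();
    /// let handler = DaemonMcpHandler::with_tools(
    ///     manager, builder, executor, timeout, version, tools,
    /// );
    /// ```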
#[must_use]
pub fn with_tools(
manager: Arc<WorkspaceManager>,
workspace_builder: Arc<dyn WorkspaceBuilder>,
tool_executor: Arc<QueryExecutor>,
tool_timeout: Duration,
daemon_version: &'static str,
tools: Vec<rmcp::model::Tool>,
) -> Self {
let enabled_tool_names: HashSet<String> =
tools.iter().map(|t| t.name.as_ref().to_owned()).collect();
Self {
manager,
workspace_builder,
tool_executor,
tool_timeout,
daemon_version,
tools,
enabled_tool_names,
}
}
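    /// Names of tools that `call_tool` will accept.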
#[must_use]
pub fn enabled_tool_names(&self) -> &HashSet<String> {
&self.enabled_tool_names
}
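    /// Tools returned verbatim by `list_tools`.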
#[must_use]
pub fn advertised_tools(&self) -> &[rmcp::model::Tool] {
&self.tools
}
}
impl ServerHandler for DaemonMcpHandler {
fn get_info(&self) -> InitializeResult {
InitializeResult {
protocol_version: ProtocolVersion::LATEST,
capabilities: ServerCapabilities {
tools: Some(ToolsCapability { list_changed: None }),
..Default::default()
},
server_info: Implementation {
name: "sqry-daemon-mcp".into(),
version: self.daemon_version.to_owned(),
title: None,
description: None,
icons: None,
website_url: None,
},
instructions: Some(
"sqry MCP server (daemon-hosted). Tool calls are served from \
the daemon's preloaded workspace state — same behaviour as \
sqry-mcp's standalone mode, zero graph rebuild cost."
.into(),
),
}
}
async fn list_tools(
&self,
_req: Option<PaginatedRequestParams>,
_ctx: RequestContext<RoleServer>,
) -> Result<ListToolsResult, McpError> {
Ok(ListToolsResult {
meta: None,
next_cursor: None,
tools: self.tools.clone(),
})
}
async fn call_tool(
&self,
req: CallToolRequestParams,
_ctx: RequestContext<RoleServer>,
) -> Result<CallToolResult, McpError> {
let name = req.name.to_string();
let args_value = req.arguments.map_or(Value::Null, Value::Object);
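        // Distinguish "disabled by feature flags" from "genuinely
        // unknown": a name present in DAEMON_SUPPORTED_TOOL_NAMES but
        // absent from the enabled set was filtered out at startup.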
if !self.enabled_tool_names.contains(&name) {
let reason = if tools_schema::DAEMON_SUPPORTED_TOOL_NAMES.contains(&name.as_str()) {
format!(
"tool {name} is disabled by the daemon's active feature flags \
(see SQRY_MCP_ENABLE_* environment variables)"
)
} else {
format!("unknown tool name {name}: not in DAEMON_SUPPORTED_TOOL_NAMES")
};
return Err(daemon_err_to_mcp(DaemonError::InvalidArgument { reason }));
}
if name == "rebuild_index" {
let path = match args_value.as_object().and_then(|m| m.get("path")) {
Some(raw) => raw.as_str().map(String::from).ok_or_else(|| {
daemon_err_to_mcp(DaemonError::InvalidArgument {
reason: format!("rebuild_index: `path` must be a string, got: {raw}"),
})
})?,
None => ".".to_string(),
};
return self.handle_rebuild_index(&path, &args_value).await;
}
let path = extract_path_arg(&args_value).ok_or_else(|| {
daemon_err_to_mcp(DaemonError::InvalidArgument {
reason: format!("{name}: missing or non-string `path` argument"),
})
})?;
let name_clone = name.clone();
let args_clone = args_value.clone();
let run = move |wctx: &WorkspaceContext| -> anyhow::Result<Value> {
dispatch_by_name(&name_clone, wctx, &args_clone)
};
let verdict = tool_core::classify_and_execute(
Arc::clone(&self.manager),
Arc::clone(&self.tool_executor),
self.tool_timeout,
&path,
run,
)
.await
.map_err(|e| daemon_err_to_mcp_with_tool(e, &name))?;
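        // Surface staleness in-band: stale results gain a
        // `_stale_warning` key when the payload is a JSON object.
        // Non-object payloads pass through unchanged, so the warning
        // is dropped for them.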
let payload = match verdict {
ExecuteVerdict::Fresh { inner, .. } => inner,
ExecuteVerdict::Stale {
mut inner,
stale_warning,
..
} => {
if let Value::Object(ref mut map) = inner {
map.insert("_stale_warning".into(), Value::String(stale_warning));
}
inner
}
};
let text_payload =
serde_json::to_string_pretty(&payload).unwrap_or_else(|_| payload.to_string());
Ok(CallToolResult {
content: vec![Content::text(text_payload)],
structured_content: Some(payload),
is_error: None,
meta: None,
})
}
}
impl DaemonMcpHandler {
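    /// Handles the `rebuild_index` tool: canonicalizes the target
    /// path, short-circuits on an existing on-disk index unless
    /// `force` is set, and otherwise (re)loads the workspace through
    /// the manager on a blocking task.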
async fn handle_rebuild_index(
&self,
path: &str,
args_value: &Value,
) -> Result<CallToolResult, McpError> {
use sqry_mcp::execution::{RebuildIndexData, ToolExecution};
let start = std::time::Instant::now();
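        // `force` defaults to true: a bare rebuild_index call always
        // rebuilds, while an explicit force=false allows the
        // existing-index short-circuit below.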
let force = match args_value.as_object().and_then(|m| m.get("force")) {
Some(raw) => raw.as_bool().ok_or_else(|| {
daemon_err_to_mcp(DaemonError::InvalidArgument {
reason: format!("rebuild_index: `force` must be a boolean, got: {raw}"),
})
})?,
None => true,
};
let canonical_target = std::fs::canonicalize(path).map_err(|e| {
daemon_err_to_mcp(DaemonError::InvalidArgument {
reason: format!("rebuild_index: cannot canonicalize path {path:?}: {e}"),
})
})?;
let canonical_root: std::path::PathBuf = if canonical_target.is_dir() {
canonical_target.clone()
} else if let Some(parent) = canonical_target.parent() {
parent.to_path_buf()
} else {
return Err(daemon_err_to_mcp(DaemonError::InvalidArgument {
reason: format!(
"rebuild_index: cannot derive workspace root from {} (no parent directory)",
canonical_target.display()
),
}));
};
let root_display = path_to_forward_slash(&canonical_root);
let storage = sqry_core::graph::unified::persistence::GraphStorage::new(&canonical_root);
if storage.exists() && !force {
return build_rebuild_index_cache_hit_response(
&canonical_root,
&root_display,
&storage,
start,
);
}
let key = WorkspaceKey::new(canonical_root.clone(), ProjectRootMode::default(), 0);
if force {
self.manager.unload(&key);
}
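        // Flat 2 MiB working-set estimate for a cold load, scaled by
        // the configured multiplier before being handed to the manager.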
const INITIAL_WORKING_SET_BYTES: u64 = 2 * 1024 * 1024;
#[allow(
clippy::cast_possible_truncation,
clippy::cast_sign_loss,
clippy::cast_precision_loss
)]
let working_set_estimate =
(INITIAL_WORKING_SET_BYTES as f64 * crate::config::WORKING_SET_MULTIPLIER) as u64;
let manager = Arc::clone(&self.manager);
let builder = Arc::clone(&self.workspace_builder);
let key_for_task = key.clone();
let graph = tokio::task::spawn_blocking(move || {
manager.get_or_load(&key_for_task, &*builder, working_set_estimate)
})
.await
.map_err(|join_err| {
daemon_err_to_mcp_with_tool(
DaemonError::WorkspaceBuildFailed {
root: canonical_root.clone(),
reason: format!("rebuild_index: task join error: {join_err}"),
},
"rebuild_index",
)
})?
.map_err(|e| daemon_err_to_mcp_with_tool(e, "rebuild_index"))?;
#[allow(clippy::cast_possible_truncation)]
let elapsed_ms = start.elapsed().as_millis() as u64;
let node_count = graph.node_count() as u64;
let edge_count = graph.edge_count() as u64;
let files_indexed = graph.indexed_files().count() as u64;
let data = RebuildIndexData {
success: true,
root_path: root_display.clone(),
node_count,
edge_count,
files_indexed,
built_at: chrono::Utc::now().to_rfc3339(),
message: Some(if force {
"Index rebuilt successfully.".to_string()
} else {
"Index built successfully.".to_string()
}),
};
let execution = ToolExecution {
data,
used_index: false,
used_graph: true,
graph_metadata: None,
execution_ms: elapsed_ms,
next_page_token: None,
total: Some(1),
truncated: Some(false),
candidates_scanned: None,
workspace_path: root_display,
};
finalize_rebuild_index_response(execution)
}
}
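/// Pulls a string `path` out of the tool arguments, if present.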
fn extract_path_arg(args: &Value) -> Option<String> {
args.as_object()?.get("path")?.as_str().map(String::from)
}
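/// Normalizes a path to forward slashes for stable cross-platform display.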
fn path_to_forward_slash(p: &std::path::Path) -> String {
p.to_string_lossy().replace('\\', "/")
}
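/// Builds the `rebuild_index` response for the cache-hit path, reading
/// counts from the persisted manifest (and the snapshot header when
/// available) instead of loading the graph.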
fn build_rebuild_index_cache_hit_response(
canonical_root: &std::path::Path,
root_display: &str,
storage: &sqry_core::graph::unified::persistence::GraphStorage,
start: std::time::Instant,
) -> Result<CallToolResult, McpError> {
use sqry_core::graph::unified::persistence::load_header_from_path;
use sqry_mcp::execution::{RebuildIndexData, ToolExecution};
let manifest = storage.load_manifest().map_err(|e| {
daemon_err_to_mcp_with_tool(
DaemonError::WorkspaceBuildFailed {
root: canonical_root.to_path_buf(),
reason: format!(
"rebuild_index: index exists at {} but manifest is unreadable: {e}",
canonical_root.display()
),
},
"rebuild_index",
)
})?;
let files_indexed: u64 = if let Ok(header) = load_header_from_path(storage.snapshot_path()) {
u64::try_from(header.file_count).unwrap_or(0)
} else if !manifest.file_count.is_empty() {
u64::try_from(manifest.file_count.values().sum::<usize>()).unwrap_or(0)
} else {
0
};
let data = RebuildIndexData {
success: true,
root_path: root_display.to_string(),
node_count: u64::try_from(manifest.node_count).unwrap_or(0),
edge_count: u64::try_from(manifest.edge_count).unwrap_or(0),
files_indexed,
built_at: manifest.built_at,
message: Some("Index already exists. Use force=true to rebuild.".to_string()),
};
#[allow(clippy::cast_possible_truncation)]
let elapsed_ms = start.elapsed().as_millis() as u64;
let execution = ToolExecution {
data,
used_index: false,
used_graph: true,
graph_metadata: None,
execution_ms: elapsed_ms,
next_page_token: None,
total: Some(1),
truncated: Some(false),
candidates_scanned: None,
workspace_path: root_display.to_string(),
};
finalize_rebuild_index_response(execution)
}
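/// Serializes a `rebuild_index` execution into the dual text +
/// structured-content MCP result shape.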
fn finalize_rebuild_index_response(
execution: sqry_mcp::execution::ToolExecution<sqry_mcp::execution::RebuildIndexData>,
) -> Result<CallToolResult, McpError> {
let payload = sqry_mcp::daemon_adapter::tool_response_json(execution)?;
let text_payload =
serde_json::to_string_pretty(&payload).unwrap_or_else(|_| payload.to_string());
Ok(CallToolResult {
content: vec![Content::text(text_payload)],
structured_content: Some(payload),
is_error: None,
meta: None,
})
}
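/// Serves MCP over the given byte streams until the peer disconnects or
/// `shutdown` fires; the daemon's shutdown token is forwarded to the rmcp
/// service's own cancellation token by a small helper task.
///
/// Hypothetical wiring over a split duplex stream (names are
/// illustrative):
///
/// ```ignore
/// let (reader, writer) = tokio::io::split(stream);
/// host_mcp_on_streams(
///     reader, writer, manager, builder, executor,
///     Duration::from_secs(60), env!("CARGO_PKG_VERSION"), shutdown,
/// )
/// .await?;
/// ```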
pub async fn host_mcp_on_streams<R, W>(
reader: R,
writer: W,
manager: Arc<WorkspaceManager>,
workspace_builder: Arc<dyn WorkspaceBuilder>,
tool_executor: Arc<QueryExecutor>,
tool_timeout: Duration,
daemon_version: &'static str,
shutdown: CancellationToken,
) -> anyhow::Result<()>
where
R: AsyncRead + Send + Unpin + 'static,
W: AsyncWrite + Send + Unpin + 'static,
{
use rmcp::ServiceExt;
let handler = DaemonMcpHandler::new(
manager,
workspace_builder,
tool_executor,
tool_timeout,
daemon_version,
);
let service = handler.serve((reader, writer)).await?;
let service_ct = service.cancellation_token();
let shutdown_fwd = shutdown.clone();
let forwarder = tokio::spawn(async move {
shutdown_fwd.cancelled().await;
service_ct.cancel();
});
let wait_result = service.waiting().await;
forwarder.abort();
wait_result.map(|_| ()).map_err(anyhow::Error::from)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::workspace::builder::EmptyGraphBuilder;
fn test_builder() -> Arc<dyn WorkspaceBuilder> {
Arc::new(EmptyGraphBuilder)
}
#[test]
fn extract_path_arg_returns_path_when_present() {
let v = serde_json::json!({"path": "/tmp/ws", "other": 42});
assert_eq!(extract_path_arg(&v), Some("/tmp/ws".into()));
}
#[test]
fn extract_path_arg_returns_none_when_missing() {
let v = serde_json::json!({"other": 42});
assert_eq!(extract_path_arg(&v), None);
}
#[test]
fn extract_path_arg_returns_none_when_not_string() {
let v = serde_json::json!({"path": 42});
assert_eq!(extract_path_arg(&v), None);
}
#[test]
fn extract_path_arg_returns_none_on_non_object() {
let v = serde_json::Value::Null;
assert_eq!(extract_path_arg(&v), None);
let v = serde_json::json!([1, 2, 3]);
assert_eq!(extract_path_arg(&v), None);
}
#[test]
fn get_info_advertises_daemon_identity_and_tool_capability() {
use crate::config::DaemonConfig;
let manager = WorkspaceManager::new_without_reaper(Arc::new(DaemonConfig::default()));
let executor = Arc::new(QueryExecutor::new());
let handler = DaemonMcpHandler::new(
manager,
test_builder(),
executor,
Duration::from_secs(60),
"0.0.0-test",
);
let info = handler.get_info();
assert_eq!(info.server_info.name, "sqry-daemon-mcp");
assert_eq!(info.server_info.version, "0.0.0-test");
assert!(info.capabilities.tools.is_some());
assert!(
info.instructions
.as_deref()
.unwrap_or_default()
.contains("daemon-hosted"),
"instructions must mention daemon-hosted mode"
);
}
#[test]
fn handler_tools_list_is_subset_of_daemon_supported_names() {
use crate::config::DaemonConfig;
let manager = WorkspaceManager::new_without_reaper(Arc::new(DaemonConfig::default()));
let executor = Arc::new(QueryExecutor::new());
let handler = DaemonMcpHandler::new(
manager,
test_builder(),
executor,
Duration::from_secs(60),
"0.0.0-test",
);
for tool in &handler.tools {
assert!(
tools_schema::DAEMON_SUPPORTED_TOOL_NAMES.contains(&tool.name.as_ref()),
"tool {:?} must be in DAEMON_SUPPORTED_TOOL_NAMES",
tool.name
);
}
}
#[test]
fn unknown_tool_and_missing_path_envelopes_have_identical_top_level_keys() {
use std::collections::BTreeSet;
let err_unknown = daemon_err_to_mcp(DaemonError::InvalidArgument {
reason: "unknown tool name bogus_tool: not in DAEMON_SUPPORTED_TOOL_NAMES".into(),
});
let err_missing = daemon_err_to_mcp(DaemonError::InvalidArgument {
reason: "semantic_search: missing or non-string `path` argument".into(),
});
let keys_unknown: BTreeSet<String> = err_unknown
.data
.as_ref()
.unwrap()
.as_object()
.unwrap()
.keys()
.cloned()
.collect();
let keys_missing: BTreeSet<String> = err_missing
.data
.as_ref()
.unwrap()
.as_object()
.unwrap()
.keys()
.cloned()
.collect();
assert_eq!(
keys_unknown, keys_missing,
"unknown-tool and missing-path envelopes must share the \
canonical 4-key top-level shape"
);
let expected: BTreeSet<String> = ["kind", "retryable", "retry_after_ms", "details"]
.iter()
.map(|s| (*s).to_string())
.collect();
assert_eq!(keys_unknown, expected);
}
#[test]
fn with_tools_derives_enabled_set_from_filtered_list_not_constant() {
use crate::config::DaemonConfig;
let manager = WorkspaceManager::new_without_reaper(Arc::new(DaemonConfig::default()));
let executor = Arc::new(QueryExecutor::new());
let full = tools_schema::daemon_supported_tools();
assert!(
full.len() >= 2,
"test prerequisite: default daemon_supported_tools must yield >= 2 tools"
);
let filtered: Vec<rmcp::model::Tool> = full
.iter()
.filter(|t| {
let n: &str = t.name.as_ref();
n == "semantic_search" || n == "find_unused"
})
.cloned()
.collect();
assert_eq!(filtered.len(), 2, "synthetic filter must yield exactly 2");
let handler = DaemonMcpHandler::with_tools(
manager,
test_builder(),
executor,
Duration::from_secs(60),
"0.0.0-test",
filtered,
);
let enabled = handler.enabled_tool_names();
assert_eq!(
enabled.len(),
2,
"enabled_tool_names must equal the filtered list size, not 15"
);
assert!(enabled.contains("semantic_search"));
assert!(enabled.contains("find_unused"));
assert!(
!enabled.contains("trace_path"),
"trace_path is in DAEMON_SUPPORTED_TOOL_NAMES but was excluded \
from the synthetic filter; enabled_tool_names must reflect the \
filter, not the unfiltered constant"
);
assert!(
!enabled.contains("export_graph"),
"export_graph excluded from synthetic filter — enabled set must \
not contain it"
);
assert!(
!enabled.contains("semantic_diff"),
"semantic_diff excluded from synthetic filter — enabled set must \
not contain it"
);
assert!(
!enabled.contains("dependency_impact"),
"dependency_impact excluded from synthetic filter — enabled set \
must not contain it"
);
}
#[test]
fn advertised_and_enabled_sets_are_bit_identical() {
use crate::config::DaemonConfig;
let manager = WorkspaceManager::new_without_reaper(Arc::new(DaemonConfig::default()));
let executor = Arc::new(QueryExecutor::new());
let handler = DaemonMcpHandler::new(
manager,
test_builder(),
executor,
Duration::from_secs(60),
"0.0.0-test",
);
let advertised: HashSet<String> = handler
.advertised_tools()
.iter()
.map(|t| t.name.as_ref().to_owned())
.collect();
let enabled = handler.enabled_tool_names();
assert_eq!(
&advertised, enabled,
"list_tools advertised set and call_tool authorization set MUST be bit-identical \
— any divergence breaks the advertised-vs-callable contract (Codex iter-0 MAJOR-1)"
);
}
#[test]
fn disabled_tool_rejection_distinguishes_disabled_from_unknown() {
use crate::config::DaemonConfig;
let manager = WorkspaceManager::new_without_reaper(Arc::new(DaemonConfig::default()));
let executor = Arc::new(QueryExecutor::new());
let full = tools_schema::daemon_supported_tools();
let only_semantic_search: Vec<rmcp::model::Tool> = full
.iter()
.filter(|t| {
let n: &str = t.name.as_ref();
n == "semantic_search"
})
.cloned()
.collect();
assert_eq!(only_semantic_search.len(), 1);
let handler = DaemonMcpHandler::with_tools(
manager,
test_builder(),
executor,
Duration::from_secs(60),
"0.0.0-test",
only_semantic_search,
);
assert_eq!(handler.enabled_tool_names().len(), 1);
assert!(handler.enabled_tool_names().contains("semantic_search"));
let disabled_name = "trace_path"; let unknown_name = "this_tool_does_not_exist_anywhere";
assert!(
!handler.enabled_tool_names().contains(disabled_name),
"trace_path must be classified as disabled, not enabled"
);
assert!(
tools_schema::DAEMON_SUPPORTED_TOOL_NAMES.contains(&disabled_name),
"trace_path must remain in DAEMON_SUPPORTED_TOOL_NAMES — if not, \
this test must be updated to pick a different gated tool"
);
assert!(
!handler.enabled_tool_names().contains(unknown_name),
"synthetic unknown name must not be in the enabled set"
);
assert!(
!tools_schema::DAEMON_SUPPORTED_TOOL_NAMES.contains(&unknown_name),
"synthetic unknown name must not be in DAEMON_SUPPORTED_TOOL_NAMES"
);
}
}