use crate::docset::{
open_default_docset_retriever, Docset, DocsetIngestOptions, DocsetIngestor, DocsetStore,
DocsetStoreConfig, RefreshStatus,
};
use crate::storage::{AccessContext, AccessLevel};
use chrono::Utc;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::collections::HashMap;
use std::io::{self, BufRead, Write};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::path::PathBuf;
use url::Url;
use uuid::Uuid;
/// MCP protocol revision implemented by this server.
pub const MCP_VERSION: &str = "2024-11-05";

/// Server identity advertised in the `initialize` response (`serverInfo`).
#[derive(Debug, Clone, Serialize)]
pub struct ServerInfo {
    pub name: String,
    pub version: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}

/// Capability set advertised in the `initialize` response.
#[derive(Debug, Clone, Serialize)]
pub struct ServerCapabilities {
    pub tools: ToolsCapability,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub resources: Option<ResourcesCapability>,
}

/// Tool-related capabilities.
#[derive(Debug, Clone, Serialize)]
pub struct ToolsCapability {
    /// MCP 2024-11-05 uses camelCase on the wire (`listChanged`), matching
    /// the existing `inputSchema` rename on `ToolDefinition`.
    #[serde(rename = "listChanged")]
    pub list_changed: bool,
}

/// Resource-related capabilities.
#[derive(Debug, Clone, Serialize)]
pub struct ResourcesCapability {
    pub subscribe: bool,
    /// Serialized as `listChanged` per the MCP schema.
    #[serde(rename = "listChanged")]
    pub list_changed: bool,
}

/// A single tool entry returned by `tools/list`; `input_schema` is a JSON
/// Schema object serialized under the MCP field name `inputSchema`.
#[derive(Debug, Clone, Serialize)]
pub struct ToolDefinition {
    pub name: String,
    pub description: String,
    #[serde(rename = "inputSchema")]
    pub input_schema: Value,
}

/// Result payload for `tools/call`; `is_error: Some(true)` marks a tool-level
/// failure (distinct from a JSON-RPC protocol error).
#[derive(Debug, Clone, Serialize)]
pub struct ToolResult {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub content: Option<Vec<ContentBlock>>,
    /// Serialized as `isError` per the MCP schema.
    #[serde(rename = "isError", skip_serializing_if = "Option::is_none")]
    pub is_error: Option<bool>,
}

/// One content item inside a tool result; this server only emits
/// `content_type == "text"` blocks.
#[derive(Debug, Clone, Serialize)]
pub struct ContentBlock {
    #[serde(rename = "type")]
    pub content_type: String,
    pub text: String,
}
impl ToolResult {
    /// Build a single text-block result with no error flag set.
    pub fn success(text: impl Into<String>) -> Self {
        let block = ContentBlock {
            content_type: "text".to_string(),
            text: text.into(),
        };
        Self {
            content: Some(vec![block]),
            is_error: None,
        }
    }

    /// Build a single text-block result flagged as a tool error
    /// (`is_error = Some(true)`).
    pub fn error(text: impl Into<String>) -> Self {
        let block = ContentBlock {
            content_type: "text".to_string(),
            text: text.into(),
        };
        Self {
            content: Some(vec![block]),
            is_error: Some(true),
        }
    }
}
/// Incoming JSON-RPC 2.0 request (or notification, when `id` is absent).
#[derive(Debug, Deserialize)]
pub struct JsonRpcRequest {
    pub jsonrpc: String,
    // `None` for notifications; echoed back in the response otherwise.
    pub id: Option<Value>,
    pub method: String,
    // Defaults to `Value::Null` when the message carries no params.
    #[serde(default)]
    pub params: Value,
}

/// Outgoing JSON-RPC 2.0 response. For real responses exactly one of
/// `result`/`error` is set; an all-`None` value is used internally to mark
/// "no reply" for notifications (see `run_stdio`, which suppresses
/// responses with neither an id nor an error).
#[derive(Debug, Serialize)]
pub struct JsonRpcResponse {
    pub jsonrpc: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub id: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub result: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<JsonRpcError>,
}

/// JSON-RPC 2.0 error object (codes follow the spec: -32700 parse error,
/// -32601 method not found, -32603 internal error).
#[derive(Debug, Serialize)]
pub struct JsonRpcError {
    pub code: i32,
    pub message: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<Value>,
}

/// One entry in the `rkmem_docs_list` tool output.
#[derive(Debug, Serialize)]
struct DocsListItem {
    id: String,
    name: String,
    start_url: String,
    allowed_prefixes: Vec<String>,
    // `refresh` and `status` are the serde_json projections of the docset's
    // refresh policy and last-refresh status.
    refresh: Value,
    status: Value,
    // True when the docset is due for a refresh at the time of listing.
    due: bool,
}

/// One search hit in the `rkmem_docs_query` tool output.
#[derive(Debug, Serialize)]
struct DocsQueryHit {
    score: f32,
    sparse_score: Option<f32>,
    doc_id: String,
    chunk_id: String,
    url: Option<String>,
    title: Option<String>,
    // Docset name / id recovered from the document's `docset:` and
    // `docset_id:` tags.
    docset: Option<String>,
    docset_id: Option<String>,
    text: String,
}
/// MCP server exposing docset management and query tools over JSON-RPC.
pub struct DocsetMcpServer {
    // Root directory for the docset store and retriever indexes.
    base_dir: PathBuf,
    // Persistent docset configuration store rooted at `base_dir`.
    store: DocsetStore,
    // Lazily-opened search retriever (see `ensure_retriever`).
    retriever: Option<crate::retrieval::HybridRetriever>,
    // Gates the add/refresh/remove tools.
    allow_write: bool,
    // When false, URLs with private/localhost hosts are rejected.
    allow_private_urls: bool,
    // Set once `initialize` succeeds; not read anywhere in this file —
    // presumably consumed by callers elsewhere (TODO confirm).
    initialized: bool,
}
impl DocsetMcpServer {
/// Create a server rooted at `base_dir`. The retriever is opened lazily on
/// first use; `allow_write` gates mutating tools and `allow_private_urls`
/// disables the private-host URL guard.
pub fn new(base_dir: impl Into<PathBuf>, allow_write: bool, allow_private_urls: bool) -> Self {
    let base_dir = base_dir.into();
    Self {
        store: DocsetStore::new(&base_dir, DocsetStoreConfig::default()),
        base_dir,
        retriever: None,
        allow_write,
        allow_private_urls,
        initialized: false,
    }
}
/// Lazily open the hybrid retriever on first use; subsequent calls are
/// no-ops. Errors are stringified for tool-result reporting.
async fn ensure_retriever(&mut self) -> Result<(), String> {
    if self.retriever.is_none() {
        let opened = open_default_docset_retriever(self.base_dir.clone())
            .await
            .map_err(|e| e.to_string())?;
        self.retriever = Some(opened);
    }
    Ok(())
}
/// Open the retriever and mark the server as initialized.
pub async fn initialize(&mut self) -> Result<(), String> {
    match self.ensure_retriever().await {
        Ok(()) => {
            self.initialized = true;
            Ok(())
        }
        Err(e) => Err(e),
    }
}
/// Server identity reported to clients during `initialize`.
pub fn get_server_info(&self) -> ServerInfo {
    let description =
        "MCP server for ReasonKit Mem docsets (Cursor-like @Docs): crawl, refresh, query";
    ServerInfo {
        name: String::from("ReasonKit Mem Docset Server"),
        version: env!("CARGO_PKG_VERSION").to_string(),
        description: Some(description.to_string()),
    }
}
/// Static capability set: tools without change notifications, resources
/// without subscriptions or change notifications.
pub fn get_capabilities(&self) -> ServerCapabilities {
    let tools = ToolsCapability {
        list_changed: false,
    };
    let resources = ResourcesCapability {
        subscribe: false,
        list_changed: false,
    };
    ServerCapabilities {
        tools,
        resources: Some(resources),
    }
}
/// Build the tool list for `tools/list`.
///
/// The read-only tools (`rkmem_docs_list`, `rkmem_docs_query`) are always
/// advertised; the write tools (add/refresh/remove) are appended only when
/// `allow_write` is set. Note that `call_tool` still routes write tools by
/// name regardless of this list — the handlers enforce the write gate
/// themselves, so hiding them here is presentation, not authorization.
pub fn list_tools(&self) -> Vec<ToolDefinition> {
    let mut tools = vec![
        ToolDefinition {
            name: "rkmem_docs_list".to_string(),
            description: "List configured docsets (name, start_url, refresh status, due)"
                .to_string(),
            // No arguments.
            input_schema: json!({
                "type": "object",
                "properties": {}
            }),
        },
        ToolDefinition {
            name: "rkmem_docs_query".to_string(),
            description:
                "Query indexed docsets (BM25). Returns chunks with URL/title/docset metadata."
                    .to_string(),
            input_schema: json!({
                "type": "object",
                "properties": {
                    "query": { "type": "string" },
                    "top_k": { "type": "integer", "minimum": 1, "maximum": 50, "default": 8 },
                    "docset": {
                        "type": "string",
                        "description": "Optional docset name or UUID to restrict results"
                    }
                },
                "required": ["query"]
            }),
        },
    ];
    if self.allow_write {
        tools.extend([
            ToolDefinition {
                name: "rkmem_docs_add".to_string(),
                description: "Add a new docset (writes to docset store)".to_string(),
                input_schema: json!({
                    "type": "object",
                    "properties": {
                        "name": { "type": "string" },
                        "start_url": { "type": "string" },
                        "allowed_prefixes": {
                            "type": "array",
                            "items": { "type": "string" },
                            "description": "If omitted, start_url is used"
                        }
                    },
                    "required": ["name", "start_url"]
                }),
            },
            ToolDefinition {
                name: "rkmem_docs_refresh".to_string(),
                description: "Refresh docsets by crawling. Defaults to due-only.".to_string(),
                input_schema: json!({
                    "type": "object",
                    "properties": {
                        "due_only": { "type": "boolean", "default": true },
                        "max_pages": { "type": "integer", "minimum": 1, "maximum": 100000 },
                        "concurrency": { "type": "integer", "minimum": 1, "maximum": 64 },
                        "timeout_secs": { "type": "integer", "minimum": 1, "maximum": 300 }
                    }
                }),
            },
            ToolDefinition {
                name: "rkmem_docs_remove".to_string(),
                description: "Remove a docset configuration and optionally delete indexed docs"
                    .to_string(),
                input_schema: json!({
                    "type": "object",
                    "properties": {
                        "docset_id": { "type": "string" },
                        "keep_index": { "type": "boolean", "default": false }
                    },
                    "required": ["docset_id"]
                }),
            },
        ]);
    }
    tools
}
/// Build an admin-level storage access context for internal operations,
/// tagged with the acting component name and the operation label.
fn admin_context(&self, operation: &str) -> AccessContext {
    let actor = "rk-mem-docs-mcp".to_string();
    AccessContext::new(actor, AccessLevel::Admin, operation.to_string())
}
/// Dispatch a `tools/call` invocation by tool name.
///
/// Write tools are routed here even when `allow_write` is false and they are
/// hidden from `tools/list`; each write handler re-checks `allow_write`
/// itself, so this match is a routing table, not an authorization point.
async fn call_tool(&mut self, name: &str, arguments: Value) -> ToolResult {
    match name {
        "rkmem_docs_list" => self.handle_docs_list().await,
        "rkmem_docs_query" => self.handle_docs_query(arguments).await,
        "rkmem_docs_add" => self.handle_docs_add(arguments).await,
        "rkmem_docs_refresh" => self.handle_docs_refresh(arguments).await,
        "rkmem_docs_remove" => self.handle_docs_remove(arguments).await,
        _ => ToolResult::error(format!("Unknown tool: {name}")),
    }
}
/// List every configured docset with its refresh policy, last status, and
/// whether it is currently due for a refresh.
async fn handle_docs_list(&self) -> ToolResult {
    let docsets = match self.store.load().await {
        Ok(d) => d,
        Err(e) => return ToolResult::error(format!("Failed to load docsets: {e}")),
    };
    let now = Utc::now();
    let mut items = Vec::with_capacity(docsets.len());
    for d in &docsets {
        items.push(DocsListItem {
            id: d.id.to_string(),
            name: d.name.clone(),
            start_url: d.start_url.clone(),
            allowed_prefixes: d.allowed_prefixes.clone(),
            refresh: serde_json::to_value(&d.refresh).unwrap_or(json!({})),
            status: serde_json::to_value(&d.status).unwrap_or(json!({})),
            due: d.is_due(now),
        });
    }
    ToolResult::success(serde_json::to_string_pretty(&items).unwrap_or_default())
}
async fn handle_docs_query(&mut self, args: Value) -> ToolResult {
let query = match args.get("query").and_then(|v| v.as_str()) {
Some(q) if !q.is_empty() => q,
_ => return ToolResult::error("query is required"),
};
if query.len() > 4096 {
return ToolResult::error("query too long (max 4096 chars)");
}
let top_k = args
.get("top_k")
.and_then(|v| v.as_u64())
.map(|v| v as usize)
.unwrap_or(8);
let top_k = top_k.clamp(1, 50);
let docset_filter = args.get("docset").and_then(|v| v.as_str());
let wanted_docset_id = if let Some(filter) = docset_filter {
if let Ok(id) = Uuid::parse_str(filter) {
Some(id)
} else {
let docsets = match self.store.load().await {
Ok(d) => d,
Err(e) => return ToolResult::error(format!("Failed to load docsets: {e}")),
};
let Some(found) = docsets.iter().find(|d| d.name.eq_ignore_ascii_case(filter))
else {
return ToolResult::error(format!(
"Unknown docset: {filter} (expected name or UUID)"
));
};
Some(found.id)
}
} else {
None
};
if let Err(e) = self.ensure_retriever().await {
return ToolResult::error(format!("Failed to open retriever: {e}"));
}
let retriever = match self.retriever.as_ref() {
Some(r) => r,
None => return ToolResult::error("Retriever not initialized"),
};
let candidate_k = if top_k >= 200 {
top_k
} else {
top_k.saturating_mul(5).min(200)
};
let results = match retriever.search_sparse(query, candidate_k).await {
Ok(r) => r,
Err(e) => return ToolResult::error(format!("Search failed: {e}")),
};
let ctx = self.admin_context("docs_query");
let mut doc_cache: HashMap<Uuid, Option<crate::Document>> = HashMap::new();
let mut hits: Vec<DocsQueryHit> = Vec::with_capacity(top_k);
for r in results {
if hits.len() >= top_k {
break;
}
let doc = if let Some(cached) = doc_cache.get(&r.doc_id) {
cached.clone()
} else {
let loaded = match retriever.storage().get_document(&r.doc_id, &ctx).await {
Ok(d) => d,
Err(e) => return ToolResult::error(format!("Failed to read document: {e}")),
};
doc_cache.insert(r.doc_id, loaded.clone());
loaded
};
let Some(doc) = doc else {
continue;
};
let docset_id_tag = doc
.metadata
.tags
.iter()
.find(|t| t.starts_with("docset_id:"))
.cloned();
if docset_id_tag.is_none() {
continue;
}
if let Some(wanted) = wanted_docset_id {
let wanted_tag = format!("docset_id:{wanted}");
if !doc.metadata.tags.iter().any(|t| t == &wanted_tag) {
continue;
}
}
let docset_tag = doc
.metadata
.tags
.iter()
.find(|t| t.starts_with("docset:"))
.map(|t| t.trim_start_matches("docset:").to_string());
hits.push(DocsQueryHit {
score: r.score,
sparse_score: r.sparse_score,
doc_id: r.doc_id.to_string(),
chunk_id: r.chunk_id.to_string(),
url: doc.source.url.clone(),
title: doc.metadata.title.clone(),
docset: docset_tag,
docset_id: docset_id_tag.map(|t| t.trim_start_matches("docset_id:").to_string()),
text: r.text,
});
}
ToolResult::success(serde_json::to_string_pretty(&hits).unwrap_or_default())
}
/// Add a new docset configuration (requires write access).
///
/// Arguments (JSON): `name` (required, non-empty), `start_url` (required,
/// non-empty), `allowed_prefixes` (optional array of URL prefix strings).
/// Per the tool schema, an omitted prefix list defaults to `start_url`; an
/// empty array (or one with no string entries) is treated the same way so a
/// docset never ends up with zero crawl prefixes.
async fn handle_docs_add(&mut self, args: Value) -> ToolResult {
    if !self.allow_write {
        return ToolResult::error("Write operations are disabled for this server");
    }
    let name = match args.get("name").and_then(|v| v.as_str()) {
        Some(v) if !v.is_empty() => v,
        _ => return ToolResult::error("name is required"),
    };
    let start_url = match args.get("start_url").and_then(|v| v.as_str()) {
        Some(v) if !v.is_empty() => v,
        _ => return ToolResult::error("start_url is required"),
    };
    // Collect string entries; fall back to `start_url` when the result is
    // empty (previously an empty array produced a prefix-less docset).
    let allowed_prefixes: Vec<String> = args
        .get("allowed_prefixes")
        .and_then(|v| v.as_array())
        .map(|arr| {
            arr.iter()
                .filter_map(|v| v.as_str().map(|s| s.to_string()))
                .collect::<Vec<String>>()
        })
        .filter(|prefixes| !prefixes.is_empty())
        .unwrap_or_else(|| vec![start_url.to_string()]);
    // Reject non-http(s) schemes and (unless overridden) private hosts.
    if let Err(e) = self.validate_docset_urls(start_url, &allowed_prefixes) {
        return ToolResult::error(e);
    }
    let docset = Docset::new(name.to_string(), start_url.to_string(), allowed_prefixes);
    match self.store.upsert(docset).await {
        Ok(saved) => ToolResult::success(
            serde_json::to_string_pretty(&json!({
                "ok": true,
                "docset_id": saved.id.to_string(),
                "name": saved.name,
            }))
            .unwrap_or_default(),
        ),
        Err(e) => ToolResult::error(format!("Failed to save docset: {e}")),
    }
}
/// Refresh (re-crawl and re-index) configured docsets.
///
/// Arguments (JSON): `due_only` (default true — only docsets whose refresh
/// interval has elapsed), plus optional `max_pages`, `concurrency`, and
/// `timeout_secs` overrides applied only when present. Returns a per-docset
/// report array; individual docset failures are recorded in the report
/// rather than aborting the whole refresh.
async fn handle_docs_refresh(&mut self, args: Value) -> ToolResult {
    if !self.allow_write {
        return ToolResult::error("Write operations are disabled for this server");
    }
    if let Err(e) = self.ensure_retriever().await {
        return ToolResult::error(format!("Failed to open retriever: {e}"));
    }
    let mut docsets = match self.store.load().await {
        Ok(d) => d,
        Err(e) => return ToolResult::error(format!("Failed to load docsets: {e}")),
    };
    if docsets.is_empty() {
        return ToolResult::success(
            "{\"ok\":true,\"message\":\"No docsets configured.\"}".to_string(),
        );
    }
    let due_only = args
        .get("due_only")
        .and_then(|v| v.as_bool())
        .unwrap_or(true);
    let mut opts = DocsetIngestOptions {
        manifest_dir: Some(self.base_dir.join("docsets")),
        refresh_due_only: due_only,
        ..Default::default()
    };
    // Optional overrides; absent keys keep the ingest defaults.
    if let Some(v) = args.get("max_pages").and_then(|v| v.as_u64()) {
        opts.max_pages = v as usize;
    }
    if let Some(v) = args.get("concurrency").and_then(|v| v.as_u64()) {
        opts.concurrency = v as usize;
    }
    if let Some(v) = args.get("timeout_secs").and_then(|v| v.as_u64()) {
        opts.request_timeout = std::time::Duration::from_secs(v);
    }
    // The ingestor takes ownership of the retriever for the duration of the
    // refresh; it is handed back via `into_retriever()` at the end.
    let retriever = match self.retriever.take() {
        Some(r) => r,
        None => {
            return ToolResult::error("Retriever not initialized");
        }
    };
    // NOTE(review): if `DocsetIngestor::new` fails, the taken retriever is
    // dropped and `self.retriever` stays `None`; the next call re-opens it
    // via `ensure_retriever`, so this is self-healing but not free.
    let ingestor = match DocsetIngestor::new(retriever) {
        Ok(i) => i,
        Err(e) => return ToolResult::error(format!("Failed to create ingestor: {e}")),
    };
    let mut reports = Vec::new();
    for ds in docsets.iter_mut() {
        // Re-validate URLs on each refresh so a docset that has become
        // disallowed (e.g. policy change) is marked errored, not crawled.
        if let Err(e) = self.validate_docset_urls(&ds.start_url, &ds.allowed_prefixes) {
            ds.status = RefreshStatus::Error {
                at: Utc::now(),
                message: e.clone(),
            };
            reports.push(json!({
                "docset_id": ds.id.to_string(),
                "docset_name": ds.name,
                "error": e,
            }));
            // Best-effort status persistence; a save failure is ignored.
            let _ = self.store.upsert(ds.clone()).await;
            continue;
        }
        let res = ingestor.ingest_docset(ds, &opts).await;
        match res {
            Ok(report) => reports.push(json!({
                "docset_id": report.docset_id.to_string(),
                "docset_name": report.docset_name,
                "discovery_method": report.discovery_method,
                "discovered_urls": report.discovered_urls,
                "fetched_pages": report.fetched_pages,
                "indexed_pages": report.indexed_pages,
                "skipped_unchanged": report.skipped_unchanged,
                "removed_pages": report.removed_pages,
                "failures": report.failures,
            })),
            Err(e) => {
                ds.status = RefreshStatus::Error {
                    at: Utc::now(),
                    message: e.to_string(),
                };
                reports.push(json!({
                    "docset_id": ds.id.to_string(),
                    "docset_name": ds.name,
                    "error": e.to_string(),
                }));
            }
        }
        // Persist updated status/bookkeeping after each docset (best-effort).
        let _ = self.store.upsert(ds.clone()).await;
    }
    // Hand the retriever back to the server for subsequent queries.
    self.retriever = Some(ingestor.into_retriever());
    ToolResult::success(
        serde_json::to_string_pretty(&json!({
            "ok": true,
            "reports": reports
        }))
        .unwrap_or_default(),
    )
}
/// Remove a docset configuration and, unless `keep_index` is set, delete
/// every indexed document tagged with its `docset_id`.
///
/// Arguments (JSON): `docset_id` (required UUID), `keep_index` (default
/// false).
async fn handle_docs_remove(&mut self, args: Value) -> ToolResult {
    if !self.allow_write {
        return ToolResult::error("Write operations are disabled for this server");
    }
    let id = match args.get("docset_id").and_then(|v| v.as_str()) {
        Some(v) => v,
        None => return ToolResult::error("docset_id is required"),
    };
    let docset_id = match Uuid::parse_str(id) {
        Ok(v) => v,
        Err(_) => return ToolResult::error("Invalid docset_id (expected UUID)"),
    };
    let keep_index = args
        .get("keep_index")
        .and_then(|v| v.as_bool())
        .unwrap_or(false);
    // Delete the configuration first; Ok(false) means no such docset.
    match self.store.delete(docset_id).await {
        Ok(false) => return ToolResult::error(format!("Docset not found: {docset_id}")),
        Ok(true) => {}
        Err(e) => return ToolResult::error(format!("Failed to delete docset: {e}")),
    }
    if keep_index {
        return ToolResult::success(
            serde_json::to_string_pretty(&json!({ "ok": true, "removed": true }))
                .unwrap_or_default(),
        );
    }
    // NOTE(review): from here on the config is already deleted; if opening
    // the retriever or listing documents fails, indexed docs are orphaned
    // with no config entry. Confirm whether anything cleans these up later.
    if let Err(e) = self.ensure_retriever().await {
        return ToolResult::error(format!("Failed to open retriever: {e}"));
    }
    let retriever = match self.retriever.as_ref() {
        Some(r) => r,
        None => return ToolResult::error("Retriever not initialized"),
    };
    let ctx = self.admin_context("docs_remove");
    // Full scan over all documents; membership is determined solely by the
    // `docset_id:<uuid>` tag.
    let doc_ids = match retriever.storage().list_documents(&ctx).await {
        Ok(ids) => ids,
        Err(e) => return ToolResult::error(format!("Failed to list documents: {e}")),
    };
    let wanted_tag = format!("docset_id:{docset_id}");
    let mut removed_docs = 0usize;
    for doc_id in doc_ids {
        let doc = match retriever.storage().get_document(&doc_id, &ctx).await {
            Ok(d) => d,
            Err(e) => return ToolResult::error(format!("Failed to read document: {e}")),
        };
        let Some(doc) = doc else {
            continue;
        };
        if doc.metadata.tags.iter().any(|t| t == &wanted_tag) {
            // Best-effort delete: individual failures are ignored, and the
            // document is counted as removed regardless.
            let _ = retriever.delete_document(&doc_id).await;
            removed_docs += 1;
        }
    }
    ToolResult::success(
        serde_json::to_string_pretty(&json!({
            "ok": true,
            "removed_docs": removed_docs
        }))
        .unwrap_or_default(),
    )
}
pub async fn handle_request(&mut self, request: JsonRpcRequest) -> JsonRpcResponse {
match request.method.as_str() {
"initialize" => {
if let Err(e) = self.initialize().await {
return JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id,
result: None,
error: Some(JsonRpcError {
code: -32603,
message: format!("Failed to initialize: {e}"),
data: None,
}),
};
}
JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id,
result: Some(json!({
"protocolVersion": MCP_VERSION,
"capabilities": self.get_capabilities(),
"serverInfo": self.get_server_info(),
})),
error: None,
}
}
"tools/list" => JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id,
result: Some(json!({
"tools": self.list_tools()
})),
error: None,
},
"tools/call" => {
let name = request
.params
.get("name")
.and_then(|v| v.as_str())
.unwrap_or("");
let arguments = request
.params
.get("arguments")
.cloned()
.unwrap_or(json!({}));
let result = self.call_tool(name, arguments).await;
JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id,
result: Some(serde_json::to_value(result).unwrap_or(json!({}))),
error: None,
}
}
"notifications/initialized" | "initialized" => JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: None,
result: None,
error: None,
},
_ => JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id,
result: None,
error: Some(JsonRpcError {
code: -32601,
message: format!("Method not found: {}", request.method),
data: None,
}),
},
}
}
/// Serve MCP over stdio: one JSON-RPC message per line on stdin, one JSON
/// response per line on stdout (flushed after each write).
///
/// Lines that fail to parse receive a -32700 Parse error response with no
/// id. Responses carrying neither an id nor an error — replies to
/// notifications — are suppressed, per JSON-RPC notification semantics.
/// Returns when stdin reaches EOF; stdout write errors abort the loop.
///
/// NOTE(review): `stdin.lock().lines()` blocks the async executor thread
/// between messages — presumably acceptable for a dedicated stdio
/// transport; confirm this is not run on a shared runtime.
pub async fn run_stdio(&mut self) -> io::Result<()> {
    let stdin = io::stdin();
    let mut stdout = io::stdout();
    for line in stdin.lock().lines() {
        let line = line?;
        // Skip blank lines between messages.
        if line.is_empty() {
            continue;
        }
        let request: JsonRpcRequest = match serde_json::from_str(&line) {
            Ok(r) => r,
            Err(e) => {
                let error_response = JsonRpcResponse {
                    jsonrpc: "2.0".to_string(),
                    id: None,
                    result: None,
                    error: Some(JsonRpcError {
                        code: -32700,
                        message: format!("Parse error: {}", e),
                        data: None,
                    }),
                };
                writeln!(stdout, "{}", serde_json::to_string(&error_response)?)?;
                stdout.flush()?;
                continue;
            }
        };
        let response = self.handle_request(request).await;
        // Only write real responses; notification replies (no id, no error)
        // are dropped.
        if response.id.is_some() || response.error.is_some() {
            writeln!(stdout, "{}", serde_json::to_string(&response)?)?;
            stdout.flush()?;
        }
    }
    Ok(())
}
}
impl DocsetMcpServer {
    /// Validate a docset's start URL together with every allowed prefix;
    /// the first failure aborts with its error message.
    fn validate_docset_urls(
        &self,
        start_url: &str,
        allowed_prefixes: &[String],
    ) -> Result<(), String> {
        std::iter::once(start_url)
            .chain(allowed_prefixes.iter().map(String::as_str))
            .try_for_each(|u| self.validate_url(u))
    }

    /// Check a single URL: it must parse, use http/https, and carry a host
    /// that is not private/localhost (unless `allow_private_urls` is set).
    ///
    /// NOTE(review): the check is purely syntactic — no DNS resolution is
    /// performed, so a public hostname resolving to a private address would
    /// pass; confirm whether the crawler re-checks resolved addresses.
    fn validate_url(&self, url: &str) -> Result<(), String> {
        let parsed = Url::parse(url).map_err(|e| format!("Invalid URL: {url} ({e})"))?;
        let scheme = parsed.scheme();
        if scheme != "http" && scheme != "https" {
            return Err(format!("Unsupported URL scheme: {scheme} (only http/https)"));
        }
        let host = match parsed.host_str() {
            Some(h) => h,
            None => return Err("URL must include a host".to_string()),
        };
        if is_private_or_local_host(host) && !self.allow_private_urls {
            return Err(format!(
                "Blocked potentially private/localhost URL host: {host} (set RKMEM_ALLOW_PRIVATE_URLS=1 to override)"
            ));
        }
        Ok(())
    }
}
/// Returns `true` when `host` names a private, loopback, link-local, or
/// otherwise non-public endpoint — the basic SSRF guard for crawl targets.
///
/// Accepts hostnames, IPv4 literals, and bracketed or bare IPv6 literals.
/// Hostnames are matched syntactically only; no DNS resolution is performed.
fn is_private_or_local_host(host: &str) -> bool {
    let host_lc = host.to_ascii_lowercase();
    if host_lc == "localhost" || host_lc.ends_with(".localhost") {
        return true;
    }
    if host_lc.ends_with(".local") || host_lc.ends_with(".internal") {
        return true;
    }
    // `Url::host_str()` keeps the surrounding brackets on IPv6 literals
    // (e.g. "[::1]"); strip them so the literal parses as an `IpAddr`.
    // Without this, IPv6 loopback/link-local URLs bypassed the check.
    let candidate = host
        .strip_prefix('[')
        .and_then(|h| h.strip_suffix(']'))
        .unwrap_or(host);
    let Ok(ip) = candidate.parse::<IpAddr>() else {
        return false;
    };
    match ip {
        IpAddr::V4(v4) => is_private_ipv4(v4),
        IpAddr::V6(v6) => is_private_ipv6(v6),
    }
}

/// RFC 1918 private ranges plus loopback, link-local, unspecified,
/// broadcast, and the RFC 6598 shared/CGNAT range (100.64.0.0/10).
fn is_private_ipv4(ip: Ipv4Addr) -> bool {
    if ip.is_private() || ip.is_loopback() || ip.is_link_local() || ip.is_unspecified() {
        return true;
    }
    if ip.is_broadcast() {
        return true;
    }
    // 100.64.0.0/10 (RFC 6598 shared address space, e.g. carrier-grade NAT);
    // `Ipv4Addr::is_shared` is not stable, so check the prefix manually.
    let o = ip.octets();
    o[0] == 100 && (o[1] & 0xc0) == 64
}

/// Loopback, unspecified, unique-local (fc00::/7), link-local (fe80::/10),
/// and IPv4-mapped addresses whose embedded IPv4 address is private.
fn is_private_ipv6(ip: Ipv6Addr) -> bool {
    if ip.is_loopback() || ip.is_unspecified() {
        return true;
    }
    // IPv4-mapped addresses (::ffff:a.b.c.d) inherit the IPv4 verdict,
    // closing the `http://[::ffff:127.0.0.1]/` bypass.
    if let Some(v4) = ip.to_ipv4_mapped() {
        return is_private_ipv4(v4);
    }
    let octets = ip.octets();
    // fc00::/7 unique-local; fe80::/10 link-local. Checked manually because
    // the std `is_unique_local`/`is_unicast_link_local` helpers were not
    // stable on older toolchains.
    let is_unique_local = (octets[0] & 0xfe) == 0xfc;
    let is_link_local = octets[0] == 0xfe && (octets[1] & 0xc0) == 0x80;
    is_unique_local || is_link_local
}