use std::sync::Arc;
use rmcp::ErrorData as McpError;
use rmcp::model::CallToolResult;
use super::ServerState;
#[cfg(feature = "documents")]
use super::helpers::format_response;
use super::helpers::json_result;
#[cfg(feature = "documents")]
use super::types::{DocumentSearchHit, SearchDocumentsParams, SearchDocumentsResponse};
#[cfg(feature = "memory")]
use super::types::{
MemoryDeleteParams, MemoryDeleteResponse, MemoryEntry, MemoryGetParams, MemoryListParams,
MemoryListResponse, MemoryPutParams, MemoryPutResponse, MemoryRecord, MemorySearchHit,
MemorySearchParams, MemorySearchResponse,
};
#[cfg(feature = "documents")]
use crate::extract::doc::{DocEntity, DocKeyword, DocSummary};
#[cfg(feature = "intelligence")]
pub(super) async fn embed_query(state: &ServerState, text: &str) -> Result<Vec<f32>, McpError> {
let embedder = state
.embedder
.get_or_try_init(|| async {
crate::embeddings::SharedEmbedder::load("balanced")
.map(Arc::new)
.map_err(|e| format!("load embedder: {e}"))
})
.await
.map_err(|e| McpError::internal_error(e.clone(), None))?;
let embedder = Arc::clone(embedder);
let text = text.to_string();
tokio::task::spawn_blocking(move || embedder.embed(&text))
.await
.map_err(|e| McpError::internal_error(format!("spawn_blocking: {e}"), None))?
.map_err(|e| McpError::internal_error(format!("embed: {e}"), None))
}
#[cfg(any(feature = "memory", feature = "documents"))]
pub(super) async fn lance_store(
state: &ServerState,
) -> Result<Arc<crate::lance::LanceStore>, McpError> {
state
.lance
.get_or_try_init(|| async {
let embedder = state
.embedder
.get_or_try_init(|| async {
crate::embeddings::SharedEmbedder::load("balanced")
.map(Arc::new)
.map_err(|e| format!("load embedder: {e}"))
})
.await
.map_err(|e| format!("embedder init: {e}"))?;
let dim = embedder.dim();
let model = embedder.model().to_string();
let lance_dir = state.store.read().await.basemind_dir.join("lance");
let model_for_open = model.clone();
tokio::task::spawn_blocking(move || {
crate::lance::LanceStore::open(&lance_dir, dim, &model_for_open)
})
.await
.map_err(|e| format!("lance open join: {e}"))?
.map(Arc::new)
.map_err(|e| format!("open LanceStore: {e}"))
})
.await
.cloned()
.map_err(|e| McpError::internal_error(e.clone(), None))
}
#[cfg(feature = "memory")]
fn read_memory_record(idx: &crate::index::IndexDb, scope: &str, key: &str) -> Option<MemoryRecord> {
let raw_key = crate::index::keys::memory_by_key(scope, key);
let bytes = idx.memory_by_key.get(raw_key).ok().flatten()?;
rmp_serde::from_slice(&bytes).ok()
}
#[cfg(feature = "memory")]
fn write_memory_record(
idx: &crate::index::IndexDb,
scope: &str,
key: &str,
record: &MemoryRecord,
) -> Result<(), McpError> {
let raw_key = crate::index::keys::memory_by_key(scope, key);
let bytes = rmp_serde::to_vec_named(record)
.map_err(|e| McpError::internal_error(format!("serialize memory record: {e}"), None))?;
idx.memory_by_key
.insert(raw_key, bytes)
.map_err(|e| McpError::internal_error(format!("fjall insert: {e}"), None))
}
#[cfg(feature = "memory")]
fn delete_memory_record(
idx: &crate::index::IndexDb,
scope: &str,
key: &str,
) -> Result<bool, McpError> {
let raw_key = crate::index::keys::memory_by_key(scope, key);
let existed = idx
.memory_by_key
.get(raw_key.clone())
.map_err(|e| McpError::internal_error(format!("fjall get: {e}"), None))?
.is_some();
if existed {
idx.memory_by_key
.remove(raw_key)
.map_err(|e| McpError::internal_error(format!("fjall remove: {e}"), None))?;
}
Ok(existed)
}
#[cfg(feature = "memory")]
pub(super) async fn run_memory_put(
state: &ServerState,
params: MemoryPutParams,
) -> Result<CallToolResult, McpError> {
let now = crate::lance::now_micros();
let store = state.store.read().await;
let idx = store
.index_db
.as_ref()
.ok_or_else(|| McpError::internal_error("memory_by_key index not available", None))?;
let existing = read_memory_record(idx, &state.scope, ¶ms.key);
let created_at = existing.map(|r| r.created_at).unwrap_or(now);
let tags = params.tags.clone().unwrap_or_default();
let record = MemoryRecord {
value: params.value.clone(),
tags: tags.clone(),
created_at,
updated_at: now,
};
write_memory_record(idx, &state.scope, ¶ms.key, &record)?;
drop(store);
if params.embed {
let embedding = embed_query(state, ¶ms.value).await?;
let lance = lance_store(state).await?;
let row = crate::lance::MemoryRow {
scope: state.scope.clone(),
key: params.key.clone(),
value: params.value.clone(),
tags,
embedding,
created_at,
updated_at: now,
};
let lance_clone = Arc::clone(&lance);
tokio::task::spawn_blocking(move || lance_clone.upsert_memory(row))
.await
.map_err(|e| McpError::internal_error(format!("spawn_blocking: {e}"), None))?
.map_err(|e| McpError::internal_error(format!("upsert_memory: {e}"), None))?;
}
json_result(&MemoryPutResponse {
key: params.key,
created_at,
updated_at: now,
})
}
#[cfg(feature = "memory")]
pub(super) async fn run_memory_get(
state: &ServerState,
params: MemoryGetParams,
) -> Result<CallToolResult, McpError> {
let store = state.store.read().await;
let idx = store
.index_db
.as_ref()
.ok_or_else(|| McpError::internal_error("memory_by_key index not available", None))?;
let entry: Option<MemoryEntry> =
read_memory_record(idx, &state.scope, ¶ms.key).map(|r| MemoryEntry {
key: params.key.clone(),
value: r.value,
tags: r.tags,
created_at: r.created_at,
updated_at: r.updated_at,
});
json_result(&entry)
}
#[cfg(feature = "memory")]
const MEMORY_PREVIEW_CHARS: usize = 200;
#[cfg(feature = "memory")]
pub(super) async fn run_memory_list(
state: &ServerState,
params: MemoryListParams,
) -> Result<CallToolResult, McpError> {
use std::ops::Bound;
use super::cursor::{Cursor, prefix_upper_bound};
let limit = params
.limit
.unwrap_or(super::helpers::SEARCH_LIMIT_DEFAULT)
.min(super::helpers::SEARCH_LIMIT_MAX) as usize;
let store = state.store.read().await;
let idx = store
.index_db
.as_ref()
.ok_or_else(|| McpError::internal_error("memory_by_key index not available", None))?;
let scope_prefix = crate::index::keys::memory_by_key_scope_prefix(&state.scope);
let upper = prefix_upper_bound(&scope_prefix);
let cursor_bytes = params
.cursor
.as_ref()
.map(|c| c.decode_fjall())
.transpose()?;
let lower: Bound<Vec<u8>> = match cursor_bytes.as_deref() {
Some(k) => Bound::Excluded(k.to_vec()),
None => Bound::Included(scope_prefix.clone()),
};
let upper_bound: Bound<Vec<u8>> = match upper {
Some(b) => Bound::Excluded(b),
None => Bound::Unbounded,
};
let key_prefix_filter = params.prefix.as_deref().unwrap_or("");
let tag_filter = params.tag.as_deref();
let mut entries: Vec<MemoryEntry> = Vec::with_capacity(limit.min(64));
let mut total: usize = 0;
let mut last_emitted_key: Option<Vec<u8>> = None;
let mut has_more = false;
for guard in idx.memory_by_key.range::<Vec<u8>, _>((lower, upper_bound)) {
let (raw_key, raw_val) = guard
.into_inner()
.map_err(|e| McpError::internal_error(format!("index iter: {e}"), None))?;
let Some((_, key)) = crate::index::keys::parse_memory_by_key(&raw_key) else {
continue;
};
if !key.starts_with(key_prefix_filter) {
continue;
}
let Ok(record): Result<MemoryRecord, _> = rmp_serde::from_slice(&raw_val) else {
continue;
};
if let Some(tag) = tag_filter
&& !record.tags.iter().any(|t| t == tag)
{
continue;
}
total += 1;
if entries.len() < limit {
let value = if record.value.len() > MEMORY_PREVIEW_CHARS {
format!(
"{}…",
record
.value
.char_indices()
.nth(MEMORY_PREVIEW_CHARS)
.map(|(i, _)| &record.value[..i])
.unwrap_or(&record.value)
)
} else {
record.value.clone()
};
entries.push(MemoryEntry {
key,
value,
tags: record.tags,
created_at: record.created_at,
updated_at: record.updated_at,
});
last_emitted_key = Some(raw_key.to_vec());
} else {
has_more = true;
}
}
let next_cursor = if has_more {
last_emitted_key.as_deref().map(Cursor::encode_fjall)
} else {
None
};
json_result(&MemoryListResponse {
total,
truncated: total > limit,
entries,
next_cursor,
})
}
#[cfg(feature = "memory")]
pub(super) async fn run_memory_search(
state: &ServerState,
params: MemorySearchParams,
) -> Result<CallToolResult, McpError> {
let limit = params.limit.unwrap_or(10).min(100) as usize;
let embedding = embed_query(state, ¶ms.query).await?;
let lance = lance_store(state).await?;
let scope = state.scope.clone();
let tag = params.tag.clone();
let hits_raw = tokio::task::spawn_blocking(move || {
lance.search_memory(&scope, embedding, limit, tag.as_deref())
})
.await
.map_err(|e| McpError::internal_error(format!("spawn_blocking: {e}"), None))?
.map_err(|e| McpError::internal_error(format!("search_memory: {e}"), None))?;
let hits = hits_raw
.into_iter()
.map(|h| MemorySearchHit {
key: h.key,
value: h.value,
tags: h.tags,
distance: h.distance,
})
.collect();
json_result(&MemorySearchResponse {
query: params.query,
hits,
})
}
#[cfg(feature = "memory")]
pub(super) async fn run_memory_delete(
state: &ServerState,
params: MemoryDeleteParams,
) -> Result<CallToolResult, McpError> {
let store = state.store.read().await;
let idx = store
.index_db
.as_ref()
.ok_or_else(|| McpError::internal_error("memory_by_key index not available", None))?;
let deleted_fjall = delete_memory_record(idx, &state.scope, ¶ms.key)?;
drop(store);
if let Some(lance) = state.lance.get() {
let lance = Arc::clone(lance);
let scope = state.scope.clone();
let key = params.key.clone();
tokio::task::spawn_blocking(move || lance.delete_memory(&scope, &key))
.await
.ok();
}
json_result(&MemoryDeleteResponse {
deleted: deleted_fjall,
})
}
#[cfg(feature = "documents")]
pub(super) async fn run_search_documents(
state: &ServerState,
params: SearchDocumentsParams,
) -> Result<CallToolResult, McpError> {
let (output_format, reranker_enabled, reranker_preset, reranker_top_k) =
if params.overrides.any() {
let mut effective = (*state.config).clone();
crate::config::layered::apply_documents_overrides(
&mut effective,
¶ms.overrides,
crate::config::ConfigSource::Mcp,
None,
);
let r = &effective.documents.reranker;
(
effective.documents.output.format,
r.enabled,
r.preset.clone(),
r.top_k,
)
} else {
let r = &state.config.documents.reranker;
(
state.config.documents.output.format,
r.enabled,
r.preset.clone(),
r.top_k,
)
};
let limit = params.limit.unwrap_or(10).min(100) as usize;
let embedding = embed_query(state, ¶ms.query).await?;
let lance = lance_store(state).await?;
let scope = state.scope.clone();
let mime = params.mime_type.clone();
let hits_raw = tokio::task::spawn_blocking(move || {
lance.search_documents(&scope, embedding, limit, mime.as_deref())
})
.await
.map_err(|e| McpError::internal_error(format!("spawn_blocking: {e}"), None))?
.map_err(|e| McpError::internal_error(format!("search_documents: {e}"), None))?;
let mut hits: Vec<DocumentSearchHit> = hits_raw
.into_iter()
.map(|h| DocumentSearchHit {
path: h.path,
chunk_idx: h.chunk_idx,
text: h.text,
mime_type: h.mime_type,
byte_start: h.byte_start,
byte_end: h.byte_end,
distance: h.distance,
rerank_score: None,
keywords: Vec::new(),
entities: Vec::new(),
summary: None,
})
.collect();
attach_doc_metadata(
state,
&mut hits,
params.entity_category.as_deref(),
params.keywords_contains.as_deref(),
)
.await?;
if reranker_enabled && !hits.is_empty() {
if kreuzberg::get_reranker_preset(&reranker_preset).is_none() {
return Err(McpError::invalid_params(
format!("unknown reranker preset: {reranker_preset:?}"),
None,
));
}
let krz_config = kreuzberg::core::config::RerankerConfig {
model: kreuzberg::core::config::RerankerModelType::Preset {
name: reranker_preset,
},
top_k: Some(reranker_top_k),
..Default::default()
};
let documents: Vec<String> = hits.iter().map(|h| h.text.clone()).collect();
let reranked = kreuzberg::rerank_async(params.query.clone(), documents, &krz_config)
.await
.map_err(|e| {
let msg = e.to_string();
let kind = if msg.contains("download")
|| msg.contains("HuggingFace")
|| msg.contains("model")
{
"rerank model load"
} else {
"rerank inference"
};
McpError::internal_error(format!("{kind}: {msg}"), None)
})?;
let original_hits = std::mem::take(&mut hits);
hits = reranked
.into_iter()
.map(|r| {
original_hits
.get(r.index)
.cloned()
.map(|mut hit| {
hit.rerank_score = Some(r.score);
hit
})
.ok_or_else(|| {
McpError::internal_error(
format!(
"reranker returned out-of-range index {} (got {} hits)",
r.index,
original_hits.len()
),
None,
)
})
})
.collect::<Result<Vec<_>, _>>()?;
}
format_response(
&SearchDocumentsResponse {
query: params.query,
hits,
},
output_format,
)
}
#[cfg(feature = "documents")]
async fn attach_doc_metadata(
state: &ServerState,
hits: &mut Vec<DocumentSearchHit>,
entity_category: Option<&str>,
keywords_contains: Option<&str>,
) -> Result<(), McpError> {
if hits.is_empty() {
return Ok(());
}
let mut unique_paths: Vec<String> = Vec::with_capacity(hits.len());
{
let mut seen: ahash::AHashSet<&str> = ahash::AHashSet::new();
for h in hits.iter() {
if seen.insert(h.path.as_str()) {
unique_paths.push(h.path.clone());
}
}
}
let pairs: Vec<(String, std::path::PathBuf)> = {
let store = state.store.read().await;
unique_paths
.iter()
.filter_map(|p| {
store
.lookup(p.as_str())
.map(|entry| (p.clone(), store.blob_path_doc_hex(&entry.hash_hex)))
})
.collect()
};
type DocMeta = (Vec<DocKeyword>, Vec<DocEntity>, Option<DocSummary>);
let meta: ahash::AHashMap<String, DocMeta> = tokio::task::spawn_blocking(move || {
let mut out: ahash::AHashMap<String, DocMeta> =
ahash::AHashMap::with_capacity(pairs.len());
for (path, blob_path) in pairs {
if !blob_path.exists() {
continue;
}
let bytes = match std::fs::read(&blob_path) {
Ok(b) => b,
Err(e) => {
tracing::warn!(path = %path, error = %e, "read doc blob for metadata attach failed");
continue;
}
};
match rmp_serde::from_slice::<crate::extract::doc::FileMapDoc>(&bytes) {
Ok(doc) => {
out.insert(path, (doc.keywords, doc.entities, doc.summary));
}
Err(e) => {
tracing::warn!(path = %path, error = %e, "decode doc blob for metadata attach failed");
}
}
}
out
})
.await
.unwrap_or_default();
let cat_needle = entity_category.map(|s| s.to_lowercase());
let kw_needle = keywords_contains.map(|s| s.to_lowercase());
hits.retain_mut(|hit| {
let (kws, ents, summary) = meta
.get(&hit.path)
.cloned()
.unwrap_or_else(|| (Vec::new(), Vec::new(), None));
if let Some(needle) = cat_needle.as_deref()
&& !ents
.iter()
.any(|e| e.category.to_lowercase().contains(needle))
{
return false;
}
if let Some(needle) = kw_needle.as_deref()
&& !kws.iter().any(|k| k.text.to_lowercase().contains(needle))
{
return false;
}
hit.keywords = kws;
hit.entities = ents;
hit.summary = summary;
true
});
Ok(())
}