#![cfg(feature = "crawl")]
use std::sync::Arc;
use rmcp::ErrorData as McpError;
use rmcp::model::CallToolResult;
use super::ServerState;
use super::helpers::json_result;
use super::memory::lance_store;
use super::types::{
WebCrawlPageOutcome, WebCrawlParams, WebCrawlResponse, WebMapEntry, WebMapParams,
WebMapResponse, WebScrapeParams, WebScrapeResponse,
};
use crate::embeddings::SharedEmbedder;
use crate::web::ingest::{default_scope, index_page};
/// Wraps any displayable error as an MCP internal error.
///
/// The resulting message is `"{prefix}: {err}"`, where `prefix` names the
/// operation that failed.
fn mcp_internal(prefix: &str, err: impl std::fmt::Display) -> McpError {
    let message = format!("{prefix}: {err}");
    McpError::internal_error(message, None)
}
/// Re-validates the URL a fetch actually ended up at before its content is used.
///
/// `fetched_url` is run back through `crate::url::Url::parse`, so a redirect to
/// a private/loopback host is caught even though the requested URL passed.
/// `context` names the calling tool and prefixes the error message.
///
/// # Errors
/// Returns `invalid_params` when parsing reports `UrlError::PrivateHost`, or
/// when `fetched_url` fails to parse for any other reason.
fn reject_redirected_private_url(context: &str, fetched_url: &str) -> Result<(), McpError> {
    let parse_err = match crate::url::Url::parse(fetched_url) {
        Ok(_) => return Ok(()),
        Err(parse_err) => parse_err,
    };
    let message = match parse_err {
        crate::url::UrlError::PrivateHost(host) => format!(
            "{context}: refusing to index private/loopback host reached via redirect: {host} \
             (set BASEMIND_ALLOW_PRIVATE_HOSTS=1 to allow)"
        ),
        other => {
            format!("{context}: refusing to index unparsable fetched URL {fetched_url:?}: {other}")
        }
    };
    Err(McpError::invalid_params(message, None))
}
/// Chooses the indexing scope for a fetched page.
///
/// An explicit caller-supplied scope always wins. Otherwise the scope is
/// derived from the post-redirect `final_url`. If `final_url` does not parse,
/// it falls back to the originally `requested` URL.
fn resolve_scope(explicit: Option<&str>, requested: &crate::url::Url, final_url: &str) -> String {
    explicit.map_or_else(
        || {
            crate::url::Url::parse(final_url)
                .map(|resolved| default_scope(&resolved))
                .unwrap_or_else(|_| default_scope(requested))
        },
        str::to_string,
    )
}
#[cfg(feature = "crawl")]
fn reject_zero_override(field: &str, value: Option<u32>) -> Result<(), McpError> {
if value == Some(0) {
return Err(McpError::invalid_params(
format!("{field} must be >= 1"),
None,
));
}
Ok(())
}
#[cfg(feature = "crawl")]
fn per_call_engine(
state: &ServerState,
max_pages: Option<u32>,
max_depth: Option<u32>,
) -> Result<kreuzcrawl::CrawlEngineHandle, McpError> {
let mut cfg = state.config.crawl.clone();
if let Some(mp) = max_pages {
cfg.max_pages = mp;
}
if let Some(md) = max_depth {
cfg.max_depth = md;
}
crate::web::build_engine(&cfg).map_err(|e| mcp_internal("build per-call crawl engine", e))
}
/// Returns the shared embedder held in `state.embedder`.
///
/// On first use it is initialised via `get_or_try_init` from
/// `config.documents.embedding_preset`.
///
/// # Errors
/// Returns an internal error with the message `"load embedder: …"` if the
/// preset fails to load.
async fn embedder(state: &ServerState) -> Result<Arc<SharedEmbedder>, McpError> {
    let preset = state.config.documents.embedding_preset.clone();
    let embedder = state
        .embedder
        .get_or_try_init(|| async {
            // NOTE(review): `SharedEmbedder::load` runs directly on the async task.
            // If it performs heavy file I/O or model setup, consider moving it
            // into `tokio::task::spawn_blocking` so it does not stall a runtime
            // worker thread.
            SharedEmbedder::load(&preset)
                .map(Arc::new)
                .map_err(|e| format!("load embedder: {e}"))
        })
        .await
        // The init error is already an owned `String`, so it can be moved into
        // the MCP error without cloning.
        .map_err(|e| McpError::internal_error(e, None))?;
    Ok(Arc::clone(embedder))
}
/// Borrows the shared crawl engine stored in `state.crawl_engine`.
///
/// # Errors
/// Returns an internal error when no engine is present (`None`). The error
/// points the operator at the `basemind serve` startup logs.
fn engine(state: &ServerState) -> Result<&kreuzcrawl::CrawlEngineHandle, McpError> {
    match state.crawl_engine.as_ref() {
        Some(handle) => Ok(handle),
        None => Err(McpError::internal_error(
            "crawl engine not initialised; check basemind serve startup logs",
            None,
        )),
    }
}
/// Handles the `web_scrape` tool: scrapes one URL and optionally indexes it.
///
/// Steps:
/// 1. The URL is scraped with the shared crawl engine.
/// 2. The post-redirect `final_url` is re-checked with
///    `reject_redirected_private_url`, so a redirect cannot bypass the
///    private-host policy.
/// 3. The body used is the extracted markdown when present and non-blank,
///    otherwise the raw HTML.
/// 4. When `params.index` is set, the body is passed to `index_page` on a
///    blocking thread, and the reported byte/chunk counts come from its
///    result. Otherwise only the body's byte length is reported and nothing
///    is indexed.
///
/// # Errors
/// - internal error if the crawl engine is missing, the scrape fails, the
///   blocking task fails to complete, or `index_page` fails;
/// - `invalid_params` if the final URL is private/loopback or unparsable.
pub(super) async fn run_web_scrape(
    state: &ServerState,
    params: WebScrapeParams,
) -> Result<CallToolResult, McpError> {
    let engine = engine(state)?;
    let url_str = params.url.as_str().to_string();
    let result = kreuzcrawl::scrape(engine, &url_str)
        .await
        .map_err(|e| mcp_internal("kreuzcrawl scrape", e))?;
    // The requested URL was validated already, but redirects can land anywhere.
    reject_redirected_private_url("web_scrape", &result.final_url)?;
    let scope = resolve_scope(params.scope.as_deref(), &params.url, &result.final_url);
    // Prefer extracted markdown; fall back to raw HTML when it is missing or blank.
    let body_text: String = result
        .markdown
        .as_ref()
        .map(|m| m.content.clone())
        .filter(|s| !s.trim().is_empty())
        .unwrap_or_else(|| result.html.clone());
    let (bytes, chunks_indexed) = if params.index {
        let lance = lance_store(state).await?;
        let embedder = embedder(state).await?;
        let documents_cfg = state.config.documents.clone();
        let scope_for_block = scope.clone();
        let final_url_for_block = result.final_url.clone();
        let mime_for_block = result.content_type.clone();
        // `index_page` is a synchronous call, so run it off the async worker threads.
        let indexed = tokio::task::spawn_blocking(move || {
            index_page(
                lance.as_ref(),
                &embedder,
                &documents_cfg,
                &scope_for_block,
                &final_url_for_block,
                &mime_for_block,
                &body_text,
            )
        })
        .await
        .map_err(|e| mcp_internal("spawn_blocking", e))?
        .map_err(|e| mcp_internal("index_page", e))?;
        (indexed.bytes, indexed.chunks_indexed)
    } else {
        (body_text.len(), 0)
    };
    json_result(&WebScrapeResponse {
        url: url_str,
        final_url: result.final_url,
        status_code: result.status_code,
        content_type: result.content_type,
        bytes,
        chunks_indexed,
        indexed: chunks_indexed > 0,
        scope,
    })
}
pub(super) async fn run_web_crawl(
state: &ServerState,
params: WebCrawlParams,
) -> Result<CallToolResult, McpError> {
engine(state)?;
reject_zero_override("max_pages", params.max_pages)?;
reject_zero_override("max_depth", params.max_depth)?;
let engine = per_call_engine(state, params.max_pages, params.max_depth)?;
let url_str = params.url.as_str().to_string();
let crawl_outcome = kreuzcrawl::crawl(&engine, &url_str)
.await
.map_err(|e| mcp_internal("kreuzcrawl crawl", e))?;
let scope = params
.scope
.clone()
.unwrap_or_else(|| default_scope(¶ms.url));
let pages_visited = crawl_outcome.pages.len();
let lance = lance_store(state).await?;
let embedder = embedder(state).await?;
let documents_cfg = state.config.documents.clone();
let mut total_chunks = 0usize;
let mut pages_indexed = 0usize;
let mut outcomes: Vec<WebCrawlPageOutcome> = Vec::with_capacity(crawl_outcome.pages.len());
for page in crawl_outcome.pages {
if let Err(error) = reject_redirected_private_url("web_crawl", &page.normalized_url) {
tracing::warn!(
url = %page.normalized_url,
"web_crawl: skipping private/loopback page reached via crawl"
);
outcomes.push(WebCrawlPageOutcome {
url: page.normalized_url,
status_code: page.status_code,
chunks_indexed: 0,
indexed: false,
error: Some(error.message.to_string()),
});
continue;
}
let body_text = page
.markdown
.as_ref()
.map(|m| m.content.clone())
.filter(|s| !s.trim().is_empty())
.unwrap_or_else(|| page.html.clone());
let page_scope = match params.scope.as_deref() {
Some(s) => s.to_string(),
None => match crate::url::Url::parse(&page.normalized_url) {
Ok(u) => default_scope(&u),
Err(_) => scope.clone(),
},
};
let lance_for_block = Arc::clone(&lance);
let embedder_for_block = Arc::clone(&embedder);
let docs_for_block = documents_cfg.clone();
let scope_for_block = page_scope;
let path_for_block = page.normalized_url.clone();
let mime_for_block = page.content_type.clone();
let res = tokio::task::spawn_blocking(move || {
index_page(
lance_for_block.as_ref(),
&embedder_for_block,
&docs_for_block,
&scope_for_block,
&path_for_block,
&mime_for_block,
&body_text,
)
})
.await;
let outcome = match res {
Ok(Ok(indexed)) => {
if indexed.chunks_indexed > 0 {
pages_indexed += 1;
total_chunks += indexed.chunks_indexed;
}
WebCrawlPageOutcome {
url: page.normalized_url,
status_code: page.status_code,
chunks_indexed: indexed.chunks_indexed,
indexed: indexed.chunks_indexed > 0,
error: None,
}
}
Ok(Err(error)) => {
tracing::warn!(url = %page.normalized_url, ?error, "web_crawl index_page failed");
WebCrawlPageOutcome {
url: page.normalized_url,
status_code: page.status_code,
chunks_indexed: 0,
indexed: false,
error: Some(error.to_string()),
}
}
Err(join_err) => WebCrawlPageOutcome {
url: page.normalized_url,
status_code: page.status_code,
chunks_indexed: 0,
indexed: false,
error: Some(format!("spawn_blocking: {join_err}")),
},
};
outcomes.push(outcome);
}
json_result(&WebCrawlResponse {
seed_url: url_str,
pages_visited,
pages_indexed,
total_chunks,
scope,
pages: outcomes,
error: crawl_outcome.error,
})
}
/// Handles the `web_map` tool: lists the URLs `kreuzcrawl::map_urls` discovers for a site.
///
/// Each discovered entry is copied into a `WebMapEntry`, with its
/// `lastmod`, `changefreq` and `priority` fields passed through unchanged.
/// Nothing is indexed.
///
/// # Errors
/// Returns an internal error if the shared crawl engine is missing or
/// `map_urls` fails.
pub(super) async fn run_web_map(
    state: &ServerState,
    params: WebMapParams,
) -> Result<CallToolResult, McpError> {
    let handle = engine(state)?;
    let url = params.url.as_str().to_string();
    let mapped = match kreuzcrawl::map_urls(handle, &url).await {
        Ok(mapped) => mapped,
        Err(e) => return Err(mcp_internal("kreuzcrawl map_urls", e)),
    };
    let mut urls: Vec<WebMapEntry> = Vec::new();
    for discovered in mapped.urls {
        urls.push(WebMapEntry {
            url: discovered.url,
            lastmod: discovered.lastmod,
            changefreq: discovered.changefreq,
            priority: discovered.priority,
        });
    }
    let total_urls = urls.len();
    json_result(&WebMapResponse {
        url,
        total_urls,
        urls,
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Serialises tests that read or mutate `BASEMIND_ALLOW_PRIVATE_HOSTS`.
    ///
    /// Acquires `crate::url::PRIVATE_HOSTS_ENV_LOCK`. A poisoned lock (left
    /// by a test that panicked while holding it) is recovered instead of
    /// propagated, so one failing test does not cascade into the others.
    fn env_lock() -> std::sync::MutexGuard<'static, ()> {
        crate::url::PRIVATE_HOSTS_ENV_LOCK
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// `Some(0)` is rejected for either field; `None` and positive values pass.
    #[test]
    fn rejects_zero_max_pages_and_depth() {
        assert!(reject_zero_override("max_pages", Some(0)).is_err());
        assert!(reject_zero_override("max_depth", Some(0)).is_err());
        assert!(reject_zero_override("max_pages", None).is_ok());
        assert!(reject_zero_override("max_pages", Some(1)).is_ok());
        assert!(reject_zero_override("max_depth", Some(50)).is_ok());
    }

    /// The rejection message must name both the offending field and the bound.
    #[test]
    fn zero_override_error_names_the_field_and_bound() {
        let err = reject_zero_override("max_pages", Some(0)).expect_err("0 must reject");
        assert!(
            err.message.contains("max_pages") && err.message.contains(">= 1"),
            "error should name the field and the >= 1 bound; got: {}",
            err.message
        );
    }

    /// A redirect to the link-local metadata address is rejected, and the
    /// message names the host.
    #[test]
    fn rejects_private_redirect_target() {
        // Bound to `_g` (not `_`) so the guard is held until the end of the test.
        let _g = env_lock();
        // SAFETY: `remove_var` is unsafe because other threads may read or write
        // the environment concurrently. The env lock is held here.
        // NOTE(review): this is only sound if every test touching this variable
        // also takes `env_lock()` first — confirm for tests outside this module.
        unsafe { std::env::remove_var("BASEMIND_ALLOW_PRIVATE_HOSTS") };
        let err =
            reject_redirected_private_url("web_scrape", "http://169.254.169.254/latest/meta-data/")
                .expect_err("link-local redirect target must be rejected");
        assert!(
            err.message.contains("169.254.169.254"),
            "rejection should name the private host; got: {}",
            err.message
        );
    }

    /// Loopback targets (IP literal and `localhost`) are rejected when the
    /// override env var is unset.
    #[test]
    fn rejects_loopback_redirect_target() {
        // Bound to `_g` (not `_`) so the guard is held until the end of the test.
        let _g = env_lock();
        // SAFETY: the env lock is held; same caveat as in
        // `rejects_private_redirect_target` about other env-touching tests.
        unsafe { std::env::remove_var("BASEMIND_ALLOW_PRIVATE_HOSTS") };
        assert!(reject_redirected_private_url("web_crawl", "http://127.0.0.1:9000/").is_err());
        assert!(reject_redirected_private_url("web_crawl", "http://localhost/admin").is_err());
    }

    /// A public redirect target passes the re-check.
    #[test]
    fn allows_public_redirect_target() {
        // Bound to `_g` (not `_`) so the guard is held until the end of the test.
        let _g = env_lock();
        // SAFETY: the env lock is held; same caveat as in
        // `rejects_private_redirect_target` about other env-touching tests.
        unsafe { std::env::remove_var("BASEMIND_ALLOW_PRIVATE_HOSTS") };
        assert!(reject_redirected_private_url("web_scrape", "https://example.com/landing").is_ok());
    }
}