Skip to main content

rover/mcp/tools/
batch_fetch.rs

//! MCP `batch_fetch` tool.
//!
//! Validates inputs, runs each URL through the SSRF policy, inserts a
//! `batch_fetch` task carrying serialised `BatchFetchParams`, and returns
//! the immediate `TaskCreatedResponse` envelope. The scheduler is notified
//! by the storage layer itself — see `storage::tasks::insert` and the
//! bridge installed in `mcp::server::serve_stdio`. SSRF rejections happen
//! before the task insert, so a rejected batch never creates a task.
9
10use schemars::JsonSchema;
11use serde::{Deserialize, Serialize};
12use url::Url;
13
14use crate::fetcher::ssrf::{SsrfError, validate_url};
15use crate::mcp::envelope::TaskCreatedResponse;
16use crate::mcp::error::McpError;
17use crate::mcp::handler::RoverHandler;
18use crate::storage::tasks::{TaskInsert, TaskKind, insert};
19use crate::tasks::types::{BatchFetchParams, TaskId};
20
/// Largest number of URLs accepted in a single `batch_fetch` request; longer
/// lists are rejected with `McpError::TooManyUrls`.
const MAX_URLS: usize = 100;
/// Upper bound the requested overall `concurrency` is clamped to.
const MAX_CONCURRENCY: u32 = 32;
/// Upper bound the requested `per_domain_concurrency` is clamped to.
const MAX_PER_DOMAIN: u32 = 8;
24
25/// Wire-side `batch_fetch` tool arguments.
26#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
27#[serde(deny_unknown_fields)]
28pub struct BatchFetchArgs {
29    pub urls: Vec<String>,
30    #[serde(default)]
31    pub force_refresh: bool,
32    #[serde(default)]
33    pub concurrency: Option<u32>,
34    #[serde(default)]
35    pub per_domain_concurrency: Option<u32>,
36}
37
38impl RoverHandler {
39    /// Tool body, decoupled from the `#[tool]` macro for unit testing.
40    pub async fn batch_fetch_inner(
41        &self,
42        args: BatchFetchArgs,
43    ) -> Result<TaskCreatedResponse, McpError> {
44        if args.urls.is_empty() {
45            return Err(McpError::EmptyUrlList);
46        }
47        if args.urls.len() > MAX_URLS {
48            return Err(McpError::TooManyUrls {
49                count: args.urls.len(),
50                max: MAX_URLS,
51            });
52        }
53        for raw in &args.urls {
54            let url = Url::parse(raw).map_err(|e| McpError::InvalidUrl(e.to_string()))?;
55            // Quick SSRF reject so we don't insert a task that will only fail.
56            validate_url(&url, self.ssrf_level)
57                .map_err(|e: SsrfError| McpError::Fetcher(crate::fetcher::FetcherError::Ssrf(e)))?;
58        }
59        let params = BatchFetchParams {
60            urls: args.urls.clone(),
61            concurrency: args.concurrency.unwrap_or(8).clamp(1, MAX_CONCURRENCY),
62            per_domain_concurrency: args
63                .per_domain_concurrency
64                .unwrap_or(2)
65                .clamp(1, MAX_PER_DOMAIN),
66            force_refresh: args.force_refresh,
67        };
68        let id = TaskId::new();
69        let params_json =
70            serde_json::to_string(&params).map_err(|e| McpError::InvalidArgs(e.to_string()))?;
71        insert(
72            &self.db,
73            TaskInsert {
74                id: id.as_str().to_string(),
75                kind: TaskKind::BatchFetch,
76                params_json,
77                owner_pid: Some(std::process::id() as i64),
78            },
79        )
80        .await?;
81        // Scheduler notification happens inside `storage::tasks::insert` via
82        // the Db-owned notifier installed by `mcp::server::serve_stdio`. No
83        // direct send from the tool layer.
84        Ok(TaskCreatedResponse {
85            task_id: id.as_str().to_string(),
86            status: "running".into(),
87            kind: "batch_fetch".into(),
88            monitor_command: format!("rover batch {id} --monitor"),
89            poll_command: format!("rover batch {id}"),
90            cancel_command: format!("rover task {id} --cancel"),
91            hint: "Use the Monitor tool with monitor_command for live updates, \
92                   or call poll_command to check status."
93                .into(),
94        })
95    }
96}