use crate::tool::{Tool, ToolResult, McpContent};
use crate::error::BrightDataError;
use crate::filters::{ResponseFilter, ResponseStrategy};
use crate::services::cache::scrape_cache::get_scrape_cache;
use async_trait::async_trait;
use reqwest::Client;
use serde_json::{json, Value};
use std::env;
use std::time::Duration;
use log::{debug, info, warn};

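/// Scrapes webpages through BrightData's Web Unlocker proxy, caching results
/// per session in Redis. Optional data reduction (filtering and size limits)
/// is controlled by the `DEDUCT_DATA` environment variable.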
pub struct Scraper;

#[async_trait]
impl Tool for Scraper {
    fn name(&self) -> &str {
        "scrape_website"
    }

    fn description(&self) -> &str {
        "Scrape a webpage using BrightData with intelligent caching and priority-based processing. Supports Web Unlocker with Redis cache for improved performance."
    }

    fn input_schema(&self) -> Value {
        json!({
            "type": "object",
            "properties": {
                "url": {
                    "type": "string",
                    "description": "The URL to scrape"
                },
                "session_id": {
                    "type": "string",
                    "description": "Session ID for caching and conversation context tracking"
                },
                "data_type": {
                    "type": "string",
                    "enum": ["auto", "article", "product", "news", "contact", "general"],
                    "default": "auto",
                    "description": "Type of content to focus on during extraction"
                },
                "extraction_format": {
                    "type": "string",
                    "enum": ["structured", "markdown", "text", "json"],
                    "default": "structured",
                    "description": "Format for extracted content"
                },
                "clean_content": {
                    "type": "boolean",
                    "default": true,
                    "description": "Remove noise and focus on main content"
                },
                "schema": {
                    "type": "object",
                    "description": "Optional extraction schema for structured data"
                },
                "force_refresh": {
                    "type": "boolean",
                    "default": false,
                    "description": "Force fresh scraping, bypassing cache"
                }
            },
            "required": ["url", "session_id"]
        })
    }

    async fn execute(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
        self.execute_internal(parameters).await
    }

    async fn execute_internal(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
        let url = parameters
            .get("url")
            .and_then(|v| v.as_str())
            .ok_or_else(|| BrightDataError::ToolError("Missing 'url' parameter".into()))?;

        let session_id = parameters
            .get("session_id")
            .and_then(|v| v.as_str())
            .ok_or_else(|| BrightDataError::ToolError("Missing 'session_id' parameter".into()))?;

        let data_type = parameters
            .get("data_type")
            .and_then(|v| v.as_str())
            .unwrap_or("auto");

        let extraction_format = parameters
            .get("extraction_format")
            .and_then(|v| v.as_str())
            .unwrap_or("structured");

        let clean_content = parameters
            .get("clean_content")
            .and_then(|v| v.as_bool())
            .unwrap_or(true);

        let force_refresh = parameters
            .get("force_refresh")
            .and_then(|v| v.as_bool())
            .unwrap_or(false);

        let schema = parameters.get("schema").cloned();

        let execution_id = self.generate_execution_id();

        info!("🌐 Scraping request: '{}' (session: {}, type: {}, format: {})",
            url, session_id, data_type, extraction_format);

        if !force_refresh {
            match self.check_cache_first(url, session_id).await {
                Ok(Some(cached_result)) => {
                    info!("🎯 Cache HIT: Returning cached data for {} in session {}", url, session_id);

                    let content = cached_result.get("content").and_then(|c| c.as_str()).unwrap_or("");

                    let formatted_response = self.create_formatted_scrape_response(
                        url, data_type, extraction_format, content, &execution_id
                    );

                    let tool_result = ToolResult::success_with_raw(
                        vec![McpContent::text(formatted_response)],
                        cached_result
                    );

                    if self.is_data_reduction_enabled() {
                        return Ok(ResponseStrategy::apply_size_limits(tool_result));
                    } else {
                        return Ok(tool_result);
                    }
                }
                Ok(None) => {
                    info!("💾 Cache MISS: Fetching fresh data for {} in session {}", url, session_id);
                }
                Err(e) => {
                    warn!("🚨 Cache error (continuing with fresh fetch): {}", e);
                }
            }
        } else {
            info!("🔄 Force refresh requested, bypassing cache for {}", url);
        }

        match self.scrape_with_brightdata(url, data_type, extraction_format, clean_content, schema, &execution_id).await {
            Ok(result) => {
                if let Err(e) = self.store_in_cache(url, session_id, &result).await {
                    warn!("Failed to store result in cache: {}", e);
                }

                let content = result.get("content").and_then(|c| c.as_str()).unwrap_or("");

                let formatted_response = self.create_formatted_scrape_response(
                    url, data_type, extraction_format, content, &execution_id
                );

                let tool_result = ToolResult::success_with_raw(
                    vec![McpContent::text(formatted_response)],
                    result
                );

                if self.is_data_reduction_enabled() {
                    Ok(ResponseStrategy::apply_size_limits(tool_result))
                } else {
                    Ok(tool_result)
                }
            }
            Err(e) => {
                warn!("BrightData error for URL '{}' ({}), returning empty data for retry", url, e);

                let empty_response = json!({
                    "url": url,
                    "data_type": data_type,
                    "status": "no_data",
                    "reason": "brightdata_error",
                    "execution_id": execution_id,
                    "session_id": session_id
                });

                Ok(ToolResult::success_with_raw(
                    vec![McpContent::text("📭 **No Data Available**\n\nPlease try again with a different URL or check if the website is accessible.".to_string())],
                    empty_response
                ))
            }
        }
    }
}

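// Inherent helpers: response formatting, execution ids, Redis cache plumbing,
// connectivity tests, and the BrightData proxy fetch itself.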
impl Scraper {
    /// Data reduction (content filtering and size limits) is opt-in via the
    /// `DEDUCT_DATA` environment variable.
    fn is_data_reduction_enabled(&self) -> bool {
        env::var("DEDUCT_DATA")
            .unwrap_or_else(|_| "false".to_string())
            .to_lowercase() == "true"
    }

    fn create_formatted_scrape_response(
        &self,
        url: &str,
        data_type: &str,
        extraction_format: &str,
        content: &str,
        execution_id: &str
    ) -> String {
        if !self.is_data_reduction_enabled() {
            return format!(
                "📄 **Data Extraction from: {}**\n\n## Full Content\n{}\n\n*Data Type: {} | Format: {} • Execution: {}*",
                url,
                content,
                data_type,
                extraction_format,
                execution_id
            );
        }

        format!(
            "📄 **Data Extraction from: {}**\n\n## Content (TODO: Add Filtering)\n{}\n\n*Data Type: {} | Format: {} • Execution: {}*",
            url,
            content,
            data_type,
            extraction_format,
            execution_id
        )
    }

    /// Builds an execution id like "scrape_20250101_120000.123" (UTC timestamp).
    fn generate_execution_id(&self) -> String {
        format!("scrape_{}", chrono::Utc::now().format("%Y%m%d_%H%M%S%.3f"))
    }

    async fn check_cache_first(
        &self,
        url: &str,
        session_id: &str,
    ) -> Result<Option<Value>, BrightDataError> {
        let cache_service = get_scrape_cache().await?;
        cache_service.get_cached_scrape_data(session_id, url).await
    }

    async fn store_in_cache(
        &self,
        url: &str,
        session_id: &str,
        data: &Value,
    ) -> Result<(), BrightDataError> {
        let cache_service = get_scrape_cache().await?;
        cache_service.cache_scrape_data(session_id, url, data.clone()).await
    }

    pub async fn get_session_cached_urls(&self, session_id: &str) -> Result<Vec<String>, BrightDataError> {
        let cache_service = get_scrape_cache().await?;
        cache_service.get_session_scrape_urls(session_id).await
    }

    pub async fn get_cached_urls_by_domain(
        &self,
        session_id: &str,
        domain: &str,
    ) -> Result<Vec<String>, BrightDataError> {
        let cache_service = get_scrape_cache().await?;
        cache_service.get_cached_urls_by_domain(session_id, domain).await
    }

    pub async fn clear_url_cache(
        &self,
        url: &str,
        session_id: &str,
    ) -> Result<(), BrightDataError> {
        let cache_service = get_scrape_cache().await?;
        cache_service.clear_scrape_url_cache(session_id, url).await
    }

    pub async fn clear_session_cache(&self, session_id: &str) -> Result<u32, BrightDataError> {
        let cache_service = get_scrape_cache().await?;
        cache_service.clear_session_scrape_cache(session_id).await
    }

    pub async fn get_cache_stats(&self) -> Result<Value, BrightDataError> {
        let cache_service = get_scrape_cache().await?;
        cache_service.get_scrape_cache_stats().await
    }

    pub async fn get_cache_summary(&self, session_id: &str) -> Result<Value, BrightDataError> {
        let cache_service = get_scrape_cache().await?;
        cache_service.get_cache_summary(session_id).await
    }

    pub async fn test_connectivity_with_cache(&self) -> Result<String, BrightDataError> {
        let mut results = Vec::new();

        info!("🧪 Testing Redis Cache...");
        match get_scrape_cache().await {
            Ok(cache_service) => {
                match cache_service.health_check().await {
                    Ok(_) => results.push("✅ Redis Cache: SUCCESS".to_string()),
                    Err(e) => results.push(format!("❌ Redis Cache: FAILED - {}", e)),
                }
            }
            Err(e) => results.push(format!("❌ Redis Cache: FAILED - {}", e)),
        }

        let api_test = self.test_connectivity().await?;
        results.push(api_test);

        Ok(format!("📊 Enhanced Connectivity Test Results:\n{}", results.join("\n")))
    }

    async fn scrape_with_brightdata(
        &self,
        url: &str,
        data_type: &str,
        extraction_format: &str,
        clean_content: bool,
        _schema: Option<Value>, // reserved for schema-driven extraction; not yet used
        execution_id: &str,
    ) -> Result<Value, BrightDataError> {
        let max_retries = env::var("MAX_RETRIES")
            .ok()
            .and_then(|s| s.parse::<u32>().ok())
            .unwrap_or(1);

        let mut last_error = None;

        let proxy_host = env::var("BRIGHTDATA_PROXY_HOST")
            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_HOST environment variable".into()))?;
        let proxy_port = env::var("BRIGHTDATA_PROXY_PORT")
            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_PORT environment variable".into()))?;
        let proxy_username = env::var("BRIGHTDATA_PROXY_USERNAME")
            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_USERNAME environment variable".into()))?;
        let proxy_password = env::var("BRIGHTDATA_PROXY_PASSWORD")
            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_PASSWORD environment variable".into()))?;

        let proxy_url = format!("http://{}:{}@{}:{}", proxy_username, proxy_password, proxy_host, proxy_port);

        for retry_attempt in 0..max_retries {
            let start_time = std::time::Instant::now();
            let attempt_id = format!("{}_proxy_r{}", execution_id, retry_attempt);

            info!("🌐 Proxy Scrape: Fetching from {} via proxy (execution: {}, retry: {}/{})",
                url, attempt_id, retry_attempt + 1, max_retries);

            if retry_attempt == 0 {
                info!("📤 Proxy Scrape Request:");
                info!("   Proxy: {}:{}@{}:{}", proxy_username, "***", proxy_host, proxy_port);
                info!("   Target: {}", url);
                info!("   Data Type: {}", data_type);
                info!("   Extraction Format: {}", extraction_format);
            }

            let proxy = reqwest::Proxy::all(&proxy_url)
                .map_err(|e| BrightDataError::ToolError(format!("Failed to create proxy: {}", e)))?;

            let client = Client::builder()
                .proxy(proxy)
                .timeout(Duration::from_secs(120))
                // The unlocker proxy intercepts TLS, so certificate validation is relaxed.
                .danger_accept_invalid_certs(true)
                .build()
                .map_err(|e| BrightDataError::ToolError(format!("Failed to create proxy client: {}", e)))?;

            let response = client
                .get(url)
                // Ask Web Unlocker to return the page pre-converted to markdown.
                .header("x-unblock-data-format", "markdown")
                .send()
                .await
                .map_err(|e| BrightDataError::ToolError(format!("Proxy scrape request failed to {}: {}", url, e)))?;

            let duration = start_time.elapsed();
            let status = response.status().as_u16();

            info!("📥 Proxy Scrape Response (retry {}):", retry_attempt + 1);
            info!("   Status: {}", status);
            info!("   Duration: {}ms", duration.as_millis());

            let response_text = response.text().await
                .map_err(|e| BrightDataError::ToolError(format!("Failed to read proxy scrape response body from {}: {}", url, e)))?;

            if matches!(status, 502 | 503 | 504) && retry_attempt < max_retries - 1 {
                let wait_time = Duration::from_millis(1000 + (retry_attempt as u64 * 1000));
                warn!("⏳ Proxy Scrape: Server error {}, waiting {}ms before retry...", status, wait_time.as_millis());
                tokio::time::sleep(wait_time).await;
                last_error = Some(BrightDataError::ToolError(format!("Proxy scrape server error: {}", status)));
                continue;
            }

            if !(200..300).contains(&status) {
                // Truncate on char boundaries; slicing at a fixed byte offset can panic on multi-byte UTF-8.
                let preview: String = response_text.chars().take(200).collect();
                let error_msg = format!("Proxy Scrape: {} returned HTTP {}: {}", url, status, preview);

                warn!("Proxy scrape HTTP error: {}", error_msg);
                last_error = Some(BrightDataError::ToolError(error_msg));

                if retry_attempt == max_retries - 1 {
                    return Err(last_error.unwrap());
                }
                continue;
            }

            let raw_content = response_text;

            debug!(
                "BrightData proxy raw response from {} (proxy {}:{}, execution {}, type {}, format {}, {} bytes):\n{}",
                url, proxy_host, proxy_port, execution_id, data_type, extraction_format,
                raw_content.len(), raw_content
            );

            if self.is_data_reduction_enabled() {
                if ResponseFilter::is_error_page(&raw_content) {
                    return Err(BrightDataError::ToolError("Extraction returned error page".into()));
                } else if ResponseStrategy::should_try_next_source(&raw_content) {
                    return Err(BrightDataError::ToolError("Content quality too low".into()));
                }
            }

            debug!(
                "Sending to Anthropic from scrape tool (proxy): {} (type {}, format {}, data reduction enabled: {}, {} bytes):\n{}",
                url, data_type, extraction_format, self.is_data_reduction_enabled(),
                raw_content.len(), raw_content
            );

            return Ok(json!({
                "content": raw_content,
                "metadata": {
                    "url": url,
                    "proxy_host": proxy_host,
                    "proxy_port": proxy_port,
                    "execution_id": execution_id,
                    "data_type": data_type,
                    "extraction_format": extraction_format,
                    "clean_content": clean_content,
                    "data_format": "markdown",
                    "data_reduction_enabled": self.is_data_reduction_enabled(),
                    "status_code": status,
                    "content_size_bytes": raw_content.len(),
                    // as_millis() yields u128; cast to u64 for portable JSON serialization.
                    "duration_ms": duration.as_millis() as u64,
                    "timestamp": chrono::Utc::now().to_rfc3339(),
                    "retry_attempts": retry_attempt + 1,
                    "max_retries": max_retries,
                    "method": "BrightData Proxy"
                },
                "success": true
            }));
        }

        Err(last_error.unwrap_or_else(|| BrightDataError::ToolError("Proxy Scrape: All retry attempts failed".into())))
    }

    pub async fn test_connectivity(&self) -> Result<String, BrightDataError> {
        let test_url = "https://httpbin.org/json";
        let mut results = Vec::new();

        info!("🧪 Testing BrightData Web Unlocker...");
        match self.scrape_with_brightdata(
            test_url, "auto", "structured", true, None, "connectivity_test"
        ).await {
            Ok(_) => {
                results.push("✅ BrightData Web Unlocker: SUCCESS".to_string());
            }
            Err(e) => {
                results.push(format!("❌ BrightData Web Unlocker: FAILED - {}", e));
            }
        }

        Ok(format!("📊 Connectivity Test Results:\n{}", results.join("\n")))
    }

    pub async fn is_url_cached(&self, session_id: &str, url: &str) -> Result<bool, BrightDataError> {
        let cache_service = get_scrape_cache().await?;
        cache_service.is_url_cached(session_id, url).await
    }

    pub async fn batch_cache_urls(
        &self,
        session_id: &str,
        url_data: Vec<(String, Value)>,
    ) -> Result<Vec<String>, BrightDataError> {
        let cache_service = get_scrape_cache().await?;
        cache_service.batch_cache_scrape_data(session_id, url_data).await
    }
}
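
// A minimal usage sketch (not part of the original file): exercising the tool
// through its `Tool` interface. Assumes a Tokio runtime with the `macros`
// feature, a reachable Redis cache, and the BRIGHTDATA_PROXY_* environment
// variables; the URL and session id are illustrative. Marked #[ignore]
// because it performs real network and cache I/O.
#[cfg(test)]
mod usage_sketch {
    use super::*;

    #[tokio::test]
    #[ignore]
    async fn scrape_and_inspect_cache() {
        let scraper = Scraper;

        // First call fetches through the proxy and populates the session cache.
        let params = json!({
            "url": "https://example.com",
            "session_id": "demo-session",
            "data_type": "article",
            "extraction_format": "markdown"
        });
        scraper.execute(params).await.expect("scrape failed");

        // The same URL should now be served from the Redis cache.
        let cached = scraper
            .is_url_cached("demo-session", "https://example.com")
            .await
            .expect("cache lookup failed");
        assert!(cached);
    }
}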