// src/tools/scrape.rs - ENHANCED VERSION WITH REDIS CACHE SUPPORT
use crate::tool::{Tool, ToolResult, McpContent};
use crate::error::BrightDataError;
use crate::filters::{ResponseFilter, ResponseStrategy};
use crate::services::cache::scrape_cache::get_scrape_cache;
use async_trait::async_trait;
use reqwest::Client;
use serde_json::{json, Value};
use std::env;
use std::time::Duration;
use std::collections::HashMap;
use log::{info, warn};

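/// Web-scraping tool backed by a BrightData proxy with a Redis cache layered in
/// front, so repeated requests within a session avoid a fresh fetch.
///
/// Minimal usage sketch (illustrative only; assumes a Tokio runtime plus the
/// `BRIGHTDATA_PROXY_*` and Redis environment described below):
///
/// ```ignore
/// let scraper = Scraper;
/// let result = scraper
///     .execute(json!({ "url": "https://example.com", "session_id": "sess_123" }))
///     .await?;
/// ```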
pub struct Scraper;

#[async_trait]
impl Tool for Scraper {
    fn name(&self) -> &str {
        "scrape_website"
    }

    fn description(&self) -> &str {
        "Scrape a webpage using BrightData with intelligent caching and priority-based processing. Supports Web Unlocker with Redis cache for improved performance."
    }

    fn input_schema(&self) -> Value {
        json!({
            "type": "object",
            "properties": {
                "url": {
                    "type": "string",
                    "description": "The URL to scrape"
                },
                "session_id": {
                    "type": "string",
                    "description": "Session ID for caching and conversation context tracking"
                },
                "data_type": {
                    "type": "string",
                    "enum": ["auto", "article", "product", "news", "contact", "general"],
                    "default": "auto",
                    "description": "Type of content to focus on during extraction"
                },
                "extraction_format": {
                    "type": "string",
                    "enum": ["structured", "markdown", "text", "json"],
                    "default": "structured",
                    "description": "Format for extracted content"
                },
                "clean_content": {
                    "type": "boolean",
                    "default": true,
                    "description": "Remove noise and focus on main content"
                },
                "schema": {
                    "type": "object",
                    "description": "Optional extraction schema for structured data"
                },
                "force_refresh": {
                    "type": "boolean",
                    "default": false,
                    "description": "Force fresh scraping, bypassing cache"
                }
            },
66            "required": ["url"]
        })
    }
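
    // Example arguments accepted by the schema above (illustrative values only):
    //
    //   {
    //     "url": "https://example.com/article",
    //     "session_id": "sess_123",
    //     "data_type": "article",
    //     "extraction_format": "markdown",
    //     "clean_content": true,
    //     "force_refresh": false
    //   }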

    async fn execute(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
        self.execute_internal(parameters).await
    }

    async fn execute_internal(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
        let url = parameters
            .get("url")
            .and_then(|v| v.as_str())
            .ok_or_else(|| BrightDataError::ToolError("Missing 'url' parameter".into()))?;

        let session_id = parameters
            .get("session_id")
            .and_then(|v| v.as_str())
            .ok_or_else(|| BrightDataError::ToolError("Missing 'session_id' parameter".into()))?;

        let data_type = parameters
            .get("data_type")
            .and_then(|v| v.as_str())
            .unwrap_or("auto");

        let extraction_format = parameters
            .get("extraction_format")
            .and_then(|v| v.as_str())
            .unwrap_or("structured");

        let clean_content = parameters
            .get("clean_content")
            .and_then(|v| v.as_bool())
            .unwrap_or(true);

        let force_refresh = parameters
            .get("force_refresh")
            .and_then(|v| v.as_bool())
            .unwrap_or(false);

        let schema = parameters.get("schema").cloned();

        let execution_id = self.generate_execution_id();

        info!("🌐 Scraping request: '{}' (session: {}, type: {}, format: {})",
              url, session_id, data_type, extraction_format);

        // 🎯 CACHE CHECK - Check Redis cache first (unless force_refresh=true)
        if !force_refresh {
            match self.check_cache_first(url, session_id).await {
                Ok(Some(cached_result)) => {
                    info!("🚀 Cache HIT: Returning cached data for {} in session {}", url, session_id);

                    // Create tool result from cached data
                    let content = cached_result.get("content").and_then(|c| c.as_str()).unwrap_or("");

                    let formatted_response = self.create_formatted_scrape_response(
                        url, data_type, extraction_format, content, &execution_id
                    );

                    let tool_result = ToolResult::success_with_raw(
                        vec![McpContent::text(formatted_response)],
                        cached_result
                    );

                    // Apply filtering only if DEDUCT_DATA=true
                    if self.is_data_reduction_enabled() {
                        return Ok(ResponseStrategy::apply_size_limits(tool_result));
                    } else {
                        return Ok(tool_result);
                    }
                }
                Ok(None) => {
                    info!("💾 Cache MISS: Fetching fresh data for {} in session {}", url, session_id);
                }
                Err(e) => {
                    warn!("🚨 Cache error (continuing with fresh fetch): {}", e);
                }
            }
        } else {
            info!("🔄 Force refresh requested, bypassing cache for {}", url);
        }

        // 🌐 FRESH FETCH - Cache miss or force refresh, fetch from BrightData
        match self.scrape_with_brightdata(url, data_type, extraction_format, clean_content, schema, &execution_id).await {
            Ok(result) => {
                // 🗄️ CACHE STORE - Store successful result in cache
                if let Err(e) = self.store_in_cache(url, session_id, &result).await {
                    warn!("Failed to store result in cache: {}", e);
                }

                let content = result.get("content").and_then(|c| c.as_str()).unwrap_or("");

                // Create formatted response based on DEDUCT_DATA setting
                let formatted_response = self.create_formatted_scrape_response(
                    url, data_type, extraction_format, content, &execution_id
                );

                let tool_result = ToolResult::success_with_raw(
                    vec![McpContent::text(formatted_response)],
                    result
                );

                // Apply filtering only if DEDUCT_DATA=true
                if self.is_data_reduction_enabled() {
                    Ok(ResponseStrategy::apply_size_limits(tool_result))
                } else {
                    Ok(tool_result)
                }
            }
            Err(_e) => {
                // Return empty data for BrightData errors - Anthropic will retry
                warn!("BrightData error for URL '{}', returning empty data for retry", url);
                let empty_response = json!({
                    "url": url,
                    "data_type": data_type,
                    "status": "no_data",
                    "reason": "brightdata_error",
                    "execution_id": execution_id,
                    "session_id": session_id
                });

                Ok(ToolResult::success_with_raw(
                    vec![McpContent::text("📊 **No Data Available**\n\nPlease try again with a different URL or check if the website is accessible.".to_string())],
                    empty_response
                ))
            }
        }
    }
}

impl Scraper {
    /// ENHANCED: Check if data reduction is enabled via DEDUCT_DATA environment variable only
    fn is_data_reduction_enabled(&self) -> bool {
        std::env::var("DEDUCT_DATA")
            .unwrap_or_else(|_| "false".to_string())
            .to_lowercase() == "true"
    }

    /// ENHANCED: Create formatted response with DEDUCT_DATA control
    fn create_formatted_scrape_response(
        &self,
        url: &str,
        data_type: &str,
        extraction_format: &str,
        content: &str,
        execution_id: &str
    ) -> String {
        // If DEDUCT_DATA=false, return full content with basic formatting
        if !self.is_data_reduction_enabled() {
            return format!(
                "📊 **Data Extraction from: {}**\n\n## Full Content\n{}\n\n*Data Type: {} | Format: {} • Execution: {}*",
                url,
                content,
                data_type,
                extraction_format,
                execution_id
            );
        }

        // TODO: Add filtered data extraction logic when DEDUCT_DATA=true
        // For now, return full content formatted
        format!(
            "📊 **Data Extraction from: {}**\n\n## Content (TODO: Add Filtering)\n{}\n\n*Data Type: {} | Format: {} • Execution: {}*",
            url,
            content,
            data_type,
            extraction_format,
            execution_id
        )
    }

    fn generate_execution_id(&self) -> String {
        format!("scrape_{}", chrono::Utc::now().format("%Y%m%d_%H%M%S%.3f"))
    }

    // 🎯 ADDED: Check Redis cache first
    async fn check_cache_first(
        &self,
        url: &str,
        session_id: &str,
    ) -> Result<Option<Value>, BrightDataError> {
        let cache_service = get_scrape_cache().await?;
        cache_service.get_cached_scrape_data(session_id, url).await
    }

    // 🗄️ ADDED: Store successful result in Redis cache
    async fn store_in_cache(
        &self,
        url: &str,
        session_id: &str,
        data: &Value,
    ) -> Result<(), BrightDataError> {
        let cache_service = get_scrape_cache().await?;
        cache_service.cache_scrape_data(session_id, url, data.clone()).await
    }

    // 🔍 ADDED: Get all cached URLs for session (useful for finding related content)
    pub async fn get_session_cached_urls(&self, session_id: &str) -> Result<Vec<String>, BrightDataError> {
        let cache_service = get_scrape_cache().await?;
        cache_service.get_session_scrape_urls(session_id).await
    }

    // 🔍 ADDED: Get cached URLs by domain
    pub async fn get_cached_urls_by_domain(
        &self,
        session_id: &str,
        domain: &str,
    ) -> Result<Vec<String>, BrightDataError> {
        let cache_service = get_scrape_cache().await?;
        cache_service.get_cached_urls_by_domain(session_id, domain).await
    }

    // 🗑️ ADDED: Clear cache for specific URL
    pub async fn clear_url_cache(
        &self,
        url: &str,
        session_id: &str,
    ) -> Result<(), BrightDataError> {
        let cache_service = get_scrape_cache().await?;
        cache_service.clear_scrape_url_cache(session_id, url).await
    }

    // 🗑️ ADDED: Clear entire session cache
    pub async fn clear_session_cache(&self, session_id: &str) -> Result<u32, BrightDataError> {
        let cache_service = get_scrape_cache().await?;
        cache_service.clear_session_scrape_cache(session_id).await
    }

    // 📊 ADDED: Get cache statistics
    pub async fn get_cache_stats(&self) -> Result<Value, BrightDataError> {
        let cache_service = get_scrape_cache().await?;
        cache_service.get_scrape_cache_stats().await
    }

    // 📊 ADDED: Get cache summary for session
    pub async fn get_cache_summary(&self, session_id: &str) -> Result<Value, BrightDataError> {
        let cache_service = get_scrape_cache().await?;
        cache_service.get_cache_summary(session_id).await
    }
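
    // Illustrative use of the cache helpers above (assumes Redis is reachable;
    // `scraper` stands in for any `Scraper` value):
    //
    //   let urls = scraper.get_session_cached_urls("sess_123").await?;
    //   let cleared = scraper.clear_session_cache("sess_123").await?;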

    // 🏥 ADDED: Enhanced connectivity test including cache
    pub async fn test_connectivity_with_cache(&self) -> Result<String, BrightDataError> {
        let mut results = Vec::new();

        // Test cache connectivity
        info!("🧪 Testing Redis Cache...");
        match get_scrape_cache().await {
            Ok(cache_service) => {
                match cache_service.health_check().await {
                    Ok(_) => results.push("✅ Redis Cache: SUCCESS".to_string()),
                    Err(e) => results.push(format!("❌ Redis Cache: FAILED - {}", e)),
                }
            }
            Err(e) => results.push(format!("❌ Redis Cache: FAILED - {}", e)),
        }

        // Test existing connectivity
        let api_test = self.test_connectivity().await?;
        results.push(api_test);

        Ok(format!("🔍 Enhanced Connectivity Test Results:\n{}", results.join("\n")))
    }

    /// ENHANCED: Extract data with BrightData using proxy method (similar to forex.rs)
    async fn scrape_with_brightdata(
        &self,
        url: &str,
        data_type: &str,
        extraction_format: &str,
        clean_content: bool,
        _schema: Option<Value>, // accepted but not yet applied to extraction
        execution_id: &str,
    ) -> Result<Value, BrightDataError> {
        let max_retries = env::var("MAX_RETRIES")
            .ok()
            .and_then(|s| s.parse::<u32>().ok())
            .unwrap_or(1);

        let mut last_error = None;

        // Get proxy configuration from environment
        let proxy_host = env::var("BRIGHTDATA_PROXY_HOST")
            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_HOST environment variable".into()))?;
        let proxy_port = env::var("BRIGHTDATA_PROXY_PORT")
            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_PORT environment variable".into()))?;
        let proxy_username = env::var("BRIGHTDATA_PROXY_USERNAME")
            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_USERNAME environment variable".into()))?;
        let proxy_password = env::var("BRIGHTDATA_PROXY_PASSWORD")
            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_PASSWORD environment variable".into()))?;

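        // Standard authenticated HTTP proxy URL, e.g. "http://user:pass@host:port"
        // (example shape only; the concrete values come from the env vars above).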
        let proxy_url = format!("http://{}:{}@{}:{}", proxy_username, proxy_password, proxy_host, proxy_port);

        for retry_attempt in 0..max_retries {
            let start_time = std::time::Instant::now();
            let attempt_id = format!("{}_proxy_r{}", execution_id, retry_attempt);

            info!("🔄 Proxy Scrape: Fetching from {} via proxy (execution: {}, retry: {}/{})",
                  url, attempt_id, retry_attempt + 1, max_retries);

            if retry_attempt == 0 {
                info!("📤 Proxy Scrape Request:");
                info!("   Proxy: {}:{}@{}:{}", proxy_username, "***", proxy_host, proxy_port);
                info!("   Target: {}", url);
                info!("   Data Type: {}", data_type);
                info!("   Extraction Format: {}", extraction_format);
            }

            // Create client with proxy configuration
            let proxy = reqwest::Proxy::all(&proxy_url)
                .map_err(|e| BrightDataError::ToolError(format!("Failed to create proxy: {}", e)))?;

            let client = Client::builder()
                .proxy(proxy)
                .timeout(Duration::from_secs(120))
                .danger_accept_invalid_certs(true) // Often needed for proxy connections
                .build()
                .map_err(|e| BrightDataError::ToolError(format!("Failed to create proxy client: {}", e)))?;

            let response = client
                .get(url)
                .header("x-unblock-data-format", "markdown")
                .send()
                .await
                .map_err(|e| BrightDataError::ToolError(format!("Proxy scrape request failed to {}: {}", url, e)))?;

            let duration = start_time.elapsed();
            let status = response.status().as_u16();
            // Collected for debugging/inspection; not otherwise used yet.
            let _response_headers: HashMap<String, String> = response
                .headers()
                .iter()
                .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
                .collect();

            info!("📥 Proxy Scrape Response (retry {}):", retry_attempt + 1);
            info!("   Status: {}", status);
            info!("   Duration: {}ms", duration.as_millis());

            let response_text = response.text().await
                .map_err(|e| BrightDataError::ToolError(format!("Failed to read proxy scrape response body from {}: {}", url, e)))?;

            // Handle server errors with retry
            if matches!(status, 502 | 503 | 504) && retry_attempt < max_retries - 1 {
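                // Linear backoff: 1s before the first retry, 2s before the second, and so on.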
                let wait_time = Duration::from_millis(1000 + (retry_attempt as u64 * 1000));
                warn!("⏳ Proxy Scrape: Server error {}, waiting {}ms before retry...", status, wait_time.as_millis());
                tokio::time::sleep(wait_time).await;
                last_error = Some(BrightDataError::ToolError(format!("Proxy scrape server error: {}", status)));
                continue;
            }

            if !(200..300).contains(&status) {
                // Truncate on char boundaries so multi-byte UTF-8 bodies can't cause a slice panic.
                let body_preview: String = response_text.chars().take(200).collect();
                let error_msg = format!("Proxy Scrape: {} returned HTTP {}: {}", url, status, body_preview);

                warn!("Proxy scrape HTTP error: {}", error_msg);
                last_error = Some(BrightDataError::ToolError(error_msg));

                if retry_attempt == max_retries - 1 {
                    return Err(last_error.unwrap());
                }
                continue;
            }

            // SUCCESS - Process response
            let raw_content = response_text;

            // Print what came from BrightData Proxy
            println!("################################################################################################################");
            println!("BRIGHTDATA PROXY RAW RESPONSE FROM: {}", url);
            println!("PROXY: {}:{}", proxy_host, proxy_port);
            println!("EXECUTION: {}", execution_id);
            println!("DATA TYPE: {}", data_type);
            println!("EXTRACTION FORMAT: {}", extraction_format);
            println!("CONTENT LENGTH: {} bytes", raw_content.len());
            println!("################################################################################################################");
            println!("{}", raw_content);
            println!("################################################################################################################");
            println!("END OF BRIGHTDATA PROXY RESPONSE");
            println!("################################################################################################################");

            // Apply filters only if DEDUCT_DATA=true
            if self.is_data_reduction_enabled() {
                if ResponseFilter::is_error_page(&raw_content) {
                    return Err(BrightDataError::ToolError("Extraction returned error page".into()));
                } else if ResponseStrategy::should_try_next_source(&raw_content) {
                    return Err(BrightDataError::ToolError("Content quality too low".into()));
                }
            }

            // Print what will be sent to Anthropic
            println!("--------------------------------------------------------------------------");
            println!("SENDING TO ANTHROPIC FROM SCRAPE TOOL (PROXY):");
            println!("URL: {}", url);
            println!("DATA TYPE: {}", data_type);
            println!("EXTRACTION FORMAT: {}", extraction_format);
            println!("DATA REDUCTION ENABLED: {}", self.is_data_reduction_enabled());
            println!("CONTENT LENGTH: {} bytes", raw_content.len());
            println!("--------------------------------------------------------------------------");
            println!("{}", raw_content);
            println!("--------------------------------------------------------------------------");
            println!("END OF CONTENT SENT TO ANTHROPIC");
            println!("--------------------------------------------------------------------------");

            // Return enhanced result with additional metadata
            return Ok(json!({
                "content": raw_content,
                "metadata": {
                    "url": url,
                    "proxy_host": proxy_host,
                    "proxy_port": proxy_port,
                    "execution_id": execution_id,
                    "data_type": data_type,
                    "extraction_format": extraction_format,
                    "clean_content": clean_content,
                    "data_format": "markdown",
                    "data_reduction_enabled": self.is_data_reduction_enabled(),
                    "status_code": status,
                    "content_size_bytes": raw_content.len(),
                    "duration_ms": duration.as_millis(),
                    "timestamp": chrono::Utc::now().to_rfc3339(),
                    "retry_attempts": retry_attempt + 1,
                    "max_retries": max_retries,
                    "method": "BrightData Proxy"
                },
                "success": true
            }));
        }

        Err(last_error.unwrap_or_else(|| BrightDataError::ToolError("Proxy Scrape: All retry attempts failed".into())))
    }

    /// Test BrightData connectivity
    pub async fn test_connectivity(&self) -> Result<String, BrightDataError> {
        let test_url = "https://httpbin.org/json";
        let mut results = Vec::new();

        // Test BrightData API
        info!("🧪 Testing BrightData Web Unlocker...");
        match self.scrape_with_brightdata(
            test_url, "auto", "structured", true, None, "connectivity_test"
        ).await {
            Ok(_) => {
                results.push("✅ BrightData Web Unlocker: SUCCESS".to_string());
            }
            Err(e) => {
                results.push(format!("❌ BrightData Web Unlocker: FAILED - {}", e));
            }
        }

        Ok(format!("🔍 Connectivity Test Results:\n{}", results.join("\n")))
    }

    /// Check if URL is cached
    pub async fn is_url_cached(&self, session_id: &str, url: &str) -> Result<bool, BrightDataError> {
        let cache_service = get_scrape_cache().await?;
        cache_service.is_url_cached(session_id, url).await
    }

    /// Batch cache multiple URLs (useful for bulk operations)
    pub async fn batch_cache_urls(
        &self,
        session_id: &str,
        url_data: Vec<(String, Value)>, // Vec<(url, data)>
    ) -> Result<Vec<String>, BrightDataError> {
        let cache_service = get_scrape_cache().await?;
        cache_service.batch_cache_scrape_data(session_id, url_data).await
    }
}
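
// A minimal test sketch for the pure helpers above (illustrative; the
// DEDUCT_DATA case mutates a process-wide env var, so it assumes no other
// test touches that variable concurrently).
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn execution_ids_carry_the_scrape_prefix() {
        // generate_execution_id embeds a UTC timestamp after the fixed prefix.
        assert!(Scraper.generate_execution_id().starts_with("scrape_"));
    }

    #[test]
    fn data_reduction_flag_is_case_insensitive() {
        std::env::set_var("DEDUCT_DATA", "TRUE");
        assert!(Scraper.is_data_reduction_enabled());
        std::env::set_var("DEDUCT_DATA", "false");
        assert!(!Scraper.is_data_reduction_enabled());
    }
}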