snm_brightdata_client/services/cache/scrape_cache.rs

// crates/snm-brightdata-client/src/services/scrape_cache.rs
use redis::{AsyncCommands, Client, RedisResult};
use serde_json::{json, Value};
use std::env;
use std::time::{SystemTime, UNIX_EPOCH};
use log::{info, warn, error, debug};
use crate::error::BrightDataError;
use url::Url;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

#[derive(Debug, Clone)]
pub struct ScrapeCache {
    client: Client,
    ttl: u64,
}

impl ScrapeCache {
    /// Initialize Redis scraping cache service
    pub async fn new() -> Result<Self, BrightDataError> {
        let redis_url = env::var("REDIS_URL")
            .unwrap_or_else(|_| "redis://127.0.0.1:6379/".to_string());
        
        let client = Client::open(redis_url.as_str())
            .map_err(|e| BrightDataError::ToolError(format!("Failed to create Redis client: {}", e)))?;
        
        // Verify connectivity using the multiplexed async connection
        let mut conn = client.get_multiplexed_async_connection().await
            .map_err(|e| BrightDataError::ToolError(format!("Failed to connect to Redis: {}", e)))?;
        
        // Test ping
        let _: String = conn.ping().await
            .map_err(|e| BrightDataError::ToolError(format!("Redis ping failed: {}", e)))?;
        
        let ttl = env::var("SCRAPE_CACHE_TTL_SECONDS")
            .or_else(|_| env::var("CACHE_TTL_SECONDS"))
            .ok()
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or(7200); // Default 2 hours (web content changes more frequently than stock data)
        
        info!("✅ Scraping cache service initialized (TTL: {}s)", ttl);
        
        Ok(Self { client, ttl })
    }
    
    /// Generate a scraping cache key: scrape:{session_id}:{url_hash}
    fn generate_key(&self, session_id: &str, url: &str) -> String {
        // Hash the normalized URL so keys have a consistent length, special characters are
        // handled, and equivalent URLs (tracking parameters and fragments stripped) share an entry
        let normalized_url = self.normalize_url(url);
        let mut hasher = DefaultHasher::new();
        normalized_url.hash(&mut hasher);
        let normalized_hash = hasher.finish();
        
        let key = format!("scrape:{}:{:x}", session_id, normalized_hash);
        debug!("Scraping Cache Key: {} (URL: {})", key, url);
        key
    }
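    
    // Illustrative key shape (hypothetical values): for session "sess-42" and a URL that
    // normalizes to "https://example.com/page?id=7", the key looks like
    // "scrape:sess-42:9f3a1c0de4b25a77" (the 64-bit DefaultHasher output rendered in hex).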
    
    /// Normalize URL for better cache hits (remove unnecessary parameters, fragments, etc.)
    fn normalize_url(&self, url: &str) -> String {
        match Url::parse(url) {
            Ok(mut parsed_url) => {
                // Remove fragment (everything after #)
                parsed_url.set_fragment(None);
                
                // Remove common tracking parameters
                let tracking_params = [
                    "utm_source", "utm_medium", "utm_campaign", "utm_content", "utm_term",
                    "gclid", "fbclid", "ref", "source", "_ga", "mc_cid", "mc_eid",
                ];
                
                // Collect the non-tracking query pairs (collected first to release the borrow on the URL)
                let filtered_pairs: Vec<(String, String)> = parsed_url
                    .query_pairs()
                    .into_owned()
                    .filter(|(key, _)| !tracking_params.contains(&key.as_str()))
                    .collect();
                
                // Rebuild the query so the remaining pairs stay properly percent-encoded
                parsed_url.set_query(None);
                if !filtered_pairs.is_empty() {
                    parsed_url.query_pairs_mut().extend_pairs(filtered_pairs);
                }
                
                parsed_url.to_string()
            }
            Err(_) => url.to_string(), // If URL parsing fails, use original
        }
    }
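    
    // For illustration (hypothetical input): "https://example.com/article?utm_source=mail&id=42#comments"
    // normalizes to "https://example.com/article?id=42"; the fragment and utm_source are dropped
    // while the meaningful id parameter is kept.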

    /// Check if cached scraping data exists and is valid
    pub async fn get_cached_scrape_data(
        &self,
        session_id: &str,
        url: &str
    ) -> Result<Option<Value>, BrightDataError> {
        let key = self.generate_key(session_id, url);
        
        let mut conn = self.client.get_multiplexed_async_connection().await
            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
        
        match conn.get::<_, Option<String>>(&key).await {
            Ok(Some(cached_json)) => {
                match serde_json::from_str::<Value>(&cached_json) {
                    Ok(cached_data) => {
                        // Check for scraping-specific cached timestamp
                        if let Some(timestamp) = cached_data.get("scrape_cached_at").and_then(|t| t.as_u64()) {
                            let current_time = SystemTime::now()
                                .duration_since(UNIX_EPOCH)
                                .unwrap()
                                .as_secs();
                            
                            if current_time.saturating_sub(timestamp) < self.ttl {
                                info!("🎯 Scraping Cache HIT for {} in session {}", url, session_id);
                                return Ok(Some(cached_data));
                            } else {
                                info!("⏰ Scraping Cache EXPIRED for {} in session {}", url, session_id);
                                // Remove expired data
                                let _: RedisResult<()> = conn.del(&key).await;
                            }
                        } else {
                            // DEBUG: Show what fields are available
                            let available_fields: Vec<String> = cached_data.as_object()
                                .map(|obj| obj.keys().map(|k| k.to_string()).collect())
                                .unwrap_or_default();
                            warn!("❌ Missing 'scrape_cached_at' field for key: {} (available: {:?})", key, available_fields);
                            // Remove corrupted data
                            let _: RedisResult<()> = conn.del(&key).await;
                        }
                    }
                    Err(e) => {
                        warn!("Failed to parse cached scraping data for {}: {}", key, e);
                        // Remove corrupted data
                        let _: RedisResult<()> = conn.del(&key).await;
                    }
                }
            }
            Ok(None) => {
                debug!("💾 Scraping Cache MISS for {} in session {} (key not found)", url, session_id);
            }
            Err(e) => {
                error!("Redis get error for scraping {}: {}", key, e);
                return Err(BrightDataError::ToolError(format!("Scraping cache get failed: {}", e)));
            }
        }
        
        Ok(None)
    }

    /// Cache scraping data with metadata
    pub async fn cache_scrape_data(
        &self,
        session_id: &str,
        url: &str,
        data: Value,
    ) -> Result<(), BrightDataError> {
        let key = self.generate_key(session_id, url);
        let normalized_url = self.normalize_url(url);
        
        // Add scraping caching metadata
        let mut cached_data = data;
        cached_data["scrape_cached_at"] = json!(SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs());
        cached_data["scrape_cache_key"] = json!(key.clone());
        cached_data["scrape_cache_ttl"] = json!(self.ttl);
        cached_data["original_url"] = json!(url);
        cached_data["normalized_url"] = json!(normalized_url);
        cached_data["session_id"] = json!(session_id);
        
        let json_string = serde_json::to_string(&cached_data)
            .map_err(|e| BrightDataError::ToolError(format!("Scraping JSON serialization failed: {}", e)))?;
        
        let mut conn = self.client.get_multiplexed_async_connection().await
            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
        
        // Set with TTL
        conn.set_ex::<_, _, ()>(&key, &json_string, self.ttl).await
            .map_err(|e| BrightDataError::ToolError(format!("Scraping cache set failed: {}", e)))?;
        
        info!("💾 Cached scraping data for {} in session {} (TTL: {}s)", url, session_id, self.ttl);
        
        Ok(())
    }
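    
    // A cached entry ends up as the caller's JSON object plus the metadata above, e.g. (illustrative values):
    // {
    //   "content": "<html>...</html>",
    //   "scrape_cached_at": 1700000000,
    //   "scrape_cache_key": "scrape:sess-42:9f3a1c0de4b25a77",
    //   "scrape_cache_ttl": 7200,
    //   "original_url": "https://example.com/page?utm_source=x",
    //   "normalized_url": "https://example.com/page",
    //   "session_id": "sess-42"
    // }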

    /// Get all cached URLs for a session
    pub async fn get_session_scrape_urls(&self, session_id: &str) -> Result<Vec<String>, BrightDataError> {
        let mut conn = self.client.get_multiplexed_async_connection().await
            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
        
        // Get all keys for this session's scraped data
        let pattern = format!("scrape:{}:*", session_id);
        let keys: Vec<String> = conn.keys(&pattern).await
            .map_err(|e| BrightDataError::ToolError(format!("Redis keys command failed: {}", e)))?;
        
        let mut urls = Vec::new();
        
        // Get the original URLs from the cached data
        for key in keys {
            match conn.get::<_, Option<String>>(&key).await {
                Ok(Some(cached_json)) => {
                    if let Ok(cached_data) = serde_json::from_str::<Value>(&cached_json) {
                        if let Some(original_url) = cached_data.get("original_url").and_then(|u| u.as_str()) {
                            urls.push(original_url.to_string());
                        }
                    }
                }
                _ => continue,
            }
        }
        
        debug!("📋 Found {} cached URLs for session {}: {:?}", urls.len(), session_id, urls);
        
        Ok(urls)
    }
    
    /// Clear cache for specific URL in session
    pub async fn clear_scrape_url_cache(
        &self,
        session_id: &str,
        url: &str,
    ) -> Result<(), BrightDataError> {
        let key = self.generate_key(session_id, url);
        
        let mut conn = self.client.get_multiplexed_async_connection().await
            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
        
        let _: RedisResult<()> = conn.del(&key).await;
        
        info!("🗑️ Cleared scraping cache for {} in session {}", url, session_id);
        
        Ok(())
    }
    
    /// Clear all scraping cache for entire session
    pub async fn clear_session_scrape_cache(&self, session_id: &str) -> Result<u32, BrightDataError> {
        let mut conn = self.client.get_multiplexed_async_connection().await
            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
        
        // Get all scraping keys for this session
        let pattern = format!("scrape:{}:*", session_id);
        let keys: Vec<String> = conn.keys(&pattern).await
            .map_err(|e| BrightDataError::ToolError(format!("Redis keys command failed: {}", e)))?;
        
        if keys.is_empty() {
            return Ok(0);
        }
        
        let deleted_count: u32 = conn.del(&keys).await
            .map_err(|e| BrightDataError::ToolError(format!("Redis delete failed: {}", e)))?;
        
        info!("🗑️ Cleared {} cached scraping items for session {}", deleted_count, session_id);
        
        Ok(deleted_count)
    }

    /// Get scraping cache statistics
    pub async fn get_scrape_cache_stats(&self) -> Result<Value, BrightDataError> {
        let mut conn = self.client.get_multiplexed_async_connection().await
            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
        
        // Get scraping cache key count
        let scrape_keys: Vec<String> = conn.keys("scrape:*").await.unwrap_or_default();
        
        // Group by session
        let mut session_counts = std::collections::HashMap::new();
        let mut domain_counts = std::collections::HashMap::new();
        
        for key in &scrape_keys {
            let parts: Vec<&str> = key.split(':').collect();
            if parts.len() >= 2 {
                let session_id = parts[1];
                *session_counts.entry(session_id.to_string()).or_insert(0) += 1;
                
                // Try to get domain information from cached data
                if let Ok(Some(cached_json)) = conn.get::<_, Option<String>>(key).await {
                    if let Ok(cached_data) = serde_json::from_str::<Value>(&cached_json) {
                        if let Some(url) = cached_data.get("original_url").and_then(|u| u.as_str()) {
                            if let Ok(parsed_url) = Url::parse(url) {
                                if let Some(domain) = parsed_url.domain() {
                                    *domain_counts.entry(domain.to_string()).or_insert(0) += 1;
                                }
                            }
                        }
                    }
                }
            }
        }
        
        let stats = json!({
            "total_scrape_cache_entries": scrape_keys.len(),
            "active_sessions": session_counts.len(),
            "session_breakdown": session_counts,
            "domain_breakdown": domain_counts,
            "scrape_cache_ttl_seconds": self.ttl,
            "timestamp": SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()
        });
        
        Ok(stats)
    }
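    
    // The returned stats value looks roughly like this (illustrative numbers only):
    // {
    //   "total_scrape_cache_entries": 12,
    //   "active_sessions": 2,
    //   "session_breakdown": { "sess-1": 9, "sess-2": 3 },
    //   "domain_breakdown": { "example.com": 7, "news.example.org": 5 },
    //   "scrape_cache_ttl_seconds": 7200,
    //   "timestamp": 1700000000
    // }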

    /// Check if specific URL is cached for session
    pub async fn is_url_cached(&self, session_id: &str, url: &str) -> Result<bool, BrightDataError> {
        let key = self.generate_key(session_id, url);
        
        let mut conn = self.client.get_multiplexed_async_connection().await
            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
        
        let exists: bool = conn.exists(&key).await
            .map_err(|e| BrightDataError::ToolError(format!("Redis exists check failed: {}", e)))?;
        
        Ok(exists)
    }
    
    /// Health check for Redis connection
    pub async fn health_check(&self) -> Result<bool, BrightDataError> {
        let mut conn = self.client.get_multiplexed_async_connection().await
            .map_err(|_| BrightDataError::ToolError("Scraping cache Redis connection failed".into()))?;
        
        let _: String = conn.ping().await
            .map_err(|_| BrightDataError::ToolError("Scraping cache Redis ping failed".into()))?;
        
        Ok(true)
    }

    /// Batch cache multiple URLs (useful for bulk scraping operations)
    pub async fn batch_cache_scrape_data(
        &self,
        session_id: &str,
        scrape_data: Vec<(String, Value)>, // Vec<(url, data)>
    ) -> Result<Vec<String>, BrightDataError> {
        let mut successful_urls = Vec::new();
        
        for (url, data) in scrape_data {
            match self.cache_scrape_data(session_id, &url, data).await {
                Ok(_) => {
                    successful_urls.push(url);
                }
                Err(e) => {
                    warn!("Failed to cache scraping data for {}: {}", url, e);
                }
            }
        }
        
        info!("📦 Batch cached {} URLs for session {}", successful_urls.len(), session_id);
        
        Ok(successful_urls)
    }
    
    /// Get cached URLs by domain (useful for finding related cached content)
    pub async fn get_cached_urls_by_domain(
        &self,
        session_id: &str,
        domain: &str,
    ) -> Result<Vec<String>, BrightDataError> {
        let urls = self.get_session_scrape_urls(session_id).await?;
        
        let matching_urls: Vec<String> = urls
            .into_iter()
            .filter(|url| {
                if let Ok(parsed_url) = Url::parse(url) {
                    if let Some(url_domain) = parsed_url.domain() {
                        return url_domain.contains(domain) || domain.contains(url_domain);
                    }
                }
                false
            })
            .collect();
        
        debug!("🔍 Found {} cached URLs matching domain '{}' for session {}", 
               matching_urls.len(), domain, session_id);
        
        Ok(matching_urls)
    }

    /// Get cached data with content length information
    pub async fn get_cache_summary(&self, session_id: &str) -> Result<Value, BrightDataError> {
        let mut conn = self.client.get_multiplexed_async_connection().await
            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
        
        let pattern = format!("scrape:{}:*", session_id);
        let keys: Vec<String> = conn.keys(&pattern).await
            .map_err(|e| BrightDataError::ToolError(format!("Redis keys command failed: {}", e)))?;
        
        let mut summary = Vec::new();
        let mut total_content_size = 0;
        
        for key in keys {
            if let Ok(Some(cached_json)) = conn.get::<_, Option<String>>(&key).await {
                if let Ok(cached_data) = serde_json::from_str::<Value>(&cached_json) {
                    let url = cached_data.get("original_url")
                        .and_then(|u| u.as_str())
                        .unwrap_or("unknown");
                    
                    let content_size = cached_data.get("content")
                        .and_then(|c| c.as_str())
                        .map(|s| s.len())
                        .unwrap_or(0);
                    
                    let cached_at = cached_data.get("scrape_cached_at")
                        .and_then(|t| t.as_u64())
                        .unwrap_or(0);
                    
                    total_content_size += content_size;
                    
                    summary.push(json!({
                        "url": url,
                        "content_size_bytes": content_size,
                        "cached_at": cached_at,
                        "cache_key": key
                    }));
                }
            }
        }
        
        Ok(json!({
            "session_id": session_id,
            "total_cached_urls": summary.len(),
            "total_content_size_bytes": total_content_size,
            "cached_items": summary,
            "timestamp": SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()
        }))
    }
}

// Singleton instance for global access
use std::sync::Arc;
use tokio::sync::OnceCell;

static SCRAPE_CACHE: OnceCell<Arc<ScrapeCache>> = OnceCell::const_new();

/// Get global scraping cache service instance
pub async fn get_scrape_cache() -> Result<Arc<ScrapeCache>, BrightDataError> {
    SCRAPE_CACHE.get_or_try_init(|| async {
        let service = ScrapeCache::new().await?;
        Ok(Arc::new(service))
    }).await.map(|arc| arc.clone())
}

/// Initialize scraping cache service (call this at startup)
pub async fn init_scrape_cache() -> Result<(), BrightDataError> {
    let _service = get_scrape_cache().await?;
    info!("✅ Global scraping cache service initialized");
    Ok(())
}
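
// A minimal usage sketch, not part of the original module: it assumes a Redis instance is
// reachable at REDIS_URL, that tokio's test macro is available, and that the scraped payload
// is a JSON object. The session id and URLs below are made-up illustrative values.
// Run with `cargo test -- --ignored` when Redis is available.
#[cfg(test)]
mod usage_sketch {
    use super::*;
    use serde_json::json;

    #[tokio::test]
    #[ignore] // requires a running Redis instance
    async fn cache_round_trip() {
        let cache = get_scrape_cache().await.expect("cache init");

        // Cache a scraped page under a hypothetical session id
        let payload = json!({ "content": "<html>...</html>", "status": 200 });
        cache
            .cache_scrape_data("session-demo", "https://example.com/page?utm_source=test", payload)
            .await
            .expect("cache write");

        // The same URL with the tracking parameter stripped should hit the cache,
        // because both normalize to "https://example.com/page"
        let hit = cache
            .get_cached_scrape_data("session-demo", "https://example.com/page")
            .await
            .expect("cache read");
        assert!(hit.is_some(), "expected a cache hit for the normalized URL");

        // Clean up the demo session
        cache
            .clear_session_scrape_cache("session-demo")
            .await
            .expect("cache cleanup");
    }
}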