snm_brightdata_client/services/cache/scrape_cache.rs
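
//! Redis-backed, per-session cache for scraped page data.
//!
//! Cache keys combine the session id with a hash of the normalized URL, and
//! every entry stores its own `scrape_cached_at` timestamp so staleness can be
//! re-checked on read in addition to the Redis-side TTL.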
use redis::{AsyncCommands, Client, RedisResult};
use serde_json::{json, Value};
use std::env;
use std::time::{SystemTime, UNIX_EPOCH};
use log::{info, warn, error, debug};
use crate::error::BrightDataError;
use url::Url;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
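
/// Shared Redis client plus the entry TTL (in seconds) resolved from the
/// environment when the cache is constructed.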
#[derive(Debug, Clone)]
pub struct ScrapeCache {
    client: Client,
    ttl: u64,
}

impl ScrapeCache {
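    /// Connects to Redis (`REDIS_URL`, defaulting to a local instance), verifies
    /// the connection with PING, and reads the TTL from
    /// `SCRAPE_CACHE_TTL_SECONDS` / `CACHE_TTL_SECONDS` (default 7200s).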
    pub async fn new() -> Result<Self, BrightDataError> {
        let redis_url = env::var("REDIS_URL")
            .unwrap_or_else(|_| "redis://127.0.0.1:6379/".to_string());

        let client = Client::open(redis_url.as_str())
            .map_err(|e| BrightDataError::ToolError(format!("Failed to create Redis client: {}", e)))?;

        let mut conn = client.get_multiplexed_async_connection().await
            .map_err(|e| BrightDataError::ToolError(format!("Failed to connect to Redis: {}", e)))?;

        let _: String = conn.ping().await
            .map_err(|e| BrightDataError::ToolError(format!("Redis ping failed: {}", e)))?;

        let ttl = env::var("SCRAPE_CACHE_TTL_SECONDS")
            .or_else(|_| env::var("CACHE_TTL_SECONDS"))
            .ok()
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or(7200);

        info!("✅ Scraping cache service initialized (TTL: {}s)", ttl);

        Ok(Self { client, ttl })
    }
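
    /// Builds the cache key `scrape:{session_id}:{hash}`, hashing the normalized
    /// form of the URL so equivalent URLs share one entry.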
    fn generate_key(&self, session_id: &str, url: &str) -> String {
        // Hash the normalized URL so that URLs differing only in fragments or
        // tracking parameters map to the same cache entry.
        let normalized_url = self.normalize_url(url);
        let mut hasher = DefaultHasher::new();
        normalized_url.hash(&mut hasher);
        let normalized_hash = hasher.finish();

        let key = format!("scrape:{}:{:x}", session_id, normalized_hash);
        debug!("Scraping Cache Key: {} (URL: {})", key, url);
        key
    }
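
    /// Strips the fragment and common tracking query parameters so that URLs
    /// differing only in those parts normalize to the same string.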
    fn normalize_url(&self, url: &str) -> String {
        match Url::parse(url) {
            Ok(mut parsed_url) => {
                parsed_url.set_fragment(None);

                // Query parameters that only identify marketing campaigns or click
                // sources; they do not change the page content.
                let tracking_params = [
                    "utm_source", "utm_medium", "utm_campaign", "utm_content", "utm_term",
                    "gclid", "fbclid", "ref", "source", "_ga", "mc_cid", "mc_eid",
                ];

                let filtered_pairs: Vec<(String, String)> = parsed_url
                    .query_pairs()
                    .into_owned()
                    .filter(|(key, _)| !tracking_params.contains(&key.as_str()))
                    .collect();

                if filtered_pairs.is_empty() {
                    parsed_url.set_query(None);
                } else {
                    // Rebuild the query with proper percent-encoding instead of
                    // joining the decoded pairs by hand.
                    parsed_url
                        .query_pairs_mut()
                        .clear()
                        .extend_pairs(filtered_pairs.iter());
                }

                parsed_url.to_string()
            }
            // Not a parseable URL; fall back to the raw string.
            Err(_) => url.to_string(),
        }
    }
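
    /// Looks up a cached scrape for this session/URL. Entries that are expired,
    /// unparsable, or missing their timestamp are deleted and treated as a miss.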
    pub async fn get_cached_scrape_data(
        &self,
        session_id: &str,
        url: &str,
    ) -> Result<Option<Value>, BrightDataError> {
        let key = self.generate_key(session_id, url);

        let mut conn = self.client.get_multiplexed_async_connection().await
            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;

        match conn.get::<_, Option<String>>(&key).await {
            Ok(Some(cached_json)) => {
                match serde_json::from_str::<Value>(&cached_json) {
                    Ok(cached_data) => {
                        if let Some(timestamp) = cached_data.get("scrape_cached_at").and_then(|t| t.as_u64()) {
                            let current_time = SystemTime::now()
                                .duration_since(UNIX_EPOCH)
                                .unwrap()
                                .as_secs();

                            // saturating_sub guards against clock skew making the
                            // stored timestamp later than "now".
                            if current_time.saturating_sub(timestamp) < self.ttl {
                                info!("🎯 Scraping Cache HIT for {} in session {}", url, session_id);
                                return Ok(Some(cached_data));
                            } else {
                                info!("⏰ Scraping Cache EXPIRED for {} in session {}", url, session_id);
                                let _: RedisResult<()> = conn.del(&key).await;
                            }
                        } else {
                            let available_fields: Vec<String> = cached_data.as_object()
                                .map(|obj| obj.keys().map(|k| k.to_string()).collect())
                                .unwrap_or_default();
                            warn!("❌ Missing 'scrape_cached_at' field for key: {} (available: {:?})", key, available_fields);
                            let _: RedisResult<()> = conn.del(&key).await;
                        }
                    }
                    Err(e) => {
                        warn!("Failed to parse cached scraping data for {}: {}", key, e);
                        let _: RedisResult<()> = conn.del(&key).await;
                    }
                }
            }
            Ok(None) => {
                debug!("💾 Scraping Cache MISS for {} in session {} (key not found)", url, session_id);
            }
            Err(e) => {
                error!("Redis get error for scraping {}: {}", key, e);
                return Err(BrightDataError::ToolError(format!("Scraping cache get failed: {}", e)));
            }
        }

        Ok(None)
    }
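
    /// Stores scraped data under the session/URL key, stamping it with cache
    /// metadata (timestamp, key, TTL, original and normalized URL, session id).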
    pub async fn cache_scrape_data(
        &self,
        session_id: &str,
        url: &str,
        data: Value,
    ) -> Result<(), BrightDataError> {
        let key = self.generate_key(session_id, url);
        let normalized_url = self.normalize_url(url);

        // NOTE: indexing a Value by string key assumes `data` is a JSON object
        // (or Null); any other variant would panic here.
        let mut cached_data = data;
        cached_data["scrape_cached_at"] = json!(SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs());
        cached_data["scrape_cache_key"] = json!(key.clone());
        cached_data["scrape_cache_ttl"] = json!(self.ttl);
        cached_data["original_url"] = json!(url);
        cached_data["normalized_url"] = json!(normalized_url);
        cached_data["session_id"] = json!(session_id);

        let json_string = serde_json::to_string(&cached_data)
            .map_err(|e| BrightDataError::ToolError(format!("Scraping JSON serialization failed: {}", e)))?;

        let mut conn = self.client.get_multiplexed_async_connection().await
            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;

        conn.set_ex::<_, _, ()>(&key, &json_string, self.ttl).await
            .map_err(|e| BrightDataError::ToolError(format!("Scraping cache set failed: {}", e)))?;

        info!("💾 Cached scraping data for {} in session {} (TTL: {}s)", url, session_id, self.ttl);

        Ok(())
    }
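
    /// Returns the original URLs of every cached entry belonging to a session.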
    pub async fn get_session_scrape_urls(&self, session_id: &str) -> Result<Vec<String>, BrightDataError> {
        let mut conn = self.client.get_multiplexed_async_connection().await
            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;

        // KEYS is O(N) over the whole keyspace; fine for modest cache sizes, but a
        // SCAN-based iteration would be kinder to a busy Redis instance.
        let pattern = format!("scrape:{}:*", session_id);
        let keys: Vec<String> = conn.keys(&pattern).await
            .map_err(|e| BrightDataError::ToolError(format!("Redis keys command failed: {}", e)))?;

        let mut urls = Vec::new();

        for key in keys {
            if let Ok(Some(cached_json)) = conn.get::<_, Option<String>>(&key).await {
                if let Ok(cached_data) = serde_json::from_str::<Value>(&cached_json) {
                    if let Some(original_url) = cached_data.get("original_url").and_then(|u| u.as_str()) {
                        urls.push(original_url.to_string());
                    }
                }
            }
        }

        debug!("📋 Found {} cached URLs for session {}: {:?}", urls.len(), session_id, urls);

        Ok(urls)
    }
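
    /// Removes the cached entry for a single URL in a session, if present.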
    pub async fn clear_scrape_url_cache(
        &self,
        session_id: &str,
        url: &str,
    ) -> Result<(), BrightDataError> {
        let key = self.generate_key(session_id, url);

        let mut conn = self.client.get_multiplexed_async_connection().await
            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;

        let _: RedisResult<()> = conn.del(&key).await;

        info!("🗑️ Cleared scraping cache for {} in session {}", url, session_id);

        Ok(())
    }
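
    /// Deletes every cached entry for a session and returns how many were removed.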
    pub async fn clear_session_scrape_cache(&self, session_id: &str) -> Result<u32, BrightDataError> {
        let mut conn = self.client.get_multiplexed_async_connection().await
            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;

        let pattern = format!("scrape:{}:*", session_id);
        let keys: Vec<String> = conn.keys(&pattern).await
            .map_err(|e| BrightDataError::ToolError(format!("Redis keys command failed: {}", e)))?;

        if keys.is_empty() {
            return Ok(0);
        }

        let deleted_count: u32 = conn.del(&keys).await
            .map_err(|e| BrightDataError::ToolError(format!("Redis delete failed: {}", e)))?;

        info!("🗑️ Cleared {} cached scraping items for session {}", deleted_count, session_id);

        Ok(deleted_count)
    }
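
    /// Aggregates cache-wide statistics: entry counts per session and per domain,
    /// plus the configured TTL.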
    pub async fn get_scrape_cache_stats(&self) -> Result<Value, BrightDataError> {
        let mut conn = self.client.get_multiplexed_async_connection().await
            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;

        let scrape_keys: Vec<String> = conn.keys("scrape:*").await.unwrap_or_default();

        let mut session_counts = std::collections::HashMap::new();
        let mut domain_counts = std::collections::HashMap::new();

        for key in &scrape_keys {
            // Keys have the form "scrape:{session_id}:{hash}".
            let parts: Vec<&str> = key.split(':').collect();
            if parts.len() >= 2 {
                let session_id = parts[1];
                *session_counts.entry(session_id.to_string()).or_insert(0) += 1;

                if let Ok(Some(cached_json)) = conn.get::<_, Option<String>>(key).await {
                    if let Ok(cached_data) = serde_json::from_str::<Value>(&cached_json) {
                        if let Some(url) = cached_data.get("original_url").and_then(|u| u.as_str()) {
                            if let Ok(parsed_url) = Url::parse(url) {
                                if let Some(domain) = parsed_url.domain() {
                                    *domain_counts.entry(domain.to_string()).or_insert(0) += 1;
                                }
                            }
                        }
                    }
                }
            }
        }

        let stats = json!({
            "total_scrape_cache_entries": scrape_keys.len(),
            "active_sessions": session_counts.len(),
            "session_breakdown": session_counts,
            "domain_breakdown": domain_counts,
            "scrape_cache_ttl_seconds": self.ttl,
            "timestamp": SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()
        });

        Ok(stats)
    }
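
    /// Cheap existence check for a session/URL pair (does not validate the entry).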
    pub async fn is_url_cached(&self, session_id: &str, url: &str) -> Result<bool, BrightDataError> {
        let key = self.generate_key(session_id, url);

        let mut conn = self.client.get_multiplexed_async_connection().await
            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;

        let exists: bool = conn.exists(&key).await
            .map_err(|e| BrightDataError::ToolError(format!("Redis exists check failed: {}", e)))?;

        Ok(exists)
    }
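
    /// Verifies that Redis is reachable by issuing a PING.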
    pub async fn health_check(&self) -> Result<bool, BrightDataError> {
        let mut conn = self.client.get_multiplexed_async_connection().await
            .map_err(|_| BrightDataError::ToolError("Scraping cache Redis connection failed".into()))?;

        let _: String = conn.ping().await
            .map_err(|_| BrightDataError::ToolError("Scraping cache Redis ping failed".into()))?;

        Ok(true)
    }
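
    /// Caches a batch of (URL, data) pairs, skipping failures, and returns the
    /// URLs that were stored successfully.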
    pub async fn batch_cache_scrape_data(
        &self,
        session_id: &str,
        scrape_data: Vec<(String, Value)>,
    ) -> Result<Vec<String>, BrightDataError> {
        let mut successful_urls = Vec::new();

        for (url, data) in scrape_data {
            match self.cache_scrape_data(session_id, &url, data).await {
                Ok(_) => {
                    successful_urls.push(url);
                }
                Err(e) => {
                    warn!("Failed to cache scraping data for {}: {}", url, e);
                }
            }
        }

        info!("📦 Batch cached {} URLs for session {}", successful_urls.len(), session_id);

        Ok(successful_urls)
    }
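
    /// Filters the session's cached URLs down to those whose domain loosely
    /// matches the given domain (substring match in either direction).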
    pub async fn get_cached_urls_by_domain(
        &self,
        session_id: &str,
        domain: &str,
    ) -> Result<Vec<String>, BrightDataError> {
        let urls = self.get_session_scrape_urls(session_id).await?;

        let matching_urls: Vec<String> = urls
            .into_iter()
            .filter(|url| {
                if let Ok(parsed_url) = Url::parse(url) {
                    if let Some(url_domain) = parsed_url.domain() {
                        // Loose match in both directions so "example.com" also
                        // matches "www.example.com".
                        return url_domain.contains(domain) || domain.contains(url_domain);
                    }
                }
                false
            })
            .collect();

        debug!("🔍 Found {} cached URLs matching domain '{}' for session {}",
            matching_urls.len(), domain, session_id);

        Ok(matching_urls)
    }
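
    /// Builds a per-session summary (URL, content size, cache timestamp, key) of
    /// all cached entries, along with the total cached content size.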
    pub async fn get_cache_summary(&self, session_id: &str) -> Result<Value, BrightDataError> {
        let mut conn = self.client.get_multiplexed_async_connection().await
            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;

        let pattern = format!("scrape:{}:*", session_id);
        let keys: Vec<String> = conn.keys(&pattern).await
            .map_err(|e| BrightDataError::ToolError(format!("Redis keys command failed: {}", e)))?;

        let mut summary = Vec::new();
        let mut total_content_size = 0;

        for key in keys {
            if let Ok(Some(cached_json)) = conn.get::<_, Option<String>>(&key).await {
                if let Ok(cached_data) = serde_json::from_str::<Value>(&cached_json) {
                    let url = cached_data.get("original_url")
                        .and_then(|u| u.as_str())
                        .unwrap_or("unknown");

                    let content_size = cached_data.get("content")
                        .and_then(|c| c.as_str())
                        .map(|s| s.len())
                        .unwrap_or(0);

                    let cached_at = cached_data.get("scrape_cached_at")
                        .and_then(|t| t.as_u64())
                        .unwrap_or(0);

                    total_content_size += content_size;

                    summary.push(json!({
                        "url": url,
                        "content_size_bytes": content_size,
                        "cached_at": cached_at,
                        "cache_key": key
                    }));
                }
            }
        }

        Ok(json!({
            "session_id": session_id,
            "total_cached_urls": summary.len(),
            "total_content_size_bytes": total_content_size,
            "cached_items": summary,
            "timestamp": SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()
        }))
    }
}
use std::sync::Arc;
use tokio::sync::OnceCell;

static SCRAPE_CACHE: OnceCell<Arc<ScrapeCache>> = OnceCell::const_new();
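
/// Returns the process-wide `ScrapeCache`, initializing it on first use.
///
/// A minimal usage sketch (assumes a reachable Redis; `scrape_page` is a
/// hypothetical helper shown only for illustration):
///
/// ```ignore
/// let cache = get_scrape_cache().await?;
/// if let Some(hit) = cache.get_cached_scrape_data("session-1", url).await? {
///     return Ok(hit);
/// }
/// let data = scrape_page(url).await?; // hypothetical scraper call
/// cache.cache_scrape_data("session-1", url, data.clone()).await?;
/// ```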
pub async fn get_scrape_cache() -> Result<Arc<ScrapeCache>, BrightDataError> {
    SCRAPE_CACHE.get_or_try_init(|| async {
        let service = ScrapeCache::new().await?;
        Ok(Arc::new(service))
    }).await.map(|arc| arc.clone())
}
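
/// Eagerly initializes the global cache at startup so the first caller does not
/// pay the connection cost.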
pub async fn init_scrape_cache() -> Result<(), BrightDataError> {
    let _service = get_scrape_cache().await?;
    info!("✅ Global scraping cache service initialized");
    Ok(())
}