snm_brightdata_client/services/cache/
crypto_cache.rs

1// crates/snm-brightdata-client/src/services/crypto_cache.rs
2use redis::{AsyncCommands, Client, RedisResult};
3use serde_json::{json, Value};
4use std::env;
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6use log::{info, warn, error, debug};
7use crate::error::BrightDataError;
8
9#[derive(Debug, Clone)]
10pub struct CryptoCache {
11    client: Client,
12    ttl: u64,
13}
14
15impl CryptoCache {
16    /// Initialize Redis crypto cache service
17    pub async fn new() -> Result<Self, BrightDataError> {
18        let redis_url = env::var("REDIS_URL")
19            .unwrap_or_else(|_| "redis://127.0.0.1:6379/".to_string());
20        
21        let client = Client::open(redis_url.as_str())
22            .map_err(|e| BrightDataError::ToolError(format!("Failed to create Redis client: {}", e)))?;
23        
24        // Test connection with the correct async method
25        let mut conn = client.get_multiplexed_async_connection().await
26            .map_err(|e| BrightDataError::ToolError(format!("Failed to connect to Redis: {}", e)))?;
27        
28        // Test ping
29        let _: String = conn.ping().await
30            .map_err(|e| BrightDataError::ToolError(format!("Redis ping failed: {}", e)))?;
31        
32        let ttl = env::var("CRYPTO_CACHE_TTL_SECONDS")
33            .or_else(|_| env::var("CACHE_TTL_SECONDS"))
34            .ok()
35            .and_then(|s| s.parse::<u64>().ok())
36            .unwrap_or(3600); // Default 1 hour
37        
38        info!("Crypto cache service initialized (TTL: {}s)", ttl);
39        
40        Ok(Self { client, ttl })
41    }
42    
43    /// Generate crypto cache key: session_id:SYMBOL
44    fn generate_key(&self, session_id: &str, symbol: &str) -> String {
45        let key = format!("crypto:{}:{}", session_id, symbol.to_lowercase());
46        info!("Cache Key: {}", key);
47        key
48    }
49
50    /// Check if cached crypto data exists and is valid
51    pub async fn get_cached_crypto_data(
52        &self, 
53        session_id: &str,
54        symbol: &str
55    ) -> Result<Option<Value>, BrightDataError> {
56        let key = self.generate_key(session_id, symbol);
57        
58        let mut conn = self.client.get_multiplexed_async_connection().await
59            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
60        
61        match conn.get::<_, Option<String>>(&key).await {
62            Ok(Some(cached_json)) => {
63                match serde_json::from_str::<Value>(&cached_json) {
64                    Ok(cached_data) => {
65                        if let Some(timestamp) = cached_data.get("crypto_cached_at").and_then(|t| t.as_u64()) {
66                            let current_time = SystemTime::now()
67                                .duration_since(UNIX_EPOCH)
68                                .unwrap()
69                                .as_secs();
70                            
71                            if current_time - timestamp < self.ttl {
72                                info!("Crypto Cache HIT for {} in session {}", symbol, session_id);
73                                return Ok(Some(cached_data));
74                            } else {
75                                info!("Crypto Cache EXPIRED for {} in session {}", symbol, session_id);
76                                // Remove expired data
77                                let _: RedisResult<()> = conn.del(&key).await;
78                            }
79                        } else {
80                            // DEBUG: Show what fields are available
81                            let available_fields: Vec<String> = cached_data.as_object()
82                                .map(|obj| obj.keys().map(|k| k.to_string()).collect())
83                                .unwrap_or_default();
84                            warn!("Missing 'crypto_cached_at' field for key: {} (available: {:?})", key, available_fields);
85                            // Remove corrupted data
86                            let _: RedisResult<()> = conn.del(&key).await;
87                        }
88                    }
89                    Err(e) => {
90                        warn!("Failed to parse cached crypto data for {}: {}", key, e);
91                        // Remove corrupted data
92                        let _: RedisResult<()> = conn.del(&key).await;
93                    }
94                }
95            }
96            Ok(None) => {
97                debug!("Crypto Cache MISS for {} in session {} (key not found)", symbol, session_id);
98            }
99            Err(e) => {
100                error!("Redis get error for crypto {}: {}", key, e);
101                return Err(BrightDataError::ToolError(format!("Crypto cache get failed: {}", e)));
102            }
103        }
104        
105        Ok(None)
106    }
107    
108    /// Cache crypto data with metadata
109    pub async fn cache_crypto_data(
110        &self,
111        session_id: &str,
112        symbol: &str,
113        data: Value,
114    ) -> Result<(), BrightDataError> {
115        let key = self.generate_key(session_id, symbol);
116        
117        // Add crypto caching metadata
118        let mut cached_data = data;
119        cached_data["crypto_cached_at"] = json!(SystemTime::now()
120            .duration_since(UNIX_EPOCH)
121            .unwrap()
122            .as_secs());
123        cached_data["crypto_cache_key"] = json!(key.clone());
124        cached_data["crypto_cache_ttl"] = json!(self.ttl);
125        cached_data["symbol"] = json!(symbol.to_lowercase());
126        cached_data["session_id"] = json!(session_id);
127        
128        let json_string = serde_json::to_string(&cached_data)
129            .map_err(|e| BrightDataError::ToolError(format!("Crypto JSON serialization failed: {}", e)))?;
130        
131        let mut conn = self.client.get_multiplexed_async_connection().await
132            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
133        
134        // Set with TTL
135        conn.set_ex::<_, _, ()>(&key, &json_string, self.ttl as u64).await
136            .map_err(|e| BrightDataError::ToolError(format!("Crypto cache set failed: {}", e)))?;
137        
138        info!("Cached crypto data for {} in session {} (TTL: {}s)", symbol, session_id, self.ttl);
139        
140        Ok(())
141    }
142    
143    /// Get all cached crypto symbols for a session
144    pub async fn get_session_crypto_symbols(&self, session_id: &str) -> Result<Vec<String>, BrightDataError> {
145        let mut conn = self.client.get_multiplexed_async_connection().await
146            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
147        
148        // Get all keys for this session's crypto
149        let pattern = format!("crypto:{}:*", session_id);
150        let keys: Vec<String> = conn.keys(&pattern).await
151            .map_err(|e| BrightDataError::ToolError(format!("Redis keys command failed: {}", e)))?;
152        
153        // Extract symbols from keys (crypto:session:SYMBOL -> SYMBOL)
154        let symbols: Vec<String> = keys
155            .iter()
156            .filter_map(|key| {
157                let parts: Vec<&str> = key.split(':').collect();
158                if parts.len() >= 3 {
159                    Some(parts[2].to_string())
160                } else {
161                    None
162                }
163            })
164            .collect();
165        
166        debug!("Found {} cached crypto symbols for session {}: {:?}", symbols.len(), session_id, symbols);
167        
168        Ok(symbols)
169    }
170    
171    /// Clear cache for specific crypto symbol in session
172    pub async fn clear_crypto_symbol_cache(
173        &self,
174        session_id: &str,
175        symbol: &str,
176    ) -> Result<(), BrightDataError> {
177        let key = self.generate_key(session_id, symbol);
178        
179        let mut conn = self.client.get_multiplexed_async_connection().await
180            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
181        
182        let _: RedisResult<()> = conn.del(&key).await;
183        
184        info!("Cleared crypto cache for {} in session {}", symbol, session_id);
185        
186        Ok(())
187    }
188    
189    /// Clear all crypto cache for entire session
190    pub async fn clear_session_crypto_cache(&self, session_id: &str) -> Result<u32, BrightDataError> {
191        let mut conn = self.client.get_multiplexed_async_connection().await
192            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
193        
194        // Get all crypto keys for this session
195        let pattern = format!("crypto:{}:*", session_id);
196        let keys: Vec<String> = conn.keys(&pattern).await
197            .map_err(|e| BrightDataError::ToolError(format!("Redis keys command failed: {}", e)))?;
198        
199        if keys.is_empty() {
200            return Ok(0);
201        }
202        
203        let deleted_count: u32 = conn.del(&keys).await
204            .map_err(|e| BrightDataError::ToolError(format!("Redis delete failed: {}", e)))?;
205        
206        info!("Cleared {} cached crypto items for session {}", deleted_count, session_id);
207        
208        Ok(deleted_count)
209    }
210    
211    /// Get crypto cache statistics
212    pub async fn get_crypto_cache_stats(&self) -> Result<Value, BrightDataError> {
213        let mut conn = self.client.get_multiplexed_async_connection().await
214            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
215        
216        // Get crypto cache key count
217        let crypto_keys: Vec<String> = conn.keys("crypto:*").await.unwrap_or_default();
218        
219        // Group by session
220        let mut session_counts = std::collections::HashMap::new();
221        for key in &crypto_keys {
222            let parts: Vec<&str> = key.split(':').collect();
223            if parts.len() >= 2 {
224                let session_id = parts[1];
225                *session_counts.entry(session_id.to_string()).or_insert(0) += 1;
226            }
227        }
228        
229        let stats = json!({
230            "total_crypto_cache_entries": crypto_keys.len(),
231            "active_sessions": session_counts.len(),
232            "session_breakdown": session_counts,
233            "crypto_cache_ttl_seconds": self.ttl,
234            "timestamp": SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()
235        });
236        
237        Ok(stats)
238    }
239    
240    /// Check if specific crypto is cached for session
241    pub async fn is_crypto_cached(&self, session_id: &str, symbol: &str) -> Result<bool, BrightDataError> {
242        let key = self.generate_key(session_id, symbol);
243        
244        let mut conn = self.client.get_multiplexed_async_connection().await
245            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
246        
247        let exists: bool = conn.exists(&key).await
248            .map_err(|e| BrightDataError::ToolError(format!("Redis exists check failed: {}", e)))?;
249        
250        Ok(exists)
251    }
252    
253    /// Health check for Redis connection
254    pub async fn health_check(&self) -> Result<bool, BrightDataError> {
255        let mut conn = self.client.get_multiplexed_async_connection().await
256            .map_err(|_| BrightDataError::ToolError("Crypto cache Redis connection failed".into()))?;
257        
258        let _: String = conn.ping().await
259            .map_err(|_| BrightDataError::ToolError("Crypto cache Redis ping failed".into()))?;
260        
261        Ok(true)
262    }
263    
264    /// Batch cache multiple crypto symbols (useful for comparisons)
265    pub async fn batch_cache_crypto_data(
266        &self,
267        session_id: &str,
268        crypto_data: Vec<(String, Value)>, // Vec<(symbol, data)>
269    ) -> Result<Vec<String>, BrightDataError> {
270        let mut successful_symbols = Vec::new();
271        
272        for (symbol, data) in crypto_data {
273            match self.cache_crypto_data(session_id, &symbol, data).await {
274                Ok(_) => {
275                    successful_symbols.push(symbol);
276                }
277                Err(e) => {
278                    warn!("Failed to cache crypto data for {}: {}", symbol, e);
279                }
280            }
281        }
282        
283        info!("Batch cached {} crypto symbols for session {}", successful_symbols.len(), session_id);
284        
285        Ok(successful_symbols)
286    }
287}
288
289// Singleton instance for global access
290use std::sync::Arc;
291use tokio::sync::OnceCell;
292
293static CRYPTO_CACHE: OnceCell<Arc<CryptoCache>> = OnceCell::const_new();
294
295/// Get global crypto cache service instance
296pub async fn get_crypto_cache() -> Result<Arc<CryptoCache>, BrightDataError> {
297    CRYPTO_CACHE.get_or_try_init(|| async {
298        let service = CryptoCache::new().await?;
299        Ok(Arc::new(service))
300    }).await.map(|arc| arc.clone())
301}
302
303/// Initialize crypto cache service (call this at startup)
304pub async fn init_crypto_cache() -> Result<(), BrightDataError> {
305    let _service = get_crypto_cache().await?;
306    info!("Global crypto cache service initialized");
307    Ok(())
308}