snm_brightdata_client/services/cache/
stock_cache.rs

1// crates/snm-brightdata-client/src/services/stock_cache.rs
2use redis::{AsyncCommands, Client, RedisResult};
3use serde_json::{json, Value};
4use std::env;
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6use log::{info, warn, error, debug};
7use crate::error::BrightDataError;
8
9#[derive(Debug, Clone)]
10pub struct StockCache {
11    client: Client,
12    ttl: u64,
13}
14
15impl StockCache {
16    /// Initialize Redis stock cache service
17    pub async fn new() -> Result<Self, BrightDataError> {
18        let redis_url = env::var("REDIS_URL")
19            .unwrap_or_else(|_| "redis://127.0.0.1:6379/".to_string());
20        
21        let client = Client::open(redis_url.as_str())
22            .map_err(|e| BrightDataError::ToolError(format!("Failed to create Redis client: {}", e)))?;
23        
24        // Test connection with the correct async method
25        let mut conn = client.get_multiplexed_async_connection().await
26            .map_err(|e| BrightDataError::ToolError(format!("Failed to connect to Redis: {}", e)))?;
27        
28        // Test ping
29        let _: String = conn.ping().await
30            .map_err(|e| BrightDataError::ToolError(format!("Redis ping failed: {}", e)))?;
31        
32        let ttl = env::var("STOCK_CACHE_TTL_SECONDS")
33            .or_else(|_| env::var("CACHE_TTL_SECONDS"))
34            .ok()
35            .and_then(|s| s.parse::<u64>().ok())
36            .unwrap_or(3600); // Default 1 hour
37        
38        info!("✅ Stock cache service initialized (TTL: {}s)", ttl);
39        
40        Ok(Self { client, ttl })
41    }
42    
43    /// Generate stock cache key: session_id:SYMBOL
44    fn generate_key(&self, session_id: &str, symbol: &str) -> String {
45        let key = format!("stock:{}:{}", session_id, symbol.to_lowercase());
46        info!("Cache Key: {}", key);
47        key
48    }
49    
50    // FINAL FIX: Change line 61 in get_cached_stock_data()
51
52    /// Check if cached stock data exists and is valid
53    pub async fn get_cached_stock_data(
54        &self, 
55        session_id: &str,
56        symbol: &str
57    ) -> Result<Option<Value>, BrightDataError> {
58        let key = self.generate_key(session_id, symbol);
59        
60        let mut conn = self.client.get_multiplexed_async_connection().await
61            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
62        
63        match conn.get::<_, Option<String>>(&key).await {
64            Ok(Some(cached_json)) => {
65                match serde_json::from_str::<Value>(&cached_json) {
66                    Ok(cached_data) => {
67                        // 🔧 CRITICAL FIX: Change "cached_at" to "stock_cached_at"
68                        if let Some(timestamp) = cached_data.get("stock_cached_at").and_then(|t| t.as_u64()) {
69                            let current_time = SystemTime::now()
70                                .duration_since(UNIX_EPOCH)
71                                .unwrap()
72                                .as_secs();
73                            
74                            if current_time - timestamp < self.ttl {
75                                info!("🎯 Stock Cache HIT for {} in session {}", symbol, session_id);
76                                return Ok(Some(cached_data));
77                            } else {
78                                info!("⏰ Stock Cache EXPIRED for {} in session {}", symbol, session_id);
79                                // Remove expired data
80                                let _: RedisResult<()> = conn.del(&key).await;
81                            }
82                        } else {
83                            // 🔍 DEBUG: Show what fields are available
84                            let available_fields: Vec<String> = cached_data.as_object()
85                                .map(|obj| obj.keys().map(|k| k.to_string()).collect())
86                                .unwrap_or_default();
87                            warn!("❌ Missing 'stock_cached_at' field for key: {} (available: {:?})", key, available_fields);
88                            // Remove corrupted data
89                            let _: RedisResult<()> = conn.del(&key).await;
90                        }
91                    }
92                    Err(e) => {
93                        warn!("Failed to parse cached stock data for {}: {}", key, e);
94                        // Remove corrupted data
95                        let _: RedisResult<()> = conn.del(&key).await;
96                    }
97                }
98            }
99            Ok(None) => {
100                debug!("💾 Stock Cache MISS for {} in session {} (key not found)", symbol, session_id);
101            }
102            Err(e) => {
103                error!("Redis get error for stock {}: {}", key, e);
104                return Err(BrightDataError::ToolError(format!("Stock cache get failed: {}", e)));
105            }
106        }
107        
108        Ok(None)
109    }
110    
111    /// Cache stock data with metadata
112    pub async fn cache_stock_data(
113        &self,
114        session_id: &str,
115        symbol: &str,
116        data: Value,
117    ) -> Result<(), BrightDataError> {
118        let key = self.generate_key(session_id, symbol);
119        
120        // Add stock caching metadata
121        let mut cached_data = data;
122        cached_data["stock_cached_at"] = json!(SystemTime::now()
123            .duration_since(UNIX_EPOCH)
124            .unwrap()
125            .as_secs());
126        cached_data["stock_cache_key"] = json!(key.clone());
127        cached_data["stock_cache_ttl"] = json!(self.ttl);
128        cached_data["symbol"] = json!(symbol.to_lowercase());
129        cached_data["session_id"] = json!(session_id);
130        
131        let json_string = serde_json::to_string(&cached_data)
132            .map_err(|e| BrightDataError::ToolError(format!("Stock JSON serialization failed: {}", e)))?;
133        
134        let mut conn = self.client.get_multiplexed_async_connection().await
135            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
136        
137        // Set with TTL
138        conn.set_ex::<_, _, ()>(&key, &json_string, self.ttl as u64).await
139            .map_err(|e| BrightDataError::ToolError(format!("Stock cache set failed: {}", e)))?;
140        
141        info!("💾 Cached stock data for {} in session {} (TTL: {}s)", symbol, session_id, self.ttl);
142        
143        Ok(())
144    }
145    
146    /// Get all cached stock symbols for a session
147    pub async fn get_session_stock_symbols(&self, session_id: &str) -> Result<Vec<String>, BrightDataError> {
148        let mut conn = self.client.get_multiplexed_async_connection().await
149            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
150        
151        // Get all keys for this session's stocks
152        let pattern = format!("stock:{}:*", session_id);
153        let keys: Vec<String> = conn.keys(&pattern).await
154            .map_err(|e| BrightDataError::ToolError(format!("Redis keys command failed: {}", e)))?;
155        
156        // Extract symbols from keys (stock:session:SYMBOL -> SYMBOL)
157        let symbols: Vec<String> = keys
158            .iter()
159            .filter_map(|key| {
160                let parts: Vec<&str> = key.split(':').collect();
161                if parts.len() >= 3 {
162                    Some(parts[2].to_string())
163                } else {
164                    None
165                }
166            })
167            .collect();
168        
169        debug!("📋 Found {} cached stock symbols for session {}: {:?}", symbols.len(), session_id, symbols);
170        
171        Ok(symbols)
172    }
173    
174    /// Clear cache for specific stock symbol in session
175    pub async fn clear_stock_symbol_cache(
176        &self,
177        session_id: &str,
178        symbol: &str,
179    ) -> Result<(), BrightDataError> {
180        let key = self.generate_key(session_id, symbol);
181        
182        let mut conn = self.client.get_multiplexed_async_connection().await
183            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
184        
185        let _: RedisResult<()> = conn.del(&key).await;
186        
187        info!("🗑️ Cleared stock cache for {} in session {}", symbol, session_id);
188        
189        Ok(())
190    }
191    
192    /// Clear all stock cache for entire session
193    pub async fn clear_session_stock_cache(&self, session_id: &str) -> Result<u32, BrightDataError> {
194        let mut conn = self.client.get_multiplexed_async_connection().await
195            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
196        
197        // Get all stock keys for this session
198        let pattern = format!("stock:{}:*", session_id);
199        let keys: Vec<String> = conn.keys(&pattern).await
200            .map_err(|e| BrightDataError::ToolError(format!("Redis keys command failed: {}", e)))?;
201        
202        if keys.is_empty() {
203            return Ok(0);
204        }
205        
206        let deleted_count: u32 = conn.del(&keys).await
207            .map_err(|e| BrightDataError::ToolError(format!("Redis delete failed: {}", e)))?;
208        
209        info!("🗑️ Cleared {} cached stock items for session {}", deleted_count, session_id);
210        
211        Ok(deleted_count)
212    }
213    
214    /// Get stock cache statistics
215    pub async fn get_stock_cache_stats(&self) -> Result<Value, BrightDataError> {
216        let mut conn = self.client.get_multiplexed_async_connection().await
217            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
218        
219        // Get stock cache key count
220        let stock_keys: Vec<String> = conn.keys("stock:*").await.unwrap_or_default();
221        
222        // Group by session
223        let mut session_counts = std::collections::HashMap::new();
224        for key in &stock_keys {
225            let parts: Vec<&str> = key.split(':').collect();
226            if parts.len() >= 2 {
227                let session_id = parts[1];
228                *session_counts.entry(session_id.to_string()).or_insert(0) += 1;
229            }
230        }
231        
232        let stats = json!({
233            "total_stock_cache_entries": stock_keys.len(),
234            "active_sessions": session_counts.len(),
235            "session_breakdown": session_counts,
236            "stock_cache_ttl_seconds": self.ttl,
237            "timestamp": SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()
238        });
239        
240        Ok(stats)
241    }
242    
243    /// Check if specific stock is cached for session
244    pub async fn is_stock_cached(&self, session_id: &str, symbol: &str) -> Result<bool, BrightDataError> {
245        let key = self.generate_key(session_id, symbol);
246        
247        let mut conn = self.client.get_multiplexed_async_connection().await
248            .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
249        
250        let exists: bool = conn.exists(&key).await
251            .map_err(|e| BrightDataError::ToolError(format!("Redis exists check failed: {}", e)))?;
252        
253        Ok(exists)
254    }
255    
256    /// Health check for Redis connection
257    pub async fn health_check(&self) -> Result<bool, BrightDataError> {
258        let mut conn = self.client.get_multiplexed_async_connection().await
259            .map_err(|_| BrightDataError::ToolError("Stock cache Redis connection failed".into()))?;
260        
261        let _: String = conn.ping().await
262            .map_err(|_| BrightDataError::ToolError("Stock cache Redis ping failed".into()))?;
263        
264        Ok(true)
265    }
266    
267    /// Batch cache multiple stock symbols (useful for comparisons)
268    pub async fn batch_cache_stock_data(
269        &self,
270        session_id: &str,
271        stock_data: Vec<(String, Value)>, // Vec<(symbol, data)>
272    ) -> Result<Vec<String>, BrightDataError> {
273        let mut successful_symbols = Vec::new();
274        
275        for (symbol, data) in stock_data {
276            match self.cache_stock_data(session_id, &symbol, data).await {
277                Ok(_) => {
278                    successful_symbols.push(symbol);
279                }
280                Err(e) => {
281                    warn!("Failed to cache stock data for {}: {}", symbol, e);
282                }
283            }
284        }
285        
286        info!("📦 Batch cached {} stock symbols for session {}", successful_symbols.len(), session_id);
287        
288        Ok(successful_symbols)
289    }
290}
291
292// Singleton instance for global access
293use std::sync::Arc;
294use tokio::sync::OnceCell;
295
296static STOCK_CACHE: OnceCell<Arc<StockCache>> = OnceCell::const_new();
297
298/// Get global stock cache service instance
299pub async fn get_stock_cache() -> Result<Arc<StockCache>, BrightDataError> {
300    STOCK_CACHE.get_or_try_init(|| async {
301        let service = StockCache::new().await?;
302        Ok(Arc::new(service))
303    }).await.map(|arc| arc.clone())
304}
305
306/// Initialize stock cache service (call this at startup)
307pub async fn init_stock_cache() -> Result<(), BrightDataError> {
308    let _service = get_stock_cache().await?;
309    info!("✅ Global stock cache service initialized");
310    Ok(())
311}