snm_brightdata_client/services/cache/
stock_cache.rs1use redis::{AsyncCommands, Client, RedisResult};
3use serde_json::{json, Value};
4use std::env;
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6use log::{info, warn, error, debug};
7use crate::error::BrightDataError;
8
9#[derive(Debug, Clone)]
10pub struct StockCache {
11 client: Client,
12 ttl: u64,
13}
14
15impl StockCache {
16 pub async fn new() -> Result<Self, BrightDataError> {
18 let redis_url = env::var("REDIS_URL")
19 .unwrap_or_else(|_| "redis://127.0.0.1:6379/".to_string());
20
21 let client = Client::open(redis_url.as_str())
22 .map_err(|e| BrightDataError::ToolError(format!("Failed to create Redis client: {}", e)))?;
23
24 let mut conn = client.get_multiplexed_async_connection().await
26 .map_err(|e| BrightDataError::ToolError(format!("Failed to connect to Redis: {}", e)))?;
27
28 let _: String = conn.ping().await
30 .map_err(|e| BrightDataError::ToolError(format!("Redis ping failed: {}", e)))?;
31
32 let ttl = env::var("STOCK_CACHE_TTL_SECONDS")
33 .or_else(|_| env::var("CACHE_TTL_SECONDS"))
34 .ok()
35 .and_then(|s| s.parse::<u64>().ok())
36 .unwrap_or(3600); info!("✅ Stock cache service initialized (TTL: {}s)", ttl);
39
40 Ok(Self { client, ttl })
41 }
42
43 fn generate_key(&self, session_id: &str, symbol: &str) -> String {
45 let key = format!("stock:{}:{}", session_id, symbol.to_lowercase());
46 info!("Cache Key: {}", key);
47 key
48 }
49
50 pub async fn get_cached_stock_data(
54 &self,
55 session_id: &str,
56 symbol: &str
57 ) -> Result<Option<Value>, BrightDataError> {
58 let key = self.generate_key(session_id, symbol);
59
60 let mut conn = self.client.get_multiplexed_async_connection().await
61 .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
62
63 match conn.get::<_, Option<String>>(&key).await {
64 Ok(Some(cached_json)) => {
65 match serde_json::from_str::<Value>(&cached_json) {
66 Ok(cached_data) => {
67 if let Some(timestamp) = cached_data.get("stock_cached_at").and_then(|t| t.as_u64()) {
69 let current_time = SystemTime::now()
70 .duration_since(UNIX_EPOCH)
71 .unwrap()
72 .as_secs();
73
74 if current_time - timestamp < self.ttl {
75 info!("🎯 Stock Cache HIT for {} in session {}", symbol, session_id);
76 return Ok(Some(cached_data));
77 } else {
78 info!("⏰ Stock Cache EXPIRED for {} in session {}", symbol, session_id);
79 let _: RedisResult<()> = conn.del(&key).await;
81 }
82 } else {
83 let available_fields: Vec<String> = cached_data.as_object()
85 .map(|obj| obj.keys().map(|k| k.to_string()).collect())
86 .unwrap_or_default();
87 warn!("❌ Missing 'stock_cached_at' field for key: {} (available: {:?})", key, available_fields);
88 let _: RedisResult<()> = conn.del(&key).await;
90 }
91 }
92 Err(e) => {
93 warn!("Failed to parse cached stock data for {}: {}", key, e);
94 let _: RedisResult<()> = conn.del(&key).await;
96 }
97 }
98 }
99 Ok(None) => {
100 debug!("💾 Stock Cache MISS for {} in session {} (key not found)", symbol, session_id);
101 }
102 Err(e) => {
103 error!("Redis get error for stock {}: {}", key, e);
104 return Err(BrightDataError::ToolError(format!("Stock cache get failed: {}", e)));
105 }
106 }
107
108 Ok(None)
109 }
110
111 pub async fn cache_stock_data(
113 &self,
114 session_id: &str,
115 symbol: &str,
116 data: Value,
117 ) -> Result<(), BrightDataError> {
118 let key = self.generate_key(session_id, symbol);
119
120 let mut cached_data = data;
122 cached_data["stock_cached_at"] = json!(SystemTime::now()
123 .duration_since(UNIX_EPOCH)
124 .unwrap()
125 .as_secs());
126 cached_data["stock_cache_key"] = json!(key.clone());
127 cached_data["stock_cache_ttl"] = json!(self.ttl);
128 cached_data["symbol"] = json!(symbol.to_lowercase());
129 cached_data["session_id"] = json!(session_id);
130
131 let json_string = serde_json::to_string(&cached_data)
132 .map_err(|e| BrightDataError::ToolError(format!("Stock JSON serialization failed: {}", e)))?;
133
134 let mut conn = self.client.get_multiplexed_async_connection().await
135 .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
136
137 conn.set_ex::<_, _, ()>(&key, &json_string, self.ttl as u64).await
139 .map_err(|e| BrightDataError::ToolError(format!("Stock cache set failed: {}", e)))?;
140
141 info!("💾 Cached stock data for {} in session {} (TTL: {}s)", symbol, session_id, self.ttl);
142
143 Ok(())
144 }
145
146 pub async fn get_session_stock_symbols(&self, session_id: &str) -> Result<Vec<String>, BrightDataError> {
148 let mut conn = self.client.get_multiplexed_async_connection().await
149 .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
150
151 let pattern = format!("stock:{}:*", session_id);
153 let keys: Vec<String> = conn.keys(&pattern).await
154 .map_err(|e| BrightDataError::ToolError(format!("Redis keys command failed: {}", e)))?;
155
156 let symbols: Vec<String> = keys
158 .iter()
159 .filter_map(|key| {
160 let parts: Vec<&str> = key.split(':').collect();
161 if parts.len() >= 3 {
162 Some(parts[2].to_string())
163 } else {
164 None
165 }
166 })
167 .collect();
168
169 debug!("📋 Found {} cached stock symbols for session {}: {:?}", symbols.len(), session_id, symbols);
170
171 Ok(symbols)
172 }
173
174 pub async fn clear_stock_symbol_cache(
176 &self,
177 session_id: &str,
178 symbol: &str,
179 ) -> Result<(), BrightDataError> {
180 let key = self.generate_key(session_id, symbol);
181
182 let mut conn = self.client.get_multiplexed_async_connection().await
183 .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
184
185 let _: RedisResult<()> = conn.del(&key).await;
186
187 info!("🗑️ Cleared stock cache for {} in session {}", symbol, session_id);
188
189 Ok(())
190 }
191
192 pub async fn clear_session_stock_cache(&self, session_id: &str) -> Result<u32, BrightDataError> {
194 let mut conn = self.client.get_multiplexed_async_connection().await
195 .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
196
197 let pattern = format!("stock:{}:*", session_id);
199 let keys: Vec<String> = conn.keys(&pattern).await
200 .map_err(|e| BrightDataError::ToolError(format!("Redis keys command failed: {}", e)))?;
201
202 if keys.is_empty() {
203 return Ok(0);
204 }
205
206 let deleted_count: u32 = conn.del(&keys).await
207 .map_err(|e| BrightDataError::ToolError(format!("Redis delete failed: {}", e)))?;
208
209 info!("🗑️ Cleared {} cached stock items for session {}", deleted_count, session_id);
210
211 Ok(deleted_count)
212 }
213
214 pub async fn get_stock_cache_stats(&self) -> Result<Value, BrightDataError> {
216 let mut conn = self.client.get_multiplexed_async_connection().await
217 .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
218
219 let stock_keys: Vec<String> = conn.keys("stock:*").await.unwrap_or_default();
221
222 let mut session_counts = std::collections::HashMap::new();
224 for key in &stock_keys {
225 let parts: Vec<&str> = key.split(':').collect();
226 if parts.len() >= 2 {
227 let session_id = parts[1];
228 *session_counts.entry(session_id.to_string()).or_insert(0) += 1;
229 }
230 }
231
232 let stats = json!({
233 "total_stock_cache_entries": stock_keys.len(),
234 "active_sessions": session_counts.len(),
235 "session_breakdown": session_counts,
236 "stock_cache_ttl_seconds": self.ttl,
237 "timestamp": SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()
238 });
239
240 Ok(stats)
241 }
242
243 pub async fn is_stock_cached(&self, session_id: &str, symbol: &str) -> Result<bool, BrightDataError> {
245 let key = self.generate_key(session_id, symbol);
246
247 let mut conn = self.client.get_multiplexed_async_connection().await
248 .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
249
250 let exists: bool = conn.exists(&key).await
251 .map_err(|e| BrightDataError::ToolError(format!("Redis exists check failed: {}", e)))?;
252
253 Ok(exists)
254 }
255
256 pub async fn health_check(&self) -> Result<bool, BrightDataError> {
258 let mut conn = self.client.get_multiplexed_async_connection().await
259 .map_err(|_| BrightDataError::ToolError("Stock cache Redis connection failed".into()))?;
260
261 let _: String = conn.ping().await
262 .map_err(|_| BrightDataError::ToolError("Stock cache Redis ping failed".into()))?;
263
264 Ok(true)
265 }
266
267 pub async fn batch_cache_stock_data(
269 &self,
270 session_id: &str,
271 stock_data: Vec<(String, Value)>, ) -> Result<Vec<String>, BrightDataError> {
273 let mut successful_symbols = Vec::new();
274
275 for (symbol, data) in stock_data {
276 match self.cache_stock_data(session_id, &symbol, data).await {
277 Ok(_) => {
278 successful_symbols.push(symbol);
279 }
280 Err(e) => {
281 warn!("Failed to cache stock data for {}: {}", symbol, e);
282 }
283 }
284 }
285
286 info!("📦 Batch cached {} stock symbols for session {}", successful_symbols.len(), session_id);
287
288 Ok(successful_symbols)
289 }
290}
291
292use std::sync::Arc;
294use tokio::sync::OnceCell;
295
296static STOCK_CACHE: OnceCell<Arc<StockCache>> = OnceCell::const_new();
297
298pub async fn get_stock_cache() -> Result<Arc<StockCache>, BrightDataError> {
300 STOCK_CACHE.get_or_try_init(|| async {
301 let service = StockCache::new().await?;
302 Ok(Arc::new(service))
303 }).await.map(|arc| arc.clone())
304}
305
306pub async fn init_stock_cache() -> Result<(), BrightDataError> {
308 let _service = get_stock_cache().await?;
309 info!("✅ Global stock cache service initialized");
310 Ok(())
311}