snm_brightdata_client/services/cache/
mutual_fund_cache.rs1use redis::{AsyncCommands, Client, RedisResult};
3use serde_json::{json, Value};
4use std::env;
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6use log::{info, warn, error, debug};
7use crate::error::BrightDataError;
8
9#[derive(Debug, Clone)]
10pub struct MutualFundCache {
11 client: Client,
12 ttl: u64,
13}
14
15impl MutualFundCache {
16 pub async fn new() -> Result<Self, BrightDataError> {
18 let redis_url = env::var("REDIS_URL")
19 .unwrap_or_else(|_| "redis://127.0.0.1:6379/".to_string());
20
21 let client = Client::open(redis_url.as_str())
22 .map_err(|e| BrightDataError::ToolError(format!("Failed to create Redis client: {}", e)))?;
23
24 let mut conn = client.get_multiplexed_async_connection().await
26 .map_err(|e| BrightDataError::ToolError(format!("Failed to connect to Redis: {}", e)))?;
27
28 let _: String = conn.ping().await
30 .map_err(|e| BrightDataError::ToolError(format!("Redis ping failed: {}", e)))?;
31
32 let ttl = env::var("MUTUAL_FUND_CACHE_TTL_SECONDS")
33 .or_else(|_| env::var("CACHE_TTL_SECONDS"))
34 .ok()
35 .and_then(|s| s.parse::<u64>().ok())
36 .unwrap_or(3600); info!("ETF cache service initialized (TTL: {}s)", ttl);
39
40 Ok(Self { client, ttl })
41 }
42
43 fn generate_key(&self, session_id: &str, symbol: &str) -> String {
45 let key = format!("mutual_fund:{}:{}", session_id, symbol.to_uppercase());
46 info!("Cache Key: {}", key);
47 key
48 }
49
50 pub async fn get_cached_mutual_fund_data(
52 &self,
53 session_id: &str,
54 symbol: &str
55 ) -> Result<Option<Value>, BrightDataError> {
56 let key = self.generate_key(session_id, symbol);
57
58 let mut conn = self.client.get_multiplexed_async_connection().await
59 .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
60
61 match conn.get::<_, Option<String>>(&key).await {
62 Ok(Some(cached_json)) => {
63 match serde_json::from_str::<Value>(&cached_json) {
64 Ok(cached_data) => {
65 if let Some(timestamp) = cached_data.get("mutual_fund_cached_at").and_then(|t| t.as_u64()) {
66 let current_time = SystemTime::now()
67 .duration_since(UNIX_EPOCH)
68 .unwrap()
69 .as_secs();
70
71 if current_time - timestamp < self.ttl {
72 info!("ETF Cache HIT for {} in session {}", symbol, session_id);
73 return Ok(Some(cached_data));
74 } else {
75 info!("ETF Cache EXPIRED for {} in session {}", symbol, session_id);
76 let _: RedisResult<()> = conn.del(&key).await;
78 }
79 } else {
80 let available_fields: Vec<String> = cached_data.as_object()
82 .map(|obj| obj.keys().map(|k| k.to_string()).collect())
83 .unwrap_or_default();
84 warn!("Missing 'mutual_fund_cached_at' field for key: {} (available: {:?})", key, available_fields);
85 let _: RedisResult<()> = conn.del(&key).await;
87 }
88 }
89 Err(e) => {
90 warn!("Failed to parse cached ETF data for {}: {}", key, e);
91 let _: RedisResult<()> = conn.del(&key).await;
93 }
94 }
95 }
96 Ok(None) => {
97 debug!("ETF Cache MISS for {} in session {} (key not found)", symbol, session_id);
98 }
99 Err(e) => {
100 error!("Redis get error for ETF {}: {}", key, e);
101 return Err(BrightDataError::ToolError(format!("ETF cache get failed: {}", e)));
102 }
103 }
104
105 Ok(None)
106 }
107
108 pub async fn cache_mutual_fund_data(
110 &self,
111 session_id: &str,
112 symbol: &str,
113 data: Value,
114 ) -> Result<(), BrightDataError> {
115 let key = self.generate_key(session_id, symbol);
116
117 let mut cached_data = data;
119 cached_data["mutual_fund_cached_at"] = json!(SystemTime::now()
120 .duration_since(UNIX_EPOCH)
121 .unwrap()
122 .as_secs());
123 cached_data["mutual_fund_cache_key"] = json!(key.clone());
124 cached_data["mutual_fund_cache_ttl"] = json!(self.ttl);
125 cached_data["symbol"] = json!(symbol.to_uppercase());
126 cached_data["session_id"] = json!(session_id);
127
128 let json_string = serde_json::to_string(&cached_data)
129 .map_err(|e| BrightDataError::ToolError(format!("ETF JSON serialization failed: {}", e)))?;
130
131 let mut conn = self.client.get_multiplexed_async_connection().await
132 .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
133
134 conn.set_ex::<_, _, ()>(&key, &json_string, self.ttl as u64).await
136 .map_err(|e| BrightDataError::ToolError(format!("ETF cache set failed: {}", e)))?;
137
138 info!("Cached ETF data for {} in session {} (TTL: {}s)", symbol, session_id, self.ttl);
139
140 Ok(())
141 }
142
143 pub async fn get_session_mutual_fund_symbols(&self, session_id: &str) -> Result<Vec<String>, BrightDataError> {
145 let mut conn = self.client.get_multiplexed_async_connection().await
146 .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
147
148 let pattern = format!("mutual_fund:{}:*", session_id);
150 let keys: Vec<String> = conn.keys(&pattern).await
151 .map_err(|e| BrightDataError::ToolError(format!("Redis keys command failed: {}", e)))?;
152
153 let symbols: Vec<String> = keys
155 .iter()
156 .filter_map(|key| {
157 let parts: Vec<&str> = key.split(':').collect();
158 if parts.len() >= 3 {
159 Some(parts[2].to_string())
160 } else {
161 None
162 }
163 })
164 .collect();
165
166 debug!("Found {} cached ETF symbols for session {}: {:?}", symbols.len(), session_id, symbols);
167
168 Ok(symbols)
169 }
170
171 pub async fn clear_mutual_fund_symbol_cache(
173 &self,
174 session_id: &str,
175 symbol: &str,
176 ) -> Result<(), BrightDataError> {
177 let key = self.generate_key(session_id, symbol);
178
179 let mut conn = self.client.get_multiplexed_async_connection().await
180 .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
181
182 let _: RedisResult<()> = conn.del(&key).await;
183
184 info!("Cleared ETF cache for {} in session {}", symbol, session_id);
185
186 Ok(())
187 }
188
189 pub async fn clear_session_mutual_fund_cache(&self, session_id: &str) -> Result<u32, BrightDataError> {
191 let mut conn = self.client.get_multiplexed_async_connection().await
192 .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
193
194 let pattern = format!("mutual_fund:{}:*", session_id);
196 let keys: Vec<String> = conn.keys(&pattern).await
197 .map_err(|e| BrightDataError::ToolError(format!("Redis keys command failed: {}", e)))?;
198
199 if keys.is_empty() {
200 return Ok(0);
201 }
202
203 let deleted_count: u32 = conn.del(&keys).await
204 .map_err(|e| BrightDataError::ToolError(format!("Redis delete failed: {}", e)))?;
205
206 info!("Cleared {} cached ETF items for session {}", deleted_count, session_id);
207
208 Ok(deleted_count)
209 }
210
211 pub async fn get_mutual_fund_cache_stats(&self) -> Result<Value, BrightDataError> {
213 let mut conn = self.client.get_multiplexed_async_connection().await
214 .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
215
216 let mutual_fund_keys: Vec<String> = conn.keys("mutual_fund:*").await.unwrap_or_default();
218
219 let mut session_counts = std::collections::HashMap::new();
221 for key in &mutual_fund_keys {
222 let parts: Vec<&str> = key.split(':').collect();
223 if parts.len() >= 2 {
224 let session_id = parts[1];
225 *session_counts.entry(session_id.to_string()).or_insert(0) += 1;
226 }
227 }
228
229 let stats = json!({
230 "total_mutual_fund_cache_entries": mutual_fund_keys.len(),
231 "active_sessions": session_counts.len(),
232 "session_breakdown": session_counts,
233 "mutual_fund_cache_ttl_seconds": self.ttl,
234 "timestamp": SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()
235 });
236
237 Ok(stats)
238 }
239
240 pub async fn is_mutual_fund_cached(&self, session_id: &str, symbol: &str) -> Result<bool, BrightDataError> {
242 let key = self.generate_key(session_id, symbol);
243
244 let mut conn = self.client.get_multiplexed_async_connection().await
245 .map_err(|e| BrightDataError::ToolError(format!("Redis connection failed: {}", e)))?;
246
247 let exists: bool = conn.exists(&key).await
248 .map_err(|e| BrightDataError::ToolError(format!("Redis exists check failed: {}", e)))?;
249
250 Ok(exists)
251 }
252
253 pub async fn health_check(&self) -> Result<bool, BrightDataError> {
255 let mut conn = self.client.get_multiplexed_async_connection().await
256 .map_err(|_| BrightDataError::ToolError("ETF cache Redis connection failed".into()))?;
257
258 let _: String = conn.ping().await
259 .map_err(|_| BrightDataError::ToolError("ETF cache Redis ping failed".into()))?;
260
261 Ok(true)
262 }
263
264 pub async fn batch_cache_mutual_fund_data(
266 &self,
267 session_id: &str,
268 mutual_fund_data: Vec<(String, Value)>, ) -> Result<Vec<String>, BrightDataError> {
270 let mut successful_symbols = Vec::new();
271
272 for (symbol, data) in mutual_fund_data {
273 match self.cache_mutual_fund_data(session_id, &symbol, data).await {
274 Ok(_) => {
275 successful_symbols.push(symbol);
276 }
277 Err(e) => {
278 warn!("Failed to cache ETF data for {}: {}", symbol, e);
279 }
280 }
281 }
282
283 info!("Batch cached {} ETF symbols for session {}", successful_symbols.len(), session_id);
284
285 Ok(successful_symbols)
286 }
287}
288
289use std::sync::Arc;
291use tokio::sync::OnceCell;
292
293static MUTUAL_FUND_CACHE: OnceCell<Arc<MutualFundCache>> = OnceCell::const_new();
294
295pub async fn get_mutual_fund_cache() -> Result<Arc<MutualFundCache>, BrightDataError> {
297 MUTUAL_FUND_CACHE.get_or_try_init(|| async {
298 let service = MutualFundCache::new().await?;
299 Ok(Arc::new(service))
300 }).await.map(|arc| arc.clone())
301}
302
303pub async fn init_mutual_fund_cache() -> Result<(), BrightDataError> {
305 let _service = get_mutual_fund_cache().await?;
306 info!("Global ETF cache service initialized");
307 Ok(())
308}