// snm_brightdata_client — src/tools/etf.rs
2use crate::tool::{Tool, ToolResult, McpContent};
3use crate::error::BrightDataError;
4use crate::filters::{ResponseFilter, ResponseStrategy, ResponseType};
5use crate::extras::logger::JSON_LOGGER;
6use crate::metrics::brightdata_logger::BRIGHTDATA_METRICS;
7use crate::services::cache::etf_cache::get_etf_cache;
8use async_trait::async_trait;
9use reqwest::Client;
10use serde_json::{json, Value};
11use std::env;
12use std::time::{Duration, Instant};
13use std::collections::HashMap;
14use log::{info, warn, error};
15use crate::symbols::etf_symbol::match_symbol_from_query;
16
/// Candidate fetch targets, grouped by the transport used to retrieve them.
///
/// Every entry is a `(url, description)` pair; the description is a
/// human-readable label that shows up in logs and attempt records.
#[derive(Debug, Clone)]
pub struct MethodUrls {
    /// Targets fetched through the BrightData proxy.
    pub proxy: Vec<(String, String)>,
    /// Targets fetched through the direct BrightData request API.
    pub direct: Vec<(String, String)>,
}
/// MCP tool (`get_etf_data`) that fetches a Yahoo Finance ETF quote page via
/// BrightData, with a per-session cache in front of the network fetch.
pub struct ETFDataTool;
26#[async_trait]
27impl Tool for ETFDataTool {
28    fn name(&self) -> &str {
29        "get_etf_data"
30    }
31
32    fn description(&self) -> &str {
33        "Get comprehensive ETF snapshot (price, summary, metrics) with cache, BrightData direct API and proxy fallback. Source: Yahoo Finance https://finance.yahoo.com/quote/{}.NS/ (e.g., NIFTYBEES)."
34    }
35
36    fn input_schema(&self) -> Value {
37        json!({
38            "type": "object",
39            "properties": {
40                "query": {
41                    "type": "string",
42                    "description": "ETF ticker or name (e.g., NIFTYBEES, JUNIORBEES). If provided, used when 'symbol' missing."
43                },
44                "symbol": {
45                    "type": "string",
46                    "description": "ETF ticker (e.g., NIFTYBEES, JUNIORBEES, GOLDBEES)"
47                },
48                "data_type": {
49                    "type": "string",
50                    "enum": ["price", "fundamentals", "technical", "news", "all"],
51                    "default": "all",
52                    "description": "Focus slice for parsing/formatting"
53                },
54                "timeframe": {
55                    "type": "string",
56                    "enum": ["realtime", "day", "week", "month", "quarter", "year"],
57                    "default": "realtime",
58                    "description": "Time period context for analysis"
59                },
60                "include_ratios": {
61                    "type": "boolean",
62                    "default": true,
63                    "description": "Include ratios/metrics if available"
64                },
65                "include_volume": {
66                    "type": "boolean",
67                    "default": true,
68                    "description": "Include volume/liquidity info if available"
69                },
70                "user_id": {
71                    "type": "string",
72                    "description": "Session/user id for cache scoping"
73                }
74            },
75            "required": ["symbol", "user_id"]
76        })
77    }
78
79    async fn execute(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
80        self.execute_internal(parameters).await
81    }
82
83    async fn execute_internal(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
84        let raw_query = parameters
85            .get("symbol")
86            .and_then(|v| v.as_str())
87            .ok_or_else(|| BrightDataError::ToolError("Missing 'symbol' parameter".into()))?;
88
89        let session_id = parameters
90            .get("user_id")
91            .and_then(|v| v.as_str())
92            .ok_or_else(|| BrightDataError::ToolError("Missing 'user_id' parameter".into()))?;
93
94        // Step 1: Resolve known symbols (or fallback)
95        let matched_symbol = match_symbol_from_query(raw_query);
96
97        // Step 2: Strip trailing .com / .xyz etc.
98        let query = matched_symbol.split('.').next().unwrap_or(&matched_symbol);
99
100        let market = parameters
101            .get("market")
102            .and_then(|v| v.as_str())
103            .unwrap_or("usd");
104
105        let data_type = parameters
106            .get("data_type")
107            .and_then(|v| v.as_str())
108            .unwrap_or("all");
109
110        let timeframe = parameters
111            .get("timeframe")
112            .and_then(|v| v.as_str())
113            .unwrap_or("realtime");
114
115        let include_ratios = parameters
116            .get("include_ratios")
117            .and_then(|v| v.as_bool())
118            .unwrap_or(true);
119
120        let include_volume = parameters
121            .get("include_volume")
122            .and_then(|v| v.as_bool())
123            .unwrap_or(true);
124
125        let query_priority = ResponseStrategy::classify_query_priority(query);
126        let recommended_tokens = ResponseStrategy::get_recommended_token_allocation(query);
127
128        let execution_id = format!("etf_{}", chrono::Utc::now().format("%Y%m%d_%H%M%S%.3f"));
129        
130        info!("📈 ETF query: '{}' (market: {}, priority: {:?}, tokens: {}, session: {})", 
131              query, market, query_priority, recommended_tokens, session_id);
132        
133        // 🎯 CACHE CHECK - Check Redis cache first
134        match self.check_cache_first(query, session_id).await {
135            Ok(Some(cached_result)) => {
136                info!("🚀 Cache HIT: Returning cached data for {} in session {}", query, session_id);
137                
138                // Create tool result from cached data
139                let content = cached_result.get("content").and_then(|c| c.as_str()).unwrap_or("");
140                let source_used = cached_result.get("source_used").and_then(|s| s.as_str()).unwrap_or("Cache");
141                let method_used = "Redis Cache";
142                
143                let formatted_response = self.create_formatted_etf_response(
144                    query, market, content, source_used, method_used, 
145                    data_type, timeframe, include_ratios, include_volume, &execution_id
146                );
147                
148                let tool_result = ToolResult::success_with_raw(
149                    vec![McpContent::text(formatted_response)], 
150                    cached_result
151                );
152                
153                // Apply filtering only if DEDUCT_DATA=true
154                if self.is_data_reduction_enabled() {
155                    return Ok(ResponseStrategy::apply_size_limits(tool_result));
156                } else {
157                    return Ok(tool_result);
158                }
159            }
160            Ok(None) => {
161                info!("💾 Cache MISS: Fetching fresh data for {} in session {}", query, session_id);
162            }
163            Err(e) => {
164                warn!("🚨 Cache error (continuing with fresh fetch): {}", e);
165            }
166        }
167
168        // 🌐 FRESH FETCH - Cache miss, fetch from sources
169        match self.fetch_etf_data_with_fallbacks_and_priority(
170            query, market, data_type, timeframe, include_ratios, include_volume,
171            query_priority, recommended_tokens, &execution_id
172        ).await {
173            Ok(result) => {
174                // 🗄️ CACHE STORE - Store successful result in cache
175                if let Err(e) = self.store_in_cache(query, session_id, &result).await {
176                    warn!("Failed to store result in cache: {}", e);
177                }
178                
179                let content = result.get("content").and_then(|c| c.as_str()).unwrap_or("");
180                let source_used = result.get("source_used").and_then(|s| s.as_str()).unwrap_or("Unknown");
181                let method_used = result.get("method_used").and_then(|m| m.as_str()).unwrap_or("Unknown");
182                
183                // Create formatted response based on DEDUCT_DATA setting
184                let formatted_response = self.create_formatted_etf_response(
185                    query, market, content, source_used, method_used, 
186                    data_type, timeframe, include_ratios, include_volume, &execution_id
187                );
188                
189                let tool_result = ToolResult::success_with_raw(
190                    vec![McpContent::text(formatted_response)], 
191                    result
192                );
193                
194                // Apply filtering only if DEDUCT_DATA=true
195                if self.is_data_reduction_enabled() {
196                    Ok(ResponseStrategy::apply_size_limits(tool_result))
197                } else {
198                    Ok(tool_result)
199                }
200            }
201            Err(_e) => {
202                // Return empty data for BrightData errors - Anthropic will retry
203                warn!("BrightData error for query '{}', returning empty data for retry", query);
204                let empty_response = json!({
205                    "query": query,
206                    "market": market,
207                    "status": "no_data",
208                    "reason": "brightdata_error",
209                    "execution_id": execution_id,
210                    "session_id": session_id
211                });
212                
213                Ok(ToolResult::success_with_raw(
214                    vec![McpContent::text("📈 **No Data Available**\n\nPlease try again with a more specific ETF symbol.".to_string())],
215                    empty_response
216                ))
217            }
218        }
219    }
220}
221
222impl ETFDataTool {
223    /// ENHANCED: Check if data reduction is enabled via DEDUCT_DATA environment variable only
224    fn is_data_reduction_enabled(&self) -> bool {
225        std::env::var("DEDUCT_DATA")
226            .unwrap_or_else(|_| "false".to_string())
227            .to_lowercase() == "true"
228    }
229
230    /// ENHANCED: Create formatted response with DEDUCT_DATA control
231    fn create_formatted_etf_response(
232        &self,
233        query: &str,
234        market: &str, 
235        content: &str,
236        source: &str,
237        method: &str,
238        data_type: &str,
239        timeframe: &str,
240        include_ratios: bool,
241        include_volume: bool,
242        execution_id: &str
243    ) -> String {
244        // If DEDUCT_DATA=false, return full content with basic formatting
245        if !self.is_data_reduction_enabled() {
246            return format!(
247                "📈 **{}** | {} Market\n\n## Full Content\n{}\n\n*Source: {} via {} • Type: {} • Period: {}*",
248                query.to_uppercase(), 
249                market.to_uppercase(), 
250                content,
251                source, 
252                method, 
253                data_type, 
254                timeframe
255            );
256        }
257
258        // TODO: Add filtered data extraction logic when DEDUCT_DATA=true
259        // For now, return full content formatted
260        format!(
261            "📈 **{}** | {} Market\n\n## Content (TODO: Add Filtering)\n{}\n\n*Source: {} via {} • Type: {} • Period: {}*",
262            query.to_uppercase(), 
263            market.to_uppercase(), 
264            content,
265            source, 
266            method, 
267            data_type, 
268            timeframe
269        )
270    }
271
272    // 🎯 ADDED: Check Redis cache first
273    async fn check_cache_first(
274        &self,
275        query: &str,
276        session_id: &str,
277    ) -> Result<Option<Value>, BrightDataError> {
278        let cache_service = get_etf_cache().await?;
279        cache_service.get_cached_etf_data(session_id, query).await
280    }
281
282    // 🗄️ ADDED: Store successful result in Redis cache
283    async fn store_in_cache(
284        &self,
285        query: &str,
286        session_id: &str,
287        data: &Value,
288    ) -> Result<(), BrightDataError> {
289        let cache_service = get_etf_cache().await?;
290        cache_service.cache_etf_data(session_id, query, data.clone()).await
291    }
292
293    /// ENHANCED: Build URLs separated by method (proxy vs direct)
294    fn build_prioritized_urls_with_priority(
295        &self, 
296        query: &str, 
297        market: &str, 
298        data_type: &str,
299        priority: crate::filters::strategy::QueryPriority
300    ) -> MethodUrls {
301        let mut proxy_urls = Vec::new();
302        let mut direct_urls = Vec::new();
303        let clean_query = query.trim().to_uppercase();
304
305        // Add priority-based URL limiting
306        let max_sources = 3;
307
308        if self.is_likely_etf_symbol(&clean_query) {
309            match market {
310                "usd" => {
311                    let symbols_to_try = vec![
312                        format!("{}.NS", clean_query),
313                        clean_query.clone(),
314                    ];
315                    
316                    for (i, symbol) in symbols_to_try.iter().enumerate() {
317                        if i >= max_sources { break; }
318                        
319                        let url = format!("https://finance.yahoo.com/quote/{}", symbol);
320                        let description = format!("Yahoo Finance ({})", symbol);
321
322                        let proxy_url = format!("https://finance.yahoo.com/quote/{}/", symbol);
323                        let proxy_description = format!("Yahoo Finance ({})", symbol);
324                        
325                        // Add to both proxy and direct (same URLs, different methods)
326                        proxy_urls.push((proxy_url, proxy_description));
327                        direct_urls.push((url, description));
328                    }
329                },
330                "inr" => {
331                    let symbols_to_try = vec![
332                        format!("{}.NS", clean_query),
333                        clean_query.clone(),
334                    ];
335                    
336                    for (i, symbol) in symbols_to_try.iter().enumerate() {
337                        if i >= max_sources { break; }
338                        
339                        let url = format!("https://finance.yahoo.com/quote/{}", symbol);
340                        let description = format!("Yahoo Finance ({})", symbol);
341
342                        let proxy_url = format!("https://finance.yahoo.com/quote/{}/", symbol);
343                        let proxy_description = format!("Yahoo Finance ({})", symbol);
344                        
345                        // Add to both proxy and direct (same URLs, different methods)
346                        proxy_urls.push((proxy_url, proxy_description));
347                        direct_urls.push((url, description));
348                    }
349                }
350
351                _ => {
352                    let symbols_to_try = vec![
353                        format!("{}.NS", clean_query),
354                        clean_query.clone(),
355                    ];
356                    
357                    for (i, symbol) in symbols_to_try.iter().enumerate() {
358                        if i >= max_sources { break; }
359                        
360                        let url = format!("https://finance.yahoo.com/quote/{}", symbol);
361                        let description = format!("Yahoo Finance ({})", symbol);
362
363                        let proxy_url = format!("https://finance.yahoo.com/quote/{}/", symbol);
364                        let proxy_description = format!("Yahoo Finance ({})", symbol);
365                        
366                        // Add to both proxy and direct (same URLs, different methods)
367                        proxy_urls.push((proxy_url, proxy_description));
368                        direct_urls.push((url, description));
369                    }
370                }
371
372            }
373        }
374
375        // Add search fallbacks (no restrictions when DEDUCT_DATA=false)
376        if proxy_urls.len() < max_sources {
377            let url = format!("https://finance.yahoo.com/quote/{}", urlencoding::encode(query));
378            let description = "Yahoo Finance Search".to_string();
379
380            let proxy_url = format!("https://finance.yahoo.com/quote/{}", urlencoding::encode(query));
381            let proxy_description = "Yahoo Finance Search".to_string();
382            
383            proxy_urls.push((proxy_url, proxy_description));
384            direct_urls.push((url, description));
385        }
386
387        info!("🎯 Generated {} proxy URLs and {} direct URLs for query '{}' (priority: {:?})", 
388              proxy_urls.len(), direct_urls.len(), query, priority);
389        
390        MethodUrls {
391            proxy: proxy_urls,
392            direct: direct_urls,
393        }
394    }
395
396    /// ENHANCED: Main fetch function with method-separated URL structure
397    async fn fetch_etf_data_with_fallbacks_and_priority(
398        &self, 
399        query: &str, 
400        market: &str, 
401        data_type: &str,
402        timeframe: &str,
403        include_ratios: bool,
404        include_volume: bool,
405        query_priority: crate::filters::strategy::QueryPriority,
406        token_budget: usize,
407        execution_id: &str
408    ) -> Result<Value, BrightDataError> {
409        let method_urls = self.build_prioritized_urls_with_priority(query, market, data_type, query_priority);
410        let mut last_error = None;
411        let mut attempts = Vec::new();
412
413        // Define method priority: try direct first, then proxy
414        let methods_to_try = vec![
415            // ("direct", "Direct Call", &method_urls.direct),
416            ("proxy", "Proxy Fallback", &method_urls.proxy)
417        ];
418
419        for (method_sequence, (method_type, method_name, urls_for_method)) in methods_to_try.iter().enumerate() {
420            info!("🔄 Trying {} method with {} URLs", method_name, urls_for_method.len());
421            
422            for (url_sequence, (url, source_name)) in urls_for_method.iter().enumerate() {
423                let attempt_result = match *method_type {
424                    "direct" => {
425                        info!("🌐 Trying Direct BrightData API for {} (method: {}, url: {}/{})", 
426                              source_name, method_sequence + 1, url_sequence + 1, urls_for_method.len());
427                        self.try_fetch_url_direct_api(
428                            url, query, market, source_name, query_priority, token_budget, 
429                            execution_id, url_sequence as u64, method_sequence as u64
430                        ).await
431                    }
432                    "proxy" => {
433                        info!("🔄 Trying Proxy method for {} (method: {}, url: {}/{})", 
434                              source_name, method_sequence + 1, url_sequence + 1, urls_for_method.len());
435                        self.try_fetch_url_via_proxy(
436                            url, query, market, source_name, query_priority, token_budget, 
437                            execution_id, url_sequence as u64, method_sequence as u64
438                        ).await
439                    }
440                    _ => continue,
441                };
442
443                match attempt_result {
444                    Ok(mut result) => {
445                        let content = result.get("content").and_then(|c| c.as_str()).unwrap_or("");
446                        
447                        attempts.push(json!({
448                            "source": source_name,
449                            "url": url,
450                            "method": method_name,
451                            "status": "success",
452                            "content_length": content.len(),
453                            "method_sequence": method_sequence + 1,
454                            "url_sequence": url_sequence + 1
455                        }));
456                        
457                        // SUCCESS
458                        result["source_used"] = json!(source_name);
459                        result["url_used"] = json!(url);
460                        result["method_used"] = json!(method_name);
461                        result["execution_id"] = json!(execution_id);
462                        result["priority"] = json!(format!("{:?}", query_priority));
463                        result["token_budget"] = json!(token_budget);
464                        result["attempts"] = json!(attempts);
465                        result["successful_method_sequence"] = json!(method_sequence + 1);
466                        result["successful_url_sequence"] = json!(url_sequence + 1);
467                        
468                        info!("✅ Successfully fetched ETF data from {} via {} (method: {}, url: {})", 
469                              source_name, method_name, method_sequence + 1, url_sequence + 1);
470                        
471                        return Ok(result);
472                    }
473                    Err(e) => {
474                        attempts.push(json!({
475                            "source": source_name,
476                            "url": url,
477                            "method": method_name,
478                            "status": "failed",
479                            "error": e.to_string(),
480                            "method_sequence": method_sequence + 1,
481                            "url_sequence": url_sequence + 1
482                        }));
483                        
484                        last_error = Some(e);
485                        warn!("❌ Failed to fetch from {} via {} (method: {}, url: {}): {:?}", 
486                              source_name, method_name, method_sequence + 1, url_sequence + 1, last_error);
487                    }
488                }
489            }
490        }
491
492        // All methods and sources failed - return empty data instead of error
493        warn!("❌ All sources and methods failed for query '{}'. Returning empty data for Anthropic retry", query);
494        
495        let empty_result = json!({
496            "query": query,
497            "market": market,
498            "status": "no_data_found",
499            "attempts": attempts,
500            "execution_id": execution_id,
501            "total_attempts": method_urls.direct.len() + method_urls.proxy.len(),
502            "reason": "all_sources_failed"
503        });
504        
505        Ok(empty_result)
506    }
507
508    // Direct BrightData API method (existing implementation)
509    async fn try_fetch_url_direct_api(
510        &self, 
511        url: &str, 
512        query: &str, 
513        market: &str, 
514        source_name: &str, 
515        priority: crate::filters::strategy::QueryPriority,
516        token_budget: usize,
517        execution_id: &str,
518        sequence: u64,
519        method_sequence: u64
520    ) -> Result<Value, BrightDataError> {
521        let max_retries = env::var("MAX_RETRIES")
522            .ok()
523            .and_then(|s| s.parse::<u32>().ok())
524            .unwrap_or(1);
525        
526        let mut last_error = None;
527        
528        for retry_attempt in 0..max_retries {
529            let start_time = Instant::now();
530            let attempt_id = format!("{}_direct_s{}_m{}_r{}", execution_id, sequence, method_sequence, retry_attempt);
531            
532            info!("🌐 Direct API: Fetching from {} (execution: {}, retry: {}/{})", 
533                  source_name, attempt_id, retry_attempt + 1, max_retries);
534            
535            let api_token = env::var("BRIGHTDATA_API_TOKEN")
536                .or_else(|_| env::var("API_TOKEN"))
537                .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_API_TOKEN environment variable".into()))?;
538
539            let base_url = env::var("BRIGHTDATA_BASE_URL")
540                .unwrap_or_else(|_| "https://api.brightdata.com".to_string());
541
542            let zone = env::var("WEB_UNLOCKER_ZONE")
543                .unwrap_or_else(|_| "mcp_unlocker".to_string());
544
545            let payload = json!({
546                "url": url,
547                "zone": zone,
548                "format": "raw",
549            });
550
551            let client = Client::builder()
552                .timeout(Duration::from_secs(90))
553                .build()
554                .map_err(|e| BrightDataError::ToolError(format!("Failed to create HTTP client: {}", e)))?;
555
556            let response = client
557                .post(&format!("{}/request", base_url))
558                .header("Authorization", format!("Bearer {}", api_token))
559                .header("Content-Type", "application/json")
560                .json(&payload)
561                .send()
562                .await
563                .map_err(|e| BrightDataError::ToolError(format!("Direct API request failed to {}: {}", source_name, e)))?;
564
565            let duration = start_time.elapsed();
566            let status = response.status().as_u16();
567            let response_headers: HashMap<String, String> = response
568                .headers()
569                .iter()
570                .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
571                .collect();
572
573            let response_text = response.text().await
574                .map_err(|e| BrightDataError::ToolError(format!("Failed to read response body from {}: {}", source_name, e)))?;
575
576            // Handle server errors with retry
577            if matches!(status, 502 | 503 | 504) && retry_attempt < max_retries - 1 {
578                let wait_time = Duration::from_millis(1000 + (retry_attempt as u64 * 1000));
579                warn!("⏳ Direct API: Server error {}, waiting {}ms before retry...", status, wait_time.as_millis());
580                tokio::time::sleep(wait_time).await;
581                last_error = Some(BrightDataError::ToolError(format!("Direct API server error: {}", status)));
582                continue;
583            }
584            
585            if !(200..300).contains(&status) {
586                let error_msg = format!("Direct API: {} returned HTTP {}: {}", source_name, status, 
587                                      &response_text[..response_text.len().min(500)]);
588                last_error = Some(BrightDataError::ToolError(error_msg));
589                if retry_attempt == max_retries - 1 {
590                    return Err(last_error.unwrap());
591                }
592                continue;
593            }
594
595            // SUCCESS - Process response (apply filtering only if DEDUCT_DATA=true)
596            let raw_content = response_text;
597            let filtered_content = if self.is_data_reduction_enabled() {
598                // TODO: Add content filtering logic when DEDUCT_DATA=true
599                raw_content.clone()
600            } else {
601                raw_content.clone()
602            };
603
604            // Log metrics
605            if let Err(e) = BRIGHTDATA_METRICS.log_call(
606                &attempt_id,
607                url,
608                &zone,
609                "raw",
610                None,
611                payload.clone(),
612                status,
613                response_headers.clone(),
614                &raw_content,
615                Some(&filtered_content),
616                duration.as_millis() as u64,
617                None,
618                None,
619            ).await {
620                warn!("Failed to log direct API metrics: {}", e);
621            }
622
623            return Ok(json!({
624                "content": filtered_content,
625                "raw_content": raw_content,
626                "query": query,
627                "market": market,
628                "source": source_name,
629                "method": "Direct BrightData API",
630                "priority": format!("{:?}", priority),
631                "token_budget": token_budget,
632                "execution_id": execution_id,
633                "sequence": sequence,
634                "method_sequence": method_sequence,
635                "success": true,
636                "url": url,
637                "zone": zone,
638                "format": "raw",
639                "status_code": status,
640                "response_size_bytes": raw_content.len(),
641                "filtered_size_bytes": filtered_content.len(),
642                "duration_ms": duration.as_millis(),
643                "timestamp": chrono::Utc::now().to_rfc3339(),
644                "retry_attempts": retry_attempt + 1,
645                "max_retries": max_retries,
646                "payload_used": payload
647            }));
648        }
649
650        Err(last_error.unwrap_or_else(|| BrightDataError::ToolError("Direct API: All retry attempts failed".into())))
651    }
652
653    // Proxy-based method
654    async fn try_fetch_url_via_proxy(
655        &self, 
656        url: &str, 
657        query: &str, 
658        market: &str, 
659        source_name: &str, 
660        priority: crate::filters::strategy::QueryPriority,
661        token_budget: usize,
662        execution_id: &str,
663        sequence: u64,
664        method_sequence: u64
665    ) -> Result<Value, BrightDataError> {
666        let max_retries = env::var("MAX_RETRIES")
667            .ok()
668            .and_then(|s| s.parse::<u32>().ok())
669            .unwrap_or(1);
670        
671        let mut last_error = None;
672        
673        // Get proxy configuration from environment
674        let proxy_host = env::var("BRIGHTDATA_PROXY_HOST")
675            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_HOST environment variable".into()))?;
676        let proxy_port = env::var("BRIGHTDATA_PROXY_PORT")
677            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_PORT environment variable".into()))?;
678        let proxy_username = env::var("BRIGHTDATA_PROXY_USERNAME")
679            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_USERNAME environment variable".into()))?;
680        let proxy_password = env::var("BRIGHTDATA_PROXY_PASSWORD")
681            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_PASSWORD environment variable".into()))?;
682
683        let proxy_url = format!("http://{}:{}@{}:{}", proxy_username, proxy_password, proxy_host, proxy_port);
684        
685        for retry_attempt in 0..max_retries {
686            let start_time = Instant::now();
687            let attempt_id = format!("{}_proxy_s{}_m{}_r{}", execution_id, sequence, method_sequence, retry_attempt);
688            
689            info!("🔄 Proxy: Fetching from {} via proxy (execution: {}, retry: {}/{})", 
690                  source_name, attempt_id, retry_attempt + 1, max_retries);
691            
692            // Create client with proxy configuration
693            let proxy = reqwest::Proxy::all(&proxy_url)
694                .map_err(|e| BrightDataError::ToolError(format!("Failed to create proxy: {}", e)))?;
695
696            let client = Client::builder()
697                .proxy(proxy)
698                .timeout(Duration::from_secs(90))
699                .danger_accept_invalid_certs(true) // Often needed for proxy connections
700                .build()
701                .map_err(|e| BrightDataError::ToolError(format!("Failed to create proxy client: {}", e)))?;
702
703            let response = client
704                .get(url)
705                .header("x-unblock-data-format", "markdown")
706                .send()
707                .await
708                .map_err(|e| BrightDataError::ToolError(format!("Proxy request failed to {}: {}", source_name, e)))?;
709
710            let duration = start_time.elapsed();
711            let status = response.status().as_u16();
712            let response_headers: HashMap<String, String> = response
713                .headers()
714                .iter()
715                .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
716                .collect();
717
718            let response_text = response.text().await
719                .map_err(|e| BrightDataError::ToolError(format!("Failed to read proxy response body from {}: {}", source_name, e)))?;
720
721            // Handle server errors with retry
722            if matches!(status, 502 | 503 | 504) && retry_attempt < max_retries - 1 {
723                let wait_time = Duration::from_millis(1000 + (retry_attempt as u64 * 1000));
724                warn!("Proxy: Server error {}, waiting {}ms before retry...", status, wait_time.as_millis());
725                tokio::time::sleep(wait_time).await;
726                last_error = Some(BrightDataError::ToolError(format!("Proxy server error: {}", status)));
727                continue;
728            }
729            
730            if !(200..300).contains(&status) {
731                let error_msg = format!("Proxy: {} returned HTTP {}: {}", source_name, status, 
732                                      &response_text[..response_text.len().min(200)]);
733                
734                warn!("Proxy HTTP error: {}", error_msg);
735                last_error = Some(BrightDataError::ToolError(error_msg));
736                
737                // Log error metrics for proxy
738                let proxy_payload = json!({
739                    "url": url,
740                    "method": "proxy",
741                    "proxy_host": proxy_host,
742                    "proxy_port": proxy_port,
743                    "error": format!("HTTP {}", status)
744                });
745
746                if let Err(e) = BRIGHTDATA_METRICS.log_call(
747                    &attempt_id,
748                    url,
749                    "proxy",
750                    "raw",
751                    None,
752                    proxy_payload,
753                    status,
754                    response_headers.clone(),
755                    &response_text,
756                    Some(&format!("Proxy HTTP {} Error", status)),
757                    duration.as_millis() as u64,
758                    None,
759                    None,
760                ).await {
761                    warn!("Failed to log proxy error metrics: {}", e);
762                }
763                
764                if retry_attempt == max_retries - 1 {
765                    return Err(last_error.unwrap());
766                }
767                continue;
768            }
769
770            // SUCCESS - Process response (apply filtering only if DEDUCT_DATA=true)
771            let raw_content = response_text;
772            let filtered_content = if self.is_data_reduction_enabled() {
773                // TODO: Add content filtering logic when DEDUCT_DATA=true
774                raw_content.clone()
775            } else {
776                raw_content.clone()
777            };
778
779            // Log metrics (using a simplified payload for proxy requests)
780            let proxy_payload = json!({
781                "url": url,
782                "method": "proxy",
783                "proxy_host": proxy_host,
784                "proxy_port": proxy_port
785            });
786
787            if let Err(e) = BRIGHTDATA_METRICS.log_call(
788                &attempt_id,
789                url,
790                "proxy",
791                "raw",
792                None,
793                proxy_payload.clone(),
794                status,
795                response_headers.clone(),
796                &raw_content,
797                Some(&filtered_content),
798                duration.as_millis() as u64,
799                None,
800                None,
801            ).await {
802                warn!("Failed to log proxy metrics: {}", e);
803            }
804
805            return Ok(json!({
806                "content": filtered_content,
807                "raw_content": raw_content,
808                "query": query,
809                "market": market,
810                "source": source_name,
811                "method": "BrightData Proxy",
812                "priority": format!("{:?}", priority),
813                "token_budget": token_budget,
814                "execution_id": execution_id,
815                "sequence": sequence,
816                "method_sequence": method_sequence,
817                "success": true,
818                "url": url,
819                "proxy_host": proxy_host,
820                "proxy_port": proxy_port,
821                "status_code": status,
822                "response_size_bytes": raw_content.len(),
823                "filtered_size_bytes": filtered_content.len(),
824                "duration_ms": duration.as_millis(),
825                "timestamp": chrono::Utc::now().to_rfc3339(),
826                "retry_attempts": retry_attempt + 1,
827                "max_retries": max_retries,
828                "payload_used": proxy_payload
829            }));
830        }
831
832        Err(last_error.unwrap_or_else(|| BrightDataError::ToolError("Proxy: All retry attempts failed".into())))
833    }
834
835    fn is_likely_etf_symbol(&self, query: &str) -> bool {
836        let clean = query.trim();
837        
838        if clean.len() < 1 || clean.len() > 15 {
839            return false;
840        }
841
842        let valid_chars = clean.chars().all(|c| c.is_alphanumeric() || c == '.');
843        let has_letters = clean.chars().any(|c| c.is_alphabetic());
844        
845        valid_chars && has_letters
846    }
847
848    /// Test both direct API and proxy connectivity
849    pub async fn test_connectivity(&self) -> Result<String, BrightDataError> {
850        let test_url = "https://finance.yahoo.com/quote/BTC-USD/";
851        let mut results = Vec::new();
852        
853        // Test Direct API
854        info!("Testing Direct BrightData API...");
855        match self.try_fetch_url_direct_api(
856            test_url, "BTC", "usd", "Yahoo Finance Test", 
857            crate::filters::strategy::QueryPriority::High, 1000, 
858            "connectivity_test", 0, 0
859        ).await {
860            Ok(_) => {
861                results.push("Direct API: SUCCESS".to_string());
862            }
863            Err(e) => {
864                results.push(format!("Direct API: FAILED - {}", e));
865            }
866        }
867        
868        // Test Proxy
869        info!("Testing Proxy method...");
870        match self.try_fetch_url_via_proxy(
871            test_url, "BTC", "usd", "Yahoo Finance Test", 
872            crate::filters::strategy::QueryPriority::High, 1000, 
873            "connectivity_test", 0, 1
874        ).await {
875            Ok(_) => {
876                results.push("Proxy: SUCCESS".to_string());
877            }
878            Err(e) => {
879                results.push(format!("Proxy: FAILED - {}", e));
880            }
881        }
882        
883        Ok(format!("Connectivity Test Results:\n{}", results.join("\n")))
884    }
885}
886
887