snm_brightdata_client/tools/
stock.rs

1// src/tools/stock.rs - ENHANCED VERSION WITH DEDUCT_DATA SUPPORT AND METHOD-SEPARATED URLS
2use crate::tool::{Tool, ToolResult, McpContent};
3use crate::error::BrightDataError;
4use crate::filters::{ResponseFilter, ResponseStrategy, ResponseType};
5use crate::extras::logger::JSON_LOGGER;
6use crate::metrics::brightdata_logger::BRIGHTDATA_METRICS;
7use async_trait::async_trait;
8use reqwest::Client;
9use serde_json::{json, Value};
10use std::env;
11use std::time::{Duration, Instant};
12use std::collections::HashMap;
13use log::{info, warn, error};
14use crate::symbols::stock_symbol::match_symbol_from_query;
15
/// Candidate URLs for a stock lookup, grouped by the fetch method that consumes them.
///
/// Each entry is a `(url, description)` pair.
#[derive(Clone, Debug)]
pub struct MethodUrls {
    /// `(url, description)` pairs to request through the proxy method.
    pub proxy: Vec<(String, String)>,
    /// `(url, description)` pairs to request through the direct API method.
    pub direct: Vec<(String, String)>,
}
22
/// Stateless tool type that serves stock-data lookups (exposed under the name `get_stock_data`).
pub struct StockDataTool;
24
25#[async_trait]
26impl Tool for StockDataTool {
27    fn name(&self) -> &str {
28        "get_stock_data"
29    }
30
31    fn description(&self) -> &str {
32        "Get comprehensive stock data including prices, performance, market cap, volumes with intelligent filtering and priority-based processing. Supports both direct BrightData API and proxy fallback."
33    }
34
35    fn input_schema(&self) -> Value {
36        json!({
37            "type": "object",
38            "properties": {
39                "query": {
40                    "type": "string",
41                    "description": "Stock symbol (e.g. TATAMOTORS, TCS, AAPL), company name, comparison query, or market overview request"
42                },
43                "market": {
44                    "type": "string",
45                    "enum": ["indian", "us", "global"],
46                    "default": "indian",
47                    "description": "Market region - indian for NSE/BSE stocks, us for NASDAQ/NYSE, global for international"
48                },
49                "data_type": {
50                    "type": "string",
51                    "enum": ["price", "fundamentals", "technical", "news", "all"],
52                    "default": "all",
53                    "description": "Type of stock data to focus on"
54                },
55                "timeframe": {
56                    "type": "string",
57                    "enum": ["realtime", "day", "week", "month", "quarter", "year"],
58                    "default": "realtime",
59                    "description": "Time period for stock data analysis"
60                },
61                "include_ratios": {
62                    "type": "boolean",
63                    "default": true,
64                    "description": "Include financial ratios like P/E, P/B, ROE"
65                },
66                "include_volume": {
67                    "type": "boolean",
68                    "default": true,
69                    "description": "Include trading volume and liquidity data"
70                }
71            },
72            "required": ["query"]
73        })
74    }
75
76    async fn execute(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
77        self.execute_internal(parameters).await
78    }
79
80    async fn execute_internal(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
81        let raw_query = parameters
82            .get("query")
83            .and_then(|v| v.as_str())
84            .ok_or_else(|| BrightDataError::ToolError("Missing 'query' parameter".into()))?;
85
86        // Step 1: Resolve known symbols (or fallback)
87        let matched_symbol = match_symbol_from_query(raw_query);
88
89        // Step 2: Strip trailing .com / .xyz etc.
90        let query = matched_symbol.split('.').next().unwrap_or(&matched_symbol);
91
92        let market = parameters
93            .get("market")
94            .and_then(|v| v.as_str())
95            .unwrap_or("indian");
96
97        let data_type = parameters
98            .get("data_type")
99            .and_then(|v| v.as_str())
100            .unwrap_or("all");
101
102        let timeframe = parameters
103            .get("timeframe")
104            .and_then(|v| v.as_str())
105            .unwrap_or("realtime");
106
107        let include_ratios = parameters
108            .get("include_ratios")
109            .and_then(|v| v.as_bool())
110            .unwrap_or(true);
111
112        let include_volume = parameters
113            .get("include_volume")
114            .and_then(|v| v.as_bool())
115            .unwrap_or(true);
116
117        let query_priority = ResponseStrategy::classify_query_priority(query);
118        let recommended_tokens = ResponseStrategy::get_recommended_token_allocation(query);
119
120        let execution_id = format!("stock_{}", chrono::Utc::now().format("%Y%m%d_%H%M%S%.3f"));
121        
122        info!("๐Ÿ“ˆ Stock query: '{}' (market: {}, priority: {:?}, tokens: {})", 
123              query, market, query_priority, recommended_tokens);
124        
125        match self.fetch_stock_data_with_fallbacks_and_priority(
126            query, market, data_type, timeframe, include_ratios, include_volume,
127            query_priority, recommended_tokens, &execution_id
128        ).await {
129            Ok(result) => {
130                let content = result.get("content").and_then(|c| c.as_str()).unwrap_or("");
131                let source_used = result.get("source_used").and_then(|s| s.as_str()).unwrap_or("Unknown");
132                let method_used = result.get("method_used").and_then(|m| m.as_str()).unwrap_or("Unknown");
133                
134                // Create formatted response based on DEDUCT_DATA setting
135                let formatted_response = self.create_formatted_stock_response(
136                    query, market, content, source_used, method_used, 
137                    data_type, timeframe, include_ratios, include_volume, &execution_id
138                );
139                
140                let tool_result = ToolResult::success_with_raw(
141                    vec![McpContent::text(formatted_response)], 
142                    result
143                );
144                
145                // Apply filtering only if DEDUCT_DATA=true
146                if self.is_data_reduction_enabled() {
147                    Ok(ResponseStrategy::apply_size_limits(tool_result))
148                } else {
149                    Ok(tool_result)
150                }
151            }
152            Err(_e) => {
153                // Return empty data for BrightData errors - Anthropic will retry
154                warn!("BrightData error for query '{}', returning empty data for retry", query);
155                let empty_response = json!({
156                    "query": query,
157                    "market": market,
158                    "status": "no_data",
159                    "reason": "brightdata_error",
160                    "execution_id": execution_id
161                });
162                
163                Ok(ToolResult::success_with_raw(
164                    vec![McpContent::text("๐Ÿ“ˆ **No Data Available**\n\nPlease try again with a more specific stock symbol.".to_string())],
165                    empty_response
166                ))
167            }
168        }
169    }
170}
171
172impl StockDataTool {
173    /// ENHANCED: Check if data reduction is enabled via DEDUCT_DATA environment variable only
174    fn is_data_reduction_enabled(&self) -> bool {
175        std::env::var("DEDUCT_DATA")
176            .unwrap_or_else(|_| "false".to_string())
177            .to_lowercase() == "true"
178    }
179
180    /// ENHANCED: Create formatted response with DEDUCT_DATA control
181    fn create_formatted_stock_response(
182        &self,
183        query: &str,
184        market: &str, 
185        content: &str,
186        source: &str,
187        method: &str,
188        data_type: &str,
189        timeframe: &str,
190        include_ratios: bool,
191        include_volume: bool,
192        execution_id: &str
193    ) -> String {
194        // If DEDUCT_DATA=false, return full content with basic formatting
195        if !self.is_data_reduction_enabled() {
196            return format!(
197                "๐Ÿ“ˆ **{}** | {} Market\n\n## Full Content\n{}\n\n*Source: {} via {} โ€ข Type: {} โ€ข Period: {}*",
198                query.to_uppercase(), 
199                market.to_uppercase(), 
200                content,
201                source, 
202                method, 
203                data_type, 
204                timeframe
205            );
206        }
207
208        // TODO: Add filtered data extraction logic when DEDUCT_DATA=true
209        // For now, return full content formatted
210        format!(
211            "๐Ÿ“ˆ **{}** | {} Market\n\n## Content (TODO: Add Filtering)\n{}\n\n*Source: {} via {} โ€ข Type: {} โ€ข Period: {}*",
212            query.to_uppercase(), 
213            market.to_uppercase(), 
214            content,
215            source, 
216            method, 
217            data_type, 
218            timeframe
219        )
220    }
221    
222    /// TODO: Extract essential stock data using existing filter methods
223    fn extract_essential_stock_data(&self, content: &str, query: &str) -> String {
224        // TODO: Add essential stock data extraction logic
225        // For now, return original content
226        content.to_string()
227    }
228    
229    /// TODO: Extract financial lines when filtering is disabled
230    fn extract_financial_lines(&self, content: &str) -> String {
231        // TODO: Add financial lines extraction logic
232        // For now, return original content
233        content.to_string()
234    }
235    
236    /// TODO: Format financial metrics into clean markdown
237    fn format_financial_metrics(&self, data: &str) -> String {
238        // TODO: Add financial metrics formatting logic
239        // For now, return data as-is
240        data.to_string()
241    }
242
243    /// ENHANCED: Build URLs separated by method (proxy vs direct)
244    fn build_prioritized_urls_with_priority(
245        &self, 
246        query: &str, 
247        market: &str, 
248        data_type: &str,
249        priority: crate::filters::strategy::QueryPriority
250    ) -> MethodUrls {
251        let mut proxy_urls = Vec::new();
252        let mut direct_urls = Vec::new();
253        let clean_query = query.trim().to_uppercase();
254
255        // TODO: Add priority-based URL limiting when DEDUCT_DATA=true
256        let max_sources = if self.is_data_reduction_enabled() {
257            // TODO: Add priority-based source limiting logic
258            5
259        } else {
260            5 // No limit when DEDUCT_DATA=false
261        };
262
263        if self.is_likely_stock_symbol(&clean_query) {
264            match market {
265                "indian" => {
266                    // TODO: Add priority-based URL selection logic
267                    let symbols_to_try = vec![
268                        format!("{}.NS", clean_query),
269                        format!("{}.BO", clean_query),
270                        clean_query.clone(),
271                    ];
272                    
273                    for (i, symbol) in symbols_to_try.iter().enumerate() {
274                        if i >= max_sources { break; }
275                        
276                        let url = format!("https://finance.yahoo.com/quote/{}", symbol);
277                        let description = format!("Yahoo Finance ({})", symbol);
278
279                        let proxy_url = format!("https://finance.yahoo.com/quote/{}/", symbol);
280                        let proxy_description = format!("Yahoo Finance ({})", symbol);
281                        
282                        // Add to both proxy and direct (same URLs, different methods)
283                        proxy_urls.push((proxy_url, proxy_description));
284                        direct_urls.push((url, description));
285                    }
286                }
287                "us" => {
288                    let url = format!("https://finance.yahoo.com/quote/{}.NS/", clean_query);
289                    let description = format!("Yahoo Finance ({})", clean_query);
290
291                    let proxy_url = format!("https://finance.yahoo.com/quote/{}.NS/", clean_query);
292                    let proxy_description = format!("Yahoo Finance ({})", clean_query);
293                    
294                    proxy_urls.push((proxy_url, proxy_description));
295                    direct_urls.push((url, description));
296                }
297                "global" => {
298                    let url = format!("https://finance.yahoo.com/quote/{}.NS/", clean_query);
299                    let description = format!("Yahoo Finance Global ({})", clean_query);
300
301                    let proxy_url = format!("https://finance.yahoo.com/quote/{}.NS/", clean_query);
302                    let proxy_description = format!("Yahoo Finance Global ({})", clean_query);
303                    
304                    proxy_urls.push((proxy_url, proxy_description));
305                    direct_urls.push((url, description));
306                }
307                _ => {}
308            }
309        }
310
311        // Add search fallbacks (no restrictions when DEDUCT_DATA=false)
312        if proxy_urls.len() < max_sources {
313            let url = format!("https://finance.yahoo.com/quote/{}", urlencoding::encode(query));
314            let description = "Yahoo Finance Search".to_string();
315
316            let proxy_url = format!("https://finance.yahoo.com/quote/{}", urlencoding::encode(query));
317            let proxy_description = "Yahoo Finance Search".to_string();
318            
319            proxy_urls.push((proxy_url, proxy_description));
320            direct_urls.push((url, description));
321        }
322
323        info!("๐ŸŽฏ Generated {} proxy URLs and {} direct URLs for query '{}' (priority: {:?})", 
324              proxy_urls.len(), direct_urls.len(), query, priority);
325        
326        MethodUrls {
327            proxy: proxy_urls,
328            direct: direct_urls,
329        }
330    }
331
332    /// ENHANCED: Main fetch function with method-separated URL structure
333    async fn fetch_stock_data_with_fallbacks_and_priority(
334        &self, 
335        query: &str, 
336        market: &str, 
337        data_type: &str,
338        timeframe: &str,
339        include_ratios: bool,
340        include_volume: bool,
341        query_priority: crate::filters::strategy::QueryPriority,
342        token_budget: usize,
343        execution_id: &str
344    ) -> Result<Value, BrightDataError> {
345        let method_urls = self.build_prioritized_urls_with_priority(query, market, data_type, query_priority);
346        let mut last_error = None;
347        let mut attempts = Vec::new();
348
349        // Define method priority: try direct first, then proxy
350        let methods_to_try = vec![
351            // ("direct", "Direct Call", &method_urls.direct),
352            ("proxy", "Proxy Fallback", &method_urls.proxy)
353        ];
354
355        for (method_sequence, (method_type, method_name, urls_for_method)) in methods_to_try.iter().enumerate() {
356            info!("๐Ÿ”„ Trying {} method with {} URLs", method_name, urls_for_method.len());
357            
358            for (url_sequence, (url, source_name)) in urls_for_method.iter().enumerate() {
359                let attempt_result = match *method_type {
360                    "direct" => {
361                        info!("๐ŸŒ Trying Direct BrightData API for {} (method: {}, url: {}/{})", 
362                              source_name, method_sequence + 1, url_sequence + 1, urls_for_method.len());
363                        self.try_fetch_url_direct_api(
364                            url, query, market, source_name, query_priority, token_budget, 
365                            execution_id, url_sequence as u64, method_sequence as u64
366                        ).await
367                    }
368                    "proxy" => {
369                        info!("๐Ÿ”„ Trying Proxy method for {} (method: {}, url: {}/{})", 
370                              source_name, method_sequence + 1, url_sequence + 1, urls_for_method.len());
371                        self.try_fetch_url_via_proxy(
372                            url, query, market, source_name, query_priority, token_budget, 
373                            execution_id, url_sequence as u64, method_sequence as u64
374                        ).await
375                    }
376                    _ => continue,
377                };
378
379                match attempt_result {
380                    Ok(mut result) => {
381                        let content = result.get("content").and_then(|c| c.as_str()).unwrap_or("");
382                        
383                        attempts.push(json!({
384                            "source": source_name,
385                            "url": url,
386                            "method": method_name,
387                            "status": "success",
388                            "content_length": content.len(),
389                            "method_sequence": method_sequence + 1,
390                            "url_sequence": url_sequence + 1
391                        }));
392                        
393                        // TODO: Add content quality check when DEDUCT_DATA=true
394                        let should_try_next = if self.is_data_reduction_enabled() {
395                            // TODO: Add quality-based next source logic
396                            false
397                        } else {
398                            false
399                        };
400                        
401                        if should_try_next && (url_sequence < urls_for_method.len() - 1 || method_sequence < methods_to_try.len() - 1) {
402                            if url_sequence < urls_for_method.len() - 1 {
403                                warn!("Content insufficient from {} via {}, trying next URL in same method", source_name, method_name);
404                                continue; // Try next URL in same method
405                            } else {
406                                warn!("Content insufficient from {} via {}, trying next method", source_name, method_name);
407                                break; // Try next method
408                            }
409                        }
410                        
411                        // SUCCESS - but validate data quality first only if DEDUCT_DATA=true
412                        if self.is_data_reduction_enabled() {
413                            // TODO: Add data quality validation when DEDUCT_DATA=true
414                            // For now, accept all content
415                        }
416                        
417                        // SUCCESS
418                        result["source_used"] = json!(source_name);
419                        result["url_used"] = json!(url);
420                        result["method_used"] = json!(method_name);
421                        result["execution_id"] = json!(execution_id);
422                        result["priority"] = json!(format!("{:?}", query_priority));
423                        result["token_budget"] = json!(token_budget);
424                        result["attempts"] = json!(attempts);
425                        result["successful_method_sequence"] = json!(method_sequence + 1);
426                        result["successful_url_sequence"] = json!(url_sequence + 1);
427                        
428                        info!("โœ… Successfully fetched stock data from {} via {} (method: {}, url: {})", 
429                              source_name, method_name, method_sequence + 1, url_sequence + 1);
430                        
431                        return Ok(result);
432                    }
433                    Err(e) => {
434                        attempts.push(json!({
435                            "source": source_name,
436                            "url": url,
437                            "method": method_name,
438                            "status": "failed",
439                            "error": e.to_string(),
440                            "method_sequence": method_sequence + 1,
441                            "url_sequence": url_sequence + 1
442                        }));
443                        
444                        last_error = Some(e);
445                        warn!("โŒ Failed to fetch from {} via {} (method: {}, url: {}): {:?}", 
446                              source_name, method_name, method_sequence + 1, url_sequence + 1, last_error);
447                    }
448                }
449            }
450        }
451
452        // All methods and sources failed - return empty data instead of error
453        warn!("โŒ All sources and methods failed for query '{}'. Returning empty data for Anthropic retry", query);
454        
455        let empty_result = json!({
456            "query": query,
457            "market": market,
458            "status": "no_data_found",
459            "attempts": attempts,
460            "execution_id": execution_id,
461            "total_attempts": method_urls.direct.len() + method_urls.proxy.len(),
462            "reason": "all_sources_failed"
463        });
464        
465        Ok(empty_result)
466    }
467
468    // Direct BrightData API method (existing implementation)
469    async fn try_fetch_url_direct_api(
470        &self, 
471        url: &str, 
472        query: &str, 
473        market: &str, 
474        source_name: &str, 
475        priority: crate::filters::strategy::QueryPriority,
476        token_budget: usize,
477        execution_id: &str,
478        sequence: u64,
479        method_sequence: u64
480    ) -> Result<Value, BrightDataError> {
481        let max_retries = env::var("MAX_RETRIES")
482            .ok()
483            .and_then(|s| s.parse::<u32>().ok())
484            .unwrap_or(1);
485        
486        let mut last_error = None;
487        
488        for retry_attempt in 0..max_retries {
489            let start_time = Instant::now();
490            let attempt_id = format!("{}_direct_s{}_m{}_r{}", execution_id, sequence, method_sequence, retry_attempt);
491            
492            info!("๐ŸŒ Direct API: Fetching from {} (execution: {}, retry: {}/{})", 
493                  source_name, attempt_id, retry_attempt + 1, max_retries);
494            
495            let api_token = env::var("BRIGHTDATA_API_TOKEN")
496                .or_else(|_| env::var("API_TOKEN"))
497                .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_API_TOKEN environment variable".into()))?;
498
499            let base_url = env::var("BRIGHTDATA_BASE_URL")
500                .unwrap_or_else(|_| "https://api.brightdata.com".to_string());
501
502            let zone = env::var("WEB_UNLOCKER_ZONE")
503                .unwrap_or_else(|_| "mcp_unlocker".to_string());
504
505            let payload = json!({
506                "url": url,
507                "zone": zone,
508                "format": "raw",
509                // "data_format": "markdown"
510            });
511
512            if retry_attempt == 0 {
513                info!("๐Ÿ“ค Direct API Request:");
514                info!("   Endpoint: {}/request", base_url);
515                info!("   Zone: {}", zone);
516                info!("   Target: {}", url);
517            }
518
519            let client = Client::builder()
520                .timeout(Duration::from_secs(90))
521                .build()
522                .map_err(|e| BrightDataError::ToolError(format!("Failed to create HTTP client: {}", e)))?;
523
524            let response = client
525                .post(&format!("{}/request", base_url))
526                .header("Authorization", format!("Bearer {}", api_token))
527                .header("Content-Type", "application/json")
528                .json(&payload)
529                .send()
530                .await
531                .map_err(|e| BrightDataError::ToolError(format!("Direct API request failed to {}: {}", source_name, e)))?;
532
533            let duration = start_time.elapsed();
534            let status = response.status().as_u16();
535            let response_headers: HashMap<String, String> = response
536                .headers()
537                .iter()
538                .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
539                .collect();
540
541            info!("๐Ÿ“ฅ Direct API Response (retry {}):", retry_attempt + 1);
542            info!("   Status: {}", status);
543            info!("   Duration: {}ms", duration.as_millis());
544
545            let response_text = response.text().await
546                .map_err(|e| BrightDataError::ToolError(format!("Failed to read response body from {}: {}", source_name, e)))?;
547
548            // Handle server errors with retry
549            if matches!(status, 502 | 503 | 504) && retry_attempt < max_retries - 1 {
550                let wait_time = Duration::from_millis(1000 + (retry_attempt as u64 * 1000));
551                warn!("โณ Direct API: Server error {}, waiting {}ms before retry...", status, wait_time.as_millis());
552                tokio::time::sleep(wait_time).await;
553                last_error = Some(BrightDataError::ToolError(format!("Direct API server error: {}", status)));
554                continue;
555            }
556            
557            if !(200..300).contains(&status) {
558                let error_msg = format!("Direct API: {} returned HTTP {}: {}", source_name, status, 
559                                      &response_text[..response_text.len().min(500)]);
560                last_error = Some(BrightDataError::ToolError(error_msg));
561                if retry_attempt == max_retries - 1 {
562                    return Err(last_error.unwrap());
563                }
564                continue;
565            }
566
567            // SUCCESS - Process response (apply filtering only if DEDUCT_DATA=true)
568            let raw_content = response_text;
569            let filtered_content = if self.is_data_reduction_enabled() {
570                // TODO: Add content filtering logic when DEDUCT_DATA=true
571                raw_content.clone()
572            } else {
573                raw_content.clone()
574            };
575
576            info!("๐Ÿ“Š Direct API: Content processed: {} bytes -> {} bytes", 
577                  raw_content.len(), filtered_content.len());
578
579            // Log metrics
580            if let Err(e) = BRIGHTDATA_METRICS.log_call(
581                &attempt_id,
582                url,
583                &zone,
584                "raw",
585                None,
586                payload.clone(),
587                status,
588                response_headers.clone(),
589                &raw_content,
590                Some(&filtered_content),
591                duration.as_millis() as u64,
592                None,
593                None,
594            ).await {
595                warn!("Failed to log direct API metrics: {}", e);
596            }
597
598            return Ok(json!({
599                "content": filtered_content,
600                "raw_content": raw_content,
601                "query": query,
602                "market": market,
603                "source": source_name,
604                "method": "Direct BrightData API",
605                "priority": format!("{:?}", priority),
606                "token_budget": token_budget,
607                "execution_id": execution_id,
608                "sequence": sequence,
609                "method_sequence": method_sequence,
610                "success": true,
611                "url": url,
612                "zone": zone,
613                "format": "raw",
614                "status_code": status,
615                "response_size_bytes": raw_content.len(),
616                "filtered_size_bytes": filtered_content.len(),
617                "duration_ms": duration.as_millis(),
618                "timestamp": chrono::Utc::now().to_rfc3339(),
619                "retry_attempts": retry_attempt + 1,
620                "max_retries": max_retries,
621                "payload_used": payload
622            }));
623        }
624
625        Err(last_error.unwrap_or_else(|| BrightDataError::ToolError("Direct API: All retry attempts failed".into())))
626    }
627
628    // Proxy-based method
629    async fn try_fetch_url_via_proxy(
630        &self, 
631        url: &str, 
632        query: &str, 
633        market: &str, 
634        source_name: &str, 
635        priority: crate::filters::strategy::QueryPriority,
636        token_budget: usize,
637        execution_id: &str,
638        sequence: u64,
639        method_sequence: u64
640    ) -> Result<Value, BrightDataError> {
641        let max_retries = env::var("MAX_RETRIES")
642            .ok()
643            .and_then(|s| s.parse::<u32>().ok())
644            .unwrap_or(1);
645        
646        let mut last_error = None;
647        
648        // Get proxy configuration from environment
649        let proxy_host = env::var("BRIGHTDATA_PROXY_HOST")
650            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_HOST environment variable".into()))?;
651        let proxy_port = env::var("BRIGHTDATA_PROXY_PORT")
652            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_PORT environment variable".into()))?;
653        let proxy_username = env::var("BRIGHTDATA_PROXY_USERNAME")
654            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_USERNAME environment variable".into()))?;
655        let proxy_password = env::var("BRIGHTDATA_PROXY_PASSWORD")
656            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_PASSWORD environment variable".into()))?;
657
658        let proxy_url = format!("http://{}:{}@{}:{}", proxy_username, proxy_password, proxy_host, proxy_port);
659        
660        for retry_attempt in 0..max_retries {
661            let start_time = Instant::now();
662            let attempt_id = format!("{}_proxy_s{}_m{}_r{}", execution_id, sequence, method_sequence, retry_attempt);
663            
664            info!("๐Ÿ”„ Proxy: Fetching from {} via proxy (execution: {}, retry: {}/{})", 
665                  source_name, attempt_id, retry_attempt + 1, max_retries);
666            
667            if retry_attempt == 0 {
668                info!("๐Ÿ“ค Proxy Request:");
669                info!("   Proxy: {}:{}@{}:{}", proxy_username, "***", proxy_host, proxy_port);
670                info!("   Target: {}", url);
671            }
672
673            // Create client with proxy configuration
674            let proxy = reqwest::Proxy::all(&proxy_url)
675                .map_err(|e| BrightDataError::ToolError(format!("Failed to create proxy: {}", e)))?;
676
677            let client = Client::builder()
678                .proxy(proxy)
679                .timeout(Duration::from_secs(90))
680                .danger_accept_invalid_certs(true) // Often needed for proxy connections
681                .build()
682                .map_err(|e| BrightDataError::ToolError(format!("Failed to create proxy client: {}", e)))?;
683
684            let response = client
685                .get(url)
686                .header("x-unblock-data-format", "markdown")
687                .send()
688                .await
689                .map_err(|e| BrightDataError::ToolError(format!("Proxy request failed to {}: {}", source_name, e)))?;
690
691            let duration = start_time.elapsed();
692            let status = response.status().as_u16();
693            let response_headers: HashMap<String, String> = response
694                .headers()
695                .iter()
696                .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
697                .collect();
698
699            info!("๐Ÿ“ฅ Proxy Response (retry {}):", retry_attempt + 1);
700            info!("   Status: {}", status);
701            info!("   Duration: {}ms", duration.as_millis());
702
703            let response_text = response.text().await
704                .map_err(|e| BrightDataError::ToolError(format!("Failed to read proxy response body from {}: {}", source_name, e)))?;
705
706            // Handle server errors with retry
707            if matches!(status, 502 | 503 | 504) && retry_attempt < max_retries - 1 {
708                let wait_time = Duration::from_millis(1000 + (retry_attempt as u64 * 1000));
709                warn!("โณ Proxy: Server error {}, waiting {}ms before retry...", status, wait_time.as_millis());
710                tokio::time::sleep(wait_time).await;
711                last_error = Some(BrightDataError::ToolError(format!("Proxy server error: {}", status)));
712                continue;
713            }
714            
715            if !(200..300).contains(&status) {
716                println!("-----------------------------------------------------------------");
717                println!("MARKDOWN SUCCESS: {:?}", status.clone());
718                println!("-----------------------------------------------------------------");
719                let error_msg = format!("Proxy: {} returned HTTP {}: {}", source_name, status, 
720                                      &response_text[..response_text.len().min(200)]);
721                
722                warn!("Proxy HTTP error: {}", error_msg);
723                last_error = Some(BrightDataError::ToolError(error_msg));
724                
725                // Log error metrics for proxy
726                let proxy_payload = json!({
727                    "url": url,
728                    "method": "proxy",
729                    "proxy_host": proxy_host,
730                    "proxy_port": proxy_port,
731                    "error": format!("HTTP {}", status)
732                });
733
734                if let Err(e) = BRIGHTDATA_METRICS.log_call(
735                    &attempt_id,
736                    url,
737                    "proxy",
738                    "raw",
739                    None,
740                    proxy_payload,
741                    status,
742                    response_headers.clone(),
743                    &response_text,
744                    Some(&format!("Proxy HTTP {} Error", status)),
745                    duration.as_millis() as u64,
746                    None,
747                    None,
748                ).await {
749                    warn!("Failed to log proxy error metrics: {}", e);
750                }
751                
752                if retry_attempt == max_retries - 1 {
753                    return Err(last_error.unwrap());
754                }
755                continue;
756            }
757
758            // SUCCESS - Process response (apply filtering only if DEDUCT_DATA=true)
759            let raw_content = response_text;
760            let filtered_content = if self.is_data_reduction_enabled() {
761                // TODO: Add content filtering logic when DEDUCT_DATA=true
762                raw_content.clone()
763            } else {
764                raw_content.clone()
765            };
766
767            info!("๐Ÿ“Š Proxy: Content processed: {} bytes -> {} bytes", 
768                  raw_content.len(), filtered_content.len());
769
770            // Log metrics (using a simplified payload for proxy requests)
771            let proxy_payload = json!({
772                "url": url,
773                "method": "proxy",
774                "proxy_host": proxy_host,
775                "proxy_port": proxy_port
776            });
777
778            if let Err(e) = BRIGHTDATA_METRICS.log_call(
779                &attempt_id,
780                url,
781                "proxy",
782                "raw",
783                None,
784                proxy_payload.clone(),
785                status,
786                response_headers.clone(),
787                &raw_content,
788                Some(&filtered_content),
789                duration.as_millis() as u64,
790                None,
791                None,
792            ).await {
793                warn!("Failed to log proxy metrics: {}", e);
794            }
795
796            return Ok(json!({
797                "content": filtered_content,
798                "raw_content": raw_content,
799                "query": query,
800                "market": market,
801                "source": source_name,
802                "method": "BrightData Proxy",
803                "priority": format!("{:?}", priority),
804                "token_budget": token_budget,
805                "execution_id": execution_id,
806                "sequence": sequence,
807                "method_sequence": method_sequence,
808                "success": true,
809                "url": url,
810                "proxy_host": proxy_host,
811                "proxy_port": proxy_port,
812                "status_code": status,
813                "response_size_bytes": raw_content.len(),
814                "filtered_size_bytes": filtered_content.len(),
815                "duration_ms": duration.as_millis(),
816                "timestamp": chrono::Utc::now().to_rfc3339(),
817                "retry_attempts": retry_attempt + 1,
818                "max_retries": max_retries,
819                "payload_used": proxy_payload
820            }));
821        }
822
823        Err(last_error.unwrap_or_else(|| BrightDataError::ToolError("Proxy: All retry attempts failed".into())))
824    }
825
826    fn is_likely_stock_symbol(&self, query: &str) -> bool {
827        let clean = query.trim();
828        
829        if clean.len() < 1 || clean.len() > 15 {
830            return false;
831        }
832
833        let valid_chars = clean.chars().all(|c| c.is_alphanumeric() || c == '.');
834        let has_letters = clean.chars().any(|c| c.is_alphabetic());
835        
836        valid_chars && has_letters
837    }
838
839    /// Test both direct API and proxy connectivity
840    pub async fn test_connectivity(&self) -> Result<String, BrightDataError> {
841        let test_url = "https://finance.yahoo.com/quote/AAPL/";
842        let mut results = Vec::new();
843        
844        // Test Direct API
845        info!("๐Ÿงช Testing Direct BrightData API...");
846        match self.try_fetch_url_direct_api(
847            test_url, "AAPL", "us", "Yahoo Finance Test", 
848            crate::filters::strategy::QueryPriority::High, 1000, 
849            "connectivity_test", 0, 0
850        ).await {
851            Ok(_) => {
852                results.push("โœ… Direct API: SUCCESS".to_string());
853            }
854            Err(e) => {
855                results.push(format!("โŒ Direct API: FAILED - {}", e));
856            }
857        }
858        
859        // Test Proxy
860        info!("๐Ÿงช Testing Proxy method...");
861        match self.try_fetch_url_via_proxy(
862            test_url, "AAPL", "us", "Yahoo Finance Test", 
863            crate::filters::strategy::QueryPriority::High, 1000, 
864            "connectivity_test", 0, 1
865        ).await {
866            Ok(_) => {
867                results.push("โœ… Proxy: SUCCESS".to_string());
868            }
869            Err(e) => {
870                results.push(format!("โŒ Proxy: FAILED - {}", e));
871            }
872        }
873        
874        Ok(format!("๐Ÿ” Connectivity Test Results:\n{}", results.join("\n")))
875    }
876}