snm_brightdata_client/tools/
stock.rs

1// src/tools/stock.rs - ENHANCED VERSION WITH REDIS CACHE AND DEDUCT_DATA SUPPORT
2use crate::tool::{Tool, ToolResult, McpContent};
3use crate::error::BrightDataError;
4use crate::filters::{ResponseFilter, ResponseStrategy, ResponseType};
5use crate::extras::logger::JSON_LOGGER;
6use crate::metrics::brightdata_logger::BRIGHTDATA_METRICS;
7use crate::services::cache::stock_cache::get_stock_cache;
8use async_trait::async_trait;
9use reqwest::Client;
10use serde_json::{json, Value};
11use std::env;
12use std::time::{Duration, Instant};
13use std::collections::HashMap;
14use log::{info, warn, error};
15use crate::symbols::stock_symbol::match_symbol_from_query;
16
/// Candidate URLs for one stock lookup, grouped by the fetch method that will use them.
///
/// Each entry is a `(url, description)` pair.
#[derive(Clone, Debug)]
pub struct MethodUrls {
    /// `(url, description)` pairs intended for the proxy fetch path.
    pub proxy: Vec<(String, String)>,
    /// `(url, description)` pairs intended for the direct API fetch path.
    pub direct: Vec<(String, String)>,
}
23
/// Stateless handle for the `get_stock_data` tool; all behaviour lives in its `impl` blocks.
pub struct StockDataTool;
25
26#[async_trait]
27impl Tool for StockDataTool {
28    fn name(&self) -> &str {
29        "get_stock_data"
30    }
31
32    fn description(&self) -> &str {
33        "Get comprehensive stock data including prices, performance, market cap, volumes with intelligent filtering and priority-based processing. Supports both direct BrightData API and proxy fallback."
34    }
35
36    fn input_schema(&self) -> Value {
37        json!({
38            "type": "object",
39            "properties": {
40                "query": {
41                    "type": "string",
42                    "description": "Stock symbol (e.g. TATAMOTORS, TCS, AAPL), company name, comparison query, or market overview request"
43                },
44                "symbol": {
45                    "type": "string",
46                    "description": "Stock symbol (e.g. TATAMOTORS, TCS, AAPL), company name, comparison query, or market overview request"
47                },
48                "market": {
49                    "type": "string",
50                    "enum": ["indian", "us", "global"],
51                    "default": "indian",
52                    "description": "Market region - indian for NSE/BSE stocks, us for NASDAQ/NYSE, global for international"
53                },
54                "data_type": {
55                    "type": "string",
56                    "enum": ["price", "fundamentals", "technical", "news", "all"],
57                    "default": "all",
58                    "description": "Type of stock data to focus on"
59                },
60                "timeframe": {
61                    "type": "string",
62                    "enum": ["realtime", "day", "week", "month", "quarter", "year"],
63                    "default": "realtime",
64                    "description": "Time period for stock data analysis"
65                },
66                "include_ratios": {
67                    "type": "boolean",
68                    "default": true,
69                    "description": "Include financial ratios like P/E, P/B, ROE"
70                },
71                "include_volume": {
72                    "type": "boolean",
73                    "default": true,
74                    "description": "Include trading volume and liquidity data"
75                },
76                "session_id": {
77                    "type": "string",
78                    "description": "Session identifier for caching (optional, will use default if not provided)"
79                },
80            },
81            "required": ["query"]
82        })
83    }
84
85    async fn execute(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
86        self.execute_internal(parameters).await
87    }
88
89    async fn execute_internal(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
90        let raw_query = parameters
91            .get("symbol")
92            .and_then(|v| v.as_str())
93            .ok_or_else(|| BrightDataError::ToolError("Missing 'symbol' parameter".into()))?;
94
95        let session_id = parameters
96            .get("user_id")
97            .and_then(|v| v.as_str())
98            .ok_or_else(|| BrightDataError::ToolError("Missing 'user_id' parameter".into()))?;
99
100        // Step 1: Resolve known symbols (or fallback)
101        let matched_symbol = match_symbol_from_query(raw_query);
102
103        // Step 2: Strip trailing .com / .xyz etc.
104        let query = matched_symbol.split('.').next().unwrap_or(&matched_symbol);
105
106        let market = parameters
107            .get("market")
108            .and_then(|v| v.as_str())
109            .unwrap_or("indian");
110
111        let data_type = parameters
112            .get("data_type")
113            .and_then(|v| v.as_str())
114            .unwrap_or("all");
115
116        let timeframe = parameters
117            .get("timeframe")
118            .and_then(|v| v.as_str())
119            .unwrap_or("realtime");
120
121        let include_ratios = parameters
122            .get("include_ratios")
123            .and_then(|v| v.as_bool())
124            .unwrap_or(true);
125
126        let include_volume = parameters
127            .get("include_volume")
128            .and_then(|v| v.as_bool())
129            .unwrap_or(true);
130
131        let query_priority = ResponseStrategy::classify_query_priority(query);
132        let recommended_tokens = ResponseStrategy::get_recommended_token_allocation(query);
133
134        let execution_id = format!("stock_{}", chrono::Utc::now().format("%Y%m%d_%H%M%S%.3f"));
135        
136        info!("๐Ÿ“ˆ Stock query: '{}' (market: {}, priority: {:?}, tokens: {}, session: {})", 
137              query, market, query_priority, recommended_tokens, session_id);
138        
139        // ๐ŸŽฏ CACHE CHECK - Check Redis cache first
140        match self.check_cache_first(query, session_id).await {
141            Ok(Some(cached_result)) => {
142                info!("๐Ÿš€ Cache HIT: Returning cached data for {} in session {}", query, session_id);
143                
144                // Create tool result from cached data
145                let content = cached_result.get("content").and_then(|c| c.as_str()).unwrap_or("");
146                let source_used = cached_result.get("source_used").and_then(|s| s.as_str()).unwrap_or("Cache");
147                let method_used = "Redis Cache";
148                
149                let formatted_response = self.create_formatted_stock_response(
150                    query, market, content, source_used, method_used, 
151                    data_type, timeframe, include_ratios, include_volume, &execution_id
152                );
153                
154                let tool_result = ToolResult::success_with_raw(
155                    vec![McpContent::text(formatted_response)], 
156                    cached_result
157                );
158                
159                // Apply filtering only if DEDUCT_DATA=true
160                if self.is_data_reduction_enabled() {
161                    return Ok(ResponseStrategy::apply_size_limits(tool_result));
162                } else {
163                    return Ok(tool_result);
164                }
165            }
166            Ok(None) => {
167                info!("๐Ÿ’พ Cache MISS: Fetching fresh data for {} in session {}", query, session_id);
168            }
169            Err(e) => {
170                warn!("๐Ÿšจ Cache error (continuing with fresh fetch): {}", e);
171            }
172        }
173
174        // ๐ŸŒ FRESH FETCH - Cache miss, fetch from sources
175        match self.fetch_stock_data_with_fallbacks_and_priority(
176            query, market, data_type, timeframe, include_ratios, include_volume,
177            query_priority, recommended_tokens, &execution_id
178        ).await {
179            Ok(result) => {
180                // ๐Ÿ—„๏ธ CACHE STORE - Store successful result in cache
181                if let Err(e) = self.store_in_cache(query, session_id, &result).await {
182                    warn!("Failed to store result in cache: {}", e);
183                }
184                
185                let content = result.get("content").and_then(|c| c.as_str()).unwrap_or("");
186                let source_used = result.get("source_used").and_then(|s| s.as_str()).unwrap_or("Unknown");
187                let method_used = result.get("method_used").and_then(|m| m.as_str()).unwrap_or("Unknown");
188                
189                // Create formatted response based on DEDUCT_DATA setting
190                let formatted_response = self.create_formatted_stock_response(
191                    query, market, content, source_used, method_used, 
192                    data_type, timeframe, include_ratios, include_volume, &execution_id
193                );
194                
195                let tool_result = ToolResult::success_with_raw(
196                    vec![McpContent::text(formatted_response)], 
197                    result
198                );
199                
200                // Apply filtering only if DEDUCT_DATA=true
201                if self.is_data_reduction_enabled() {
202                    Ok(ResponseStrategy::apply_size_limits(tool_result))
203                } else {
204                    Ok(tool_result)
205                }
206            }
207            Err(_e) => {
208                // Return empty data for BrightData errors - Anthropic will retry
209                warn!("BrightData error for query '{}', returning empty data for retry", query);
210                let empty_response = json!({
211                    "query": query,
212                    "market": market,
213                    "status": "no_data",
214                    "reason": "brightdata_error",
215                    "execution_id": execution_id,
216                    "session_id": session_id
217                });
218                
219                Ok(ToolResult::success_with_raw(
220                    vec![McpContent::text("๐Ÿ“ˆ **No Data Available**\n\nPlease try again with a more specific stock symbol.".to_string())],
221                    empty_response
222                ))
223            }
224        }
225    }
226}
227
228impl StockDataTool {
229    /// ENHANCED: Check if data reduction is enabled via DEDUCT_DATA environment variable only
230    fn is_data_reduction_enabled(&self) -> bool {
231        std::env::var("DEDUCT_DATA")
232            .unwrap_or_else(|_| "false".to_string())
233            .to_lowercase() == "true"
234    }
235
236    /// ENHANCED: Create formatted response with DEDUCT_DATA control
237    fn create_formatted_stock_response(
238        &self,
239        query: &str,
240        market: &str, 
241        content: &str,
242        source: &str,
243        method: &str,
244        data_type: &str,
245        timeframe: &str,
246        include_ratios: bool,
247        include_volume: bool,
248        execution_id: &str
249    ) -> String {
250        // If DEDUCT_DATA=false, return full content with basic formatting
251        if !self.is_data_reduction_enabled() {
252            return format!(
253                "๐Ÿ“ˆ **{}** | {} Market\n\n## Full Content\n{}\n\n*Source: {} via {} โ€ข Type: {} โ€ข Period: {}*",
254                query.to_uppercase(), 
255                market.to_uppercase(), 
256                content,
257                source, 
258                method, 
259                data_type, 
260                timeframe
261            );
262        }
263
264        // TODO: Add filtered data extraction logic when DEDUCT_DATA=true
265        // For now, return full content formatted
266        format!(
267            "๐Ÿ“ˆ **{}** | {} Market\n\n## Content (TODO: Add Filtering)\n{}\n\n*Source: {} via {} โ€ข Type: {} โ€ข Period: {}*",
268            query.to_uppercase(), 
269            market.to_uppercase(), 
270            content,
271            source, 
272            method, 
273            data_type, 
274            timeframe
275        )
276    }
277    
278    /// TODO: Extract essential stock data using existing filter methods
279    fn extract_essential_stock_data(&self, content: &str, query: &str) -> String {
280        // TODO: Add essential stock data extraction logic
281        // For now, return original content
282        content.to_string()
283    }
284    
285    /// TODO: Extract financial lines when filtering is disabled
286    fn extract_financial_lines(&self, content: &str) -> String {
287        // TODO: Add financial lines extraction logic
288        // For now, return original content
289        content.to_string()
290    }
291    
292    /// TODO: Format financial metrics into clean markdown
293    fn format_financial_metrics(&self, data: &str) -> String {
294        // TODO: Add financial metrics formatting logic
295        // For now, return data as-is
296        data.to_string()
297    }
298
299    // ๐ŸŽฏ ADDED: Check Redis cache first
300    async fn check_cache_first(
301        &self,
302        query: &str,
303        session_id: &str,
304    ) -> Result<Option<Value>, BrightDataError> {
305        let cache_service = get_stock_cache().await?;
306        cache_service.get_cached_stock_data(session_id, query).await
307    }
308
309    // ๐Ÿ—„๏ธ ADDED: Store successful result in Redis cache
310    async fn store_in_cache(
311        &self,
312        query: &str,
313        session_id: &str,
314        data: &Value,
315    ) -> Result<(), BrightDataError> {
316        let cache_service = get_stock_cache().await?;
317        cache_service.cache_stock_data(session_id, query, data.clone()).await
318    }
319
320    // ๐Ÿ” ADDED: Get all cached symbols for session (useful for comparisons)
321    pub async fn get_session_cached_symbols(&self, session_id: &str) -> Result<Vec<String>, BrightDataError> {
322        let cache_service = get_stock_cache().await?;
323        cache_service.get_session_stock_symbols(session_id).await
324    }
325
326    // ๐Ÿ—‘๏ธ ADDED: Clear cache for specific symbol
327    pub async fn clear_symbol_cache(
328        &self,
329        symbol: &str,
330        session_id: &str,
331    ) -> Result<(), BrightDataError> {
332        let cache_service = get_stock_cache().await?;
333        cache_service.clear_stock_symbol_cache(session_id, symbol).await
334    }
335
336    // ๐Ÿ—‘๏ธ ADDED: Clear entire session cache
337    pub async fn clear_session_cache(&self, session_id: &str) -> Result<u32, BrightDataError> {
338        let cache_service = get_stock_cache().await?;
339        cache_service.clear_session_stock_cache(session_id).await
340    }
341
342    // ๐Ÿ“Š ADDED: Get cache statistics
343    pub async fn get_cache_stats(&self) -> Result<Value, BrightDataError> {
344        let cache_service = get_stock_cache().await?;
345        cache_service.get_stock_cache_stats().await
346    }
347
348    // ๐Ÿฅ ADDED: Enhanced connectivity test including cache
349    pub async fn test_connectivity_with_cache(&self) -> Result<String, BrightDataError> {
350        let mut results = Vec::new();
351        
352        // Test cache connectivity
353        info!("๐Ÿงช Testing Redis Cache...");
354        match get_stock_cache().await {
355            Ok(cache_service) => {
356                match cache_service.health_check().await {
357                    Ok(_) => results.push("โœ… Redis Cache: SUCCESS".to_string()),
358                    Err(e) => results.push(format!("โŒ Redis Cache: FAILED - {}", e)),
359                }
360            }
361            Err(e) => results.push(format!("โŒ Redis Cache: FAILED - {}", e)),
362        }
363        
364        // Test existing connectivity
365        let api_test = self.test_connectivity().await?;
366        results.push(api_test);
367        
368        Ok(format!("๐Ÿ” Enhanced Connectivity Test Results:\n{}", results.join("\n")))
369    }
370
371    /// ENHANCED: Build URLs separated by method (proxy vs direct)
372    fn build_prioritized_urls_with_priority(
373        &self, 
374        query: &str, 
375        market: &str, 
376        data_type: &str,
377        priority: crate::filters::strategy::QueryPriority
378    ) -> MethodUrls {
379        let mut proxy_urls = Vec::new();
380        let mut direct_urls = Vec::new();
381        let clean_query = query.trim().to_uppercase();
382
383        // Add priority-based URL limiting
384        let max_sources = 3;
385
386        if self.is_likely_stock_symbol(&clean_query) {
387            match market {
388                "indian" => {
389                    // TODO: Add priority-based URL selection logic
390                    let symbols_to_try = vec![
391                        format!("{}.NS", clean_query),
392                        format!("{}.BO", clean_query),
393                        clean_query.clone(),
394                    ];
395                    
396                    for (i, symbol) in symbols_to_try.iter().enumerate() {
397                        if i >= max_sources { break; }
398                        
399                        let url = format!("https://finance.yahoo.com/quote/{}", symbol);
400                        let description = format!("Yahoo Finance ({})", symbol);
401
402                        let proxy_url = format!("https://finance.yahoo.com/quote/{}/", symbol);
403                        let proxy_description = format!("Yahoo Finance ({})", symbol);
404                        
405                        // Add to both proxy and direct (same URLs, different methods)
406                        proxy_urls.push((proxy_url, proxy_description));
407                        direct_urls.push((url, description));
408                    }
409                }
410                "us" => {
411                    let url = format!("https://finance.yahoo.com/quote/{}.NS/", clean_query);
412                    let description = format!("Yahoo Finance ({})", clean_query);
413
414                    let proxy_url = format!("https://finance.yahoo.com/quote/{}.NS/", clean_query);
415                    let proxy_description = format!("Yahoo Finance ({})", clean_query);
416                    
417                    proxy_urls.push((proxy_url, proxy_description));
418                    direct_urls.push((url, description));
419                }
420                "global" => {
421                    let url = format!("https://finance.yahoo.com/quote/{}.NS/", clean_query);
422                    let description = format!("Yahoo Finance Global ({})", clean_query);
423
424                    let proxy_url = format!("https://finance.yahoo.com/quote/{}.NS/", clean_query);
425                    let proxy_description = format!("Yahoo Finance Global ({})", clean_query);
426                    
427                    proxy_urls.push((proxy_url, proxy_description));
428                    direct_urls.push((url, description));
429                }
430                _ => {}
431            }
432        }
433
434        // Add search fallbacks (no restrictions when DEDUCT_DATA=false)
435        if proxy_urls.len() < max_sources {
436            let url = format!("https://finance.yahoo.com/quote/{}", urlencoding::encode(query));
437            let description = "Yahoo Finance Search".to_string();
438
439            let proxy_url = format!("https://finance.yahoo.com/quote/{}", urlencoding::encode(query));
440            let proxy_description = "Yahoo Finance Search".to_string();
441            
442            proxy_urls.push((proxy_url, proxy_description));
443            direct_urls.push((url, description));
444        }
445
446        info!("๐ŸŽฏ Generated {} proxy URLs and {} direct URLs for query '{}' (priority: {:?})", 
447              proxy_urls.len(), direct_urls.len(), query, priority);
448        
449        MethodUrls {
450            proxy: proxy_urls,
451            direct: direct_urls,
452        }
453    }
454
455    /// ENHANCED: Main fetch function with method-separated URL structure
456    async fn fetch_stock_data_with_fallbacks_and_priority(
457        &self, 
458        query: &str, 
459        market: &str, 
460        data_type: &str,
461        timeframe: &str,
462        include_ratios: bool,
463        include_volume: bool,
464        query_priority: crate::filters::strategy::QueryPriority,
465        token_budget: usize,
466        execution_id: &str
467    ) -> Result<Value, BrightDataError> {
468        let method_urls = self.build_prioritized_urls_with_priority(query, market, data_type, query_priority);
469        let mut last_error = None;
470        let mut attempts = Vec::new();
471
472        // Define method priority: try direct first, then proxy
473        let methods_to_try = vec![
474            // ("direct", "Direct Call", &method_urls.direct),
475            ("proxy", "Proxy Fallback", &method_urls.proxy)
476        ];
477
478        for (method_sequence, (method_type, method_name, urls_for_method)) in methods_to_try.iter().enumerate() {
479            info!("๐Ÿ”„ Trying {} method with {} URLs", method_name, urls_for_method.len());
480            
481            for (url_sequence, (url, source_name)) in urls_for_method.iter().enumerate() {
482                let attempt_result = match *method_type {
483                    "direct" => {
484                        info!("๐ŸŒ Trying Direct BrightData API for {} (method: {}, url: {}/{})", 
485                              source_name, method_sequence + 1, url_sequence + 1, urls_for_method.len());
486                        self.try_fetch_url_direct_api(
487                            url, query, market, source_name, query_priority, token_budget, 
488                            execution_id, url_sequence as u64, method_sequence as u64
489                        ).await
490                    }
491                    "proxy" => {
492                        info!("๐Ÿ”„ Trying Proxy method for {} (method: {}, url: {}/{})", 
493                              source_name, method_sequence + 1, url_sequence + 1, urls_for_method.len());
494                        self.try_fetch_url_via_proxy(
495                            url, query, market, source_name, query_priority, token_budget, 
496                            execution_id, url_sequence as u64, method_sequence as u64
497                        ).await
498                    }
499                    _ => continue,
500                };
501
502                match attempt_result {
503                    Ok(mut result) => {
504                        let content = result.get("content").and_then(|c| c.as_str()).unwrap_or("");
505                        
506                        attempts.push(json!({
507                            "source": source_name,
508                            "url": url,
509                            "method": method_name,
510                            "status": "success",
511                            "content_length": content.len(),
512                            "method_sequence": method_sequence + 1,
513                            "url_sequence": url_sequence + 1
514                        }));
515                        
516                        // TODO: Add content quality check when DEDUCT_DATA=true
517                        let should_try_next = if self.is_data_reduction_enabled() {
518                            // TODO: Add quality-based next source logic
519                            false
520                        } else {
521                            false
522                        };
523                        
524                        if should_try_next && (url_sequence < urls_for_method.len() - 1 || method_sequence < methods_to_try.len() - 1) {
525                            if url_sequence < urls_for_method.len() - 1 {
526                                warn!("Content insufficient from {} via {}, trying next URL in same method", source_name, method_name);
527                                continue; // Try next URL in same method
528                            } else {
529                                warn!("Content insufficient from {} via {}, trying next method", source_name, method_name);
530                                break; // Try next method
531                            }
532                        }
533                        
534                        // SUCCESS - but validate data quality first only if DEDUCT_DATA=true
535                        if self.is_data_reduction_enabled() {
536                            // TODO: Add data quality validation when DEDUCT_DATA=true
537                            // For now, accept all content
538                        }
539                        
540                        // SUCCESS
541                        result["source_used"] = json!(source_name);
542                        result["url_used"] = json!(url);
543                        result["method_used"] = json!(method_name);
544                        result["execution_id"] = json!(execution_id);
545                        result["priority"] = json!(format!("{:?}", query_priority));
546                        result["token_budget"] = json!(token_budget);
547                        result["attempts"] = json!(attempts);
548                        result["successful_method_sequence"] = json!(method_sequence + 1);
549                        result["successful_url_sequence"] = json!(url_sequence + 1);
550                        
551                        info!("โœ… Successfully fetched stock data from {} via {} (method: {}, url: {})", 
552                              source_name, method_name, method_sequence + 1, url_sequence + 1);
553                        
554                        return Ok(result);
555                    }
556                    Err(e) => {
557                        attempts.push(json!({
558                            "source": source_name,
559                            "url": url,
560                            "method": method_name,
561                            "status": "failed",
562                            "error": e.to_string(),
563                            "method_sequence": method_sequence + 1,
564                            "url_sequence": url_sequence + 1
565                        }));
566                        
567                        last_error = Some(e);
568                        warn!("โŒ Failed to fetch from {} via {} (method: {}, url: {}): {:?}", 
569                              source_name, method_name, method_sequence + 1, url_sequence + 1, last_error);
570                    }
571                }
572            }
573        }
574
575        // All methods and sources failed - return empty data instead of error
576        warn!("โŒ All sources and methods failed for query '{}'. Returning empty data for Anthropic retry", query);
577        
578        let empty_result = json!({
579            "query": query,
580            "market": market,
581            "status": "no_data_found",
582            "attempts": attempts,
583            "execution_id": execution_id,
584            "total_attempts": method_urls.direct.len() + method_urls.proxy.len(),
585            "reason": "all_sources_failed"
586        });
587        
588        Ok(empty_result)
589    }
590
591    // Direct BrightData API method (existing implementation)
592    async fn try_fetch_url_direct_api(
593        &self, 
594        url: &str, 
595        query: &str, 
596        market: &str, 
597        source_name: &str, 
598        priority: crate::filters::strategy::QueryPriority,
599        token_budget: usize,
600        execution_id: &str,
601        sequence: u64,
602        method_sequence: u64
603    ) -> Result<Value, BrightDataError> {
604        let max_retries = env::var("MAX_RETRIES")
605            .ok()
606            .and_then(|s| s.parse::<u32>().ok())
607            .unwrap_or(1);
608        
609        let mut last_error = None;
610        
611        for retry_attempt in 0..max_retries {
612            let start_time = Instant::now();
613            let attempt_id = format!("{}_direct_s{}_m{}_r{}", execution_id, sequence, method_sequence, retry_attempt);
614            
615            info!("๐ŸŒ Direct API: Fetching from {} (execution: {}, retry: {}/{})", 
616                  source_name, attempt_id, retry_attempt + 1, max_retries);
617            
618            let api_token = env::var("BRIGHTDATA_API_TOKEN")
619                .or_else(|_| env::var("API_TOKEN"))
620                .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_API_TOKEN environment variable".into()))?;
621
622            let base_url = env::var("BRIGHTDATA_BASE_URL")
623                .unwrap_or_else(|_| "https://api.brightdata.com".to_string());
624
625            let zone = env::var("WEB_UNLOCKER_ZONE")
626                .unwrap_or_else(|_| "mcp_unlocker".to_string());
627
628            let payload = json!({
629                "url": url,
630                "zone": zone,
631                "format": "raw",
632                // "data_format": "markdown"
633            });
634
635            if retry_attempt == 0 {
636                info!("๐Ÿ“ค Direct API Request:");
637                info!("   Endpoint: {}/request", base_url);
638                info!("   Zone: {}", zone);
639                info!("   Target: {}", url);
640            }
641
642            let client = Client::builder()
643                .timeout(Duration::from_secs(90))
644                .build()
645                .map_err(|e| BrightDataError::ToolError(format!("Failed to create HTTP client: {}", e)))?;
646
647            let response = client
648                .post(&format!("{}/request", base_url))
649                .header("Authorization", format!("Bearer {}", api_token))
650                .header("Content-Type", "application/json")
651                .json(&payload)
652                .send()
653                .await
654                .map_err(|e| BrightDataError::ToolError(format!("Direct API request failed to {}: {}", source_name, e)))?;
655
656            let duration = start_time.elapsed();
657            let status = response.status().as_u16();
658            let response_headers: HashMap<String, String> = response
659                .headers()
660                .iter()
661                .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
662                .collect();
663
664            info!("๐Ÿ“ฅ Direct API Response (retry {}):", retry_attempt + 1);
665            info!("   Status: {}", status);
666            info!("   Duration: {}ms", duration.as_millis());
667
668            let response_text = response.text().await
669                .map_err(|e| BrightDataError::ToolError(format!("Failed to read response body from {}: {}", source_name, e)))?;
670
671            // Handle server errors with retry
672            if matches!(status, 502 | 503 | 504) && retry_attempt < max_retries - 1 {
673                let wait_time = Duration::from_millis(1000 + (retry_attempt as u64 * 1000));
674                warn!("โณ Direct API: Server error {}, waiting {}ms before retry...", status, wait_time.as_millis());
675                tokio::time::sleep(wait_time).await;
676                last_error = Some(BrightDataError::ToolError(format!("Direct API server error: {}", status)));
677                continue;
678            }
679            
680            if !(200..300).contains(&status) {
681                let error_msg = format!("Direct API: {} returned HTTP {}: {}", source_name, status, 
682                                      &response_text[..response_text.len().min(500)]);
683                last_error = Some(BrightDataError::ToolError(error_msg));
684                if retry_attempt == max_retries - 1 {
685                    return Err(last_error.unwrap());
686                }
687                continue;
688            }
689
690            // SUCCESS - Process response (apply filtering only if DEDUCT_DATA=true)
691            let raw_content = response_text;
692            let filtered_content = if self.is_data_reduction_enabled() {
693                // TODO: Add content filtering logic when DEDUCT_DATA=true
694                raw_content.clone()
695            } else {
696                raw_content.clone()
697            };
698
699            info!("๐Ÿ“Š Direct API: Content processed: {} bytes -> {} bytes", 
700                  raw_content.len(), filtered_content.len());
701
702            // Log metrics
703            if let Err(e) = BRIGHTDATA_METRICS.log_call(
704                &attempt_id,
705                url,
706                &zone,
707                "raw",
708                None,
709                payload.clone(),
710                status,
711                response_headers.clone(),
712                &raw_content,
713                Some(&filtered_content),
714                duration.as_millis() as u64,
715                None,
716                None,
717            ).await {
718                warn!("Failed to log direct API metrics: {}", e);
719            }
720
721            return Ok(json!({
722                "content": filtered_content,
723                "raw_content": raw_content,
724                "query": query,
725                "market": market,
726                "source": source_name,
727                "method": "Direct BrightData API",
728                "priority": format!("{:?}", priority),
729                "token_budget": token_budget,
730                "execution_id": execution_id,
731                "sequence": sequence,
732                "method_sequence": method_sequence,
733                "success": true,
734                "url": url,
735                "zone": zone,
736                "format": "raw",
737                "status_code": status,
738                "response_size_bytes": raw_content.len(),
739                "filtered_size_bytes": filtered_content.len(),
740                "duration_ms": duration.as_millis(),
741                "timestamp": chrono::Utc::now().to_rfc3339(),
742                "retry_attempts": retry_attempt + 1,
743                "max_retries": max_retries,
744                "payload_used": payload
745            }));
746        }
747
748        Err(last_error.unwrap_or_else(|| BrightDataError::ToolError("Direct API: All retry attempts failed".into())))
749    }
750
751    // Proxy-based method
752    async fn try_fetch_url_via_proxy(
753        &self, 
754        url: &str, 
755        query: &str, 
756        market: &str, 
757        source_name: &str, 
758        priority: crate::filters::strategy::QueryPriority,
759        token_budget: usize,
760        execution_id: &str,
761        sequence: u64,
762        method_sequence: u64
763    ) -> Result<Value, BrightDataError> {
764        let max_retries = env::var("MAX_RETRIES")
765            .ok()
766            .and_then(|s| s.parse::<u32>().ok())
767            .unwrap_or(1);
768        
769        let mut last_error = None;
770        
771        // Get proxy configuration from environment
772        let proxy_host = env::var("BRIGHTDATA_PROXY_HOST")
773            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_HOST environment variable".into()))?;
774        let proxy_port = env::var("BRIGHTDATA_PROXY_PORT")
775            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_PORT environment variable".into()))?;
776        let proxy_username = env::var("BRIGHTDATA_PROXY_USERNAME")
777            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_USERNAME environment variable".into()))?;
778        let proxy_password = env::var("BRIGHTDATA_PROXY_PASSWORD")
779            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_PASSWORD environment variable".into()))?;
780
781        let proxy_url = format!("http://{}:{}@{}:{}", proxy_username, proxy_password, proxy_host, proxy_port);
782        
783        for retry_attempt in 0..max_retries {
784            let start_time = Instant::now();
785            let attempt_id = format!("{}_proxy_s{}_m{}_r{}", execution_id, sequence, method_sequence, retry_attempt);
786            
787            info!("๐Ÿ”„ Proxy: Fetching from {} via proxy (execution: {}, retry: {}/{})", 
788                  source_name, attempt_id, retry_attempt + 1, max_retries);
789            
790            if retry_attempt == 0 {
791                info!("๐Ÿ“ค Proxy Request:");
792                info!("   Proxy: {}:{}@{}:{}", proxy_username, "***", proxy_host, proxy_port);
793                info!("   Target: {}", url);
794            }
795
796            // Create client with proxy configuration
797            let proxy = reqwest::Proxy::all(&proxy_url)
798                .map_err(|e| BrightDataError::ToolError(format!("Failed to create proxy: {}", e)))?;
799
800            let client = Client::builder()
801                .proxy(proxy)
802                .timeout(Duration::from_secs(90))
803                .danger_accept_invalid_certs(true) // Often needed for proxy connections
804                .build()
805                .map_err(|e| BrightDataError::ToolError(format!("Failed to create proxy client: {}", e)))?;
806
807            let response = client
808                .get(url)
809                .header("x-unblock-data-format", "markdown")
810                .send()
811                .await
812                .map_err(|e| BrightDataError::ToolError(format!("Proxy request failed to {}: {}", source_name, e)))?;
813
814            let duration = start_time.elapsed();
815            let status = response.status().as_u16();
816            let response_headers: HashMap<String, String> = response
817                .headers()
818                .iter()
819                .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
820                .collect();
821
822            info!("๐Ÿ“ฅ Proxy Response (retry {}):", retry_attempt + 1);
823            info!("   Status: {}", status);
824            info!("   Duration: {}ms", duration.as_millis());
825
826            let response_text = response.text().await
827                .map_err(|e| BrightDataError::ToolError(format!("Failed to read proxy response body from {}: {}", source_name, e)))?;
828
829            // Handle server errors with retry
830            if matches!(status, 502 | 503 | 504) && retry_attempt < max_retries - 1 {
831                let wait_time = Duration::from_millis(1000 + (retry_attempt as u64 * 1000));
832                warn!("โณ Proxy: Server error {}, waiting {}ms before retry...", status, wait_time.as_millis());
833                tokio::time::sleep(wait_time).await;
834                last_error = Some(BrightDataError::ToolError(format!("Proxy server error: {}", status)));
835                continue;
836            }
837            
838            if !(200..300).contains(&status) {
839                println!("-----------------------------------------------------------------");
840                println!("MARKDOWN SUCCESS: {:?}", status.clone());
841                println!("-----------------------------------------------------------------");
842                let error_msg = format!("Proxy: {} returned HTTP {}: {}", source_name, status, 
843                                      &response_text[..response_text.len().min(200)]);
844                
845                warn!("Proxy HTTP error: {}", error_msg);
846                last_error = Some(BrightDataError::ToolError(error_msg));
847                
848                // Log error metrics for proxy
849                let proxy_payload = json!({
850                    "url": url,
851                    "method": "proxy",
852                    "proxy_host": proxy_host,
853                    "proxy_port": proxy_port,
854                    "error": format!("HTTP {}", status)
855                });
856
857                if let Err(e) = BRIGHTDATA_METRICS.log_call(
858                    &attempt_id,
859                    url,
860                    "proxy",
861                    "raw",
862                    None,
863                    proxy_payload,
864                    status,
865                    response_headers.clone(),
866                    &response_text,
867                    Some(&format!("Proxy HTTP {} Error", status)),
868                    duration.as_millis() as u64,
869                    None,
870                    None,
871                ).await {
872                    warn!("Failed to log proxy error metrics: {}", e);
873                }
874                
875                if retry_attempt == max_retries - 1 {
876                    return Err(last_error.unwrap());
877                }
878                continue;
879            }
880
881            // SUCCESS - Process response (apply filtering only if DEDUCT_DATA=true)
882            let raw_content = response_text;
883            let filtered_content = if self.is_data_reduction_enabled() {
884                // TODO: Add content filtering logic when DEDUCT_DATA=true
885                raw_content.clone()
886            } else {
887                raw_content.clone()
888            };
889
890            info!("๐Ÿ“Š Proxy: Content processed: {} bytes -> {} bytes", 
891                  raw_content.len(), filtered_content.len());
892
893            // Log metrics (using a simplified payload for proxy requests)
894            let proxy_payload = json!({
895                "url": url,
896                "method": "proxy",
897                "proxy_host": proxy_host,
898                "proxy_port": proxy_port
899            });
900
901            if let Err(e) = BRIGHTDATA_METRICS.log_call(
902                &attempt_id,
903                url,
904                "proxy",
905                "raw",
906                None,
907                proxy_payload.clone(),
908                status,
909                response_headers.clone(),
910                &raw_content,
911                Some(&filtered_content),
912                duration.as_millis() as u64,
913                None,
914                None,
915            ).await {
916                warn!("Failed to log proxy metrics: {}", e);
917            }
918
919            return Ok(json!({
920                "content": filtered_content,
921                "raw_content": raw_content,
922                "query": query,
923                "market": market,
924                "source": source_name,
925                "method": "BrightData Proxy",
926                "priority": format!("{:?}", priority),
927                "token_budget": token_budget,
928                "execution_id": execution_id,
929                "sequence": sequence,
930                "method_sequence": method_sequence,
931                "success": true,
932                "url": url,
933                "proxy_host": proxy_host,
934                "proxy_port": proxy_port,
935                "status_code": status,
936                "response_size_bytes": raw_content.len(),
937                "filtered_size_bytes": filtered_content.len(),
938                "duration_ms": duration.as_millis(),
939                "timestamp": chrono::Utc::now().to_rfc3339(),
940                "retry_attempts": retry_attempt + 1,
941                "max_retries": max_retries,
942                "payload_used": proxy_payload
943            }));
944        }
945
946        Err(last_error.unwrap_or_else(|| BrightDataError::ToolError("Proxy: All retry attempts failed".into())))
947    }
948
949    fn is_likely_stock_symbol(&self, query: &str) -> bool {
950        let clean = query.trim();
951        
952        if clean.len() < 1 || clean.len() > 15 {
953            return false;
954        }
955
956        let valid_chars = clean.chars().all(|c| c.is_alphanumeric() || c == '.');
957        let has_letters = clean.chars().any(|c| c.is_alphabetic());
958        
959        valid_chars && has_letters
960    }
961
962    /// Test both direct API and proxy connectivity
963    pub async fn test_connectivity(&self) -> Result<String, BrightDataError> {
964        let test_url = "https://finance.yahoo.com/quote/AAPL/";
965        let mut results = Vec::new();
966        
967        // Test Direct API
968        info!("๐Ÿงช Testing Direct BrightData API...");
969        match self.try_fetch_url_direct_api(
970            test_url, "AAPL", "us", "Yahoo Finance Test", 
971            crate::filters::strategy::QueryPriority::High, 1000, 
972            "connectivity_test", 0, 0
973        ).await {
974            Ok(_) => {
975                results.push("โœ… Direct API: SUCCESS".to_string());
976            }
977            Err(e) => {
978                results.push(format!("โŒ Direct API: FAILED - {}", e));
979            }
980        }
981        
982        // Test Proxy
983        info!("๐Ÿงช Testing Proxy method...");
984        match self.try_fetch_url_via_proxy(
985            test_url, "AAPL", "us", "Yahoo Finance Test", 
986            crate::filters::strategy::QueryPriority::High, 1000, 
987            "connectivity_test", 0, 1
988        ).await {
989            Ok(_) => {
990                results.push("โœ… Proxy: SUCCESS".to_string());
991            }
992            Err(e) => {
993                results.push(format!("โŒ Proxy: FAILED - {}", e));
994            }
995        }
996        
997        Ok(format!("๐Ÿ” Connectivity Test Results:\n{}", results.join("\n")))
998    }
999}