snm_brightdata_client/tools/
index.rs

1// src/tools/index.rs
2use crate::tool::{Tool, ToolResult, McpContent};
3use crate::error::BrightDataError;
4use crate::filters::{ResponseFilter, ResponseStrategy, ResponseType};
5use crate::extras::logger::JSON_LOGGER;
6use crate::metrics::brightdata_logger::BRIGHTDATA_METRICS;
7use crate::services::cache::index_cache::get_index_cache;
8use async_trait::async_trait;
9use reqwest::Client;
10use serde_json::{json, Value};
11use std::env;
12use std::time::{Duration, Instant};
13use std::collections::HashMap;
14use log::{info, warn, error};
15use crate::symbols::index_symbol::match_symbol_from_query;
16
/// Candidate URLs for one lookup, grouped by the fetch method that will use them.
///
/// Every entry is a `(url, description)` pair; the description is a
/// human-readable source label used in logs and result metadata.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MethodUrls {
    /// `(url, description)` pairs to fetch through the proxy method.
    pub proxy: Vec<(String, String)>,
    /// `(url, description)` pairs to fetch through the direct API method.
    pub direct: Vec<(String, String)>,
}
23
/// Stateless tool that fetches stock index data (registered as `get_indices_data`).
///
/// The type carries no fields; all configuration is read from parameters and
/// environment variables at call time.
#[derive(Debug, Clone, Copy, Default)]
pub struct IndexDataTool;
25
26#[async_trait]
27impl Tool for IndexDataTool {
28    fn name(&self) -> &str {
29        "get_indices_data"
30    }
31
32    fn description(&self) -> &str {
33        "Get stock index snapshot (price, change, ranges) with cache, BrightData direct API and proxy fallback. Source: Yahoo Finance https://finance.yahoo.com/quote/^{INDEX_CODE}/ (e.g., ^NSEI)."
34    }
35
36    fn input_schema(&self) -> Value {
37        json!({
38            "type": "object",
39            "properties": {
40                "query": {
41                    "type": "string",
42                    "description": "Index code (e.g., ^NSEI, ^NSEBANK). Used if 'symbol' missing."
43                },
44                "symbol": {
45                    "type": "string",
46                    "description": "Index code (e.g., ^NSEI, ^NSEBANK). Used if 'symbol' missing."
47                },
48                "data_type": {
49                    "type": "string",
50                    "enum": ["price", "technical", "news", "all"],
51                    "default": "all",
52                    "description": "Which slice to emphasize"
53                },
54                "timeframe": {
55                    "type": "string",
56                    "enum": ["realtime", "day", "week", "month", "year"],
57                    "default": "realtime",
58                    "description": "Time period context for analysis"
59                },
60                "user_id": {
61                    "type": "string",
62                    "description": "Session/user id for cache scoping"
63                }
64            },
65            "required": ["symbol", "user_id"]
66        })
67    }
68
69
70    async fn execute(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
71        self.execute_internal(parameters).await
72    }
73
74    async fn execute_internal(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
75        let raw_query = parameters
76            .get("symbol")
77            .and_then(|v| v.as_str())
78            .ok_or_else(|| BrightDataError::ToolError("Missing 'symbol' parameter".into()))?;
79
80        let session_id = parameters
81            .get("user_id")
82            .and_then(|v| v.as_str())
83            .ok_or_else(|| BrightDataError::ToolError("Missing 'user_id' parameter".into()))?;
84
85        // Step 1: Resolve known symbols (or fallback)
86        let matched_symbol = match_symbol_from_query(raw_query);
87
88        // Step 2: Strip trailing .com / .xyz etc.
89        let query = matched_symbol.split('.').next().unwrap_or(&matched_symbol);
90
91        let market = parameters
92            .get("market")
93            .and_then(|v| v.as_str())
94            .unwrap_or("indian");
95
96        let data_type = parameters
97            .get("data_type")
98            .and_then(|v| v.as_str())
99            .unwrap_or("all");
100
101        let timeframe = parameters
102            .get("timeframe")
103            .and_then(|v| v.as_str())
104            .unwrap_or("realtime");
105
106        let include_ratios = parameters
107            .get("include_ratios")
108            .and_then(|v| v.as_bool())
109            .unwrap_or(true);
110
111        let include_volume = parameters
112            .get("include_volume")
113            .and_then(|v| v.as_bool())
114            .unwrap_or(true);
115
116        let query_priority = ResponseStrategy::classify_query_priority(query);
117        let recommended_tokens = ResponseStrategy::get_recommended_token_allocation(query);
118
119        let execution_id = format!("index_{}", chrono::Utc::now().format("%Y%m%d_%H%M%S%.3f"));
120        
121        info!("๐Ÿ“ˆ Stock query: '{}' (market: {}, priority: {:?}, tokens: {}, session: {})", 
122              query, market, query_priority, recommended_tokens, session_id);
123        
124        // ๐ŸŽฏ CACHE CHECK - Check Redis cache first
125        match self.check_cache_first(query, session_id).await {
126            Ok(Some(cached_result)) => {
127                info!("๐Ÿš€ Cache HIT: Returning cached data for {} in session {}", query, session_id);
128                
129                // Create tool result from cached data
130                let content = cached_result.get("content").and_then(|c| c.as_str()).unwrap_or("");
131                let source_used = cached_result.get("source_used").and_then(|s| s.as_str()).unwrap_or("Cache");
132                let method_used = "Redis Cache";
133                
134                let formatted_response = self.create_formatted_index_response(
135                    query, market, content, source_used, method_used, 
136                    data_type, timeframe, include_ratios, include_volume, &execution_id
137                );
138                
139                let tool_result = ToolResult::success_with_raw(
140                    vec![McpContent::text(formatted_response)], 
141                    cached_result
142                );
143                
144                // Apply filtering only if DEDUCT_DATA=true
145                if self.is_data_reduction_enabled() {
146                    return Ok(ResponseStrategy::apply_size_limits(tool_result));
147                } else {
148                    return Ok(tool_result);
149                }
150            }
151            Ok(None) => {
152                info!("๐Ÿ’พ Cache MISS: Fetching fresh data for {} in session {}", query, session_id);
153            }
154            Err(e) => {
155                warn!("๐Ÿšจ Cache error (continuing with fresh fetch): {}", e);
156            }
157        }
158
159        // ๐ŸŒ FRESH FETCH - Cache miss, fetch from sources
160        match self.fetch_index_data_with_fallbacks_and_priority(
161            query, market, data_type, timeframe, include_ratios, include_volume,
162            query_priority, recommended_tokens, &execution_id
163        ).await {
164            Ok(result) => {
165                // ๐Ÿ—„๏ธ CACHE STORE - Store successful result in cache
166                if let Err(e) = self.store_in_cache(query, session_id, &result).await {
167                    warn!("Failed to store result in cache: {}", e);
168                }
169                
170                let content = result.get("content").and_then(|c| c.as_str()).unwrap_or("");
171                let source_used = result.get("source_used").and_then(|s| s.as_str()).unwrap_or("Unknown");
172                let method_used = result.get("method_used").and_then(|m| m.as_str()).unwrap_or("Unknown");
173                
174                // Create formatted response based on DEDUCT_DATA setting
175                let formatted_response = self.create_formatted_index_response(
176                    query, market, content, source_used, method_used, 
177                    data_type, timeframe, include_ratios, include_volume, &execution_id
178                );
179                
180                let tool_result = ToolResult::success_with_raw(
181                    vec![McpContent::text(formatted_response)], 
182                    result
183                );
184                
185                // Apply filtering only if DEDUCT_DATA=true
186                if self.is_data_reduction_enabled() {
187                    Ok(ResponseStrategy::apply_size_limits(tool_result))
188                } else {
189                    Ok(tool_result)
190                }
191            }
192            Err(_e) => {
193                // Return empty data for BrightData errors - Anthropic will retry
194                warn!("BrightData error for query '{}', returning empty data for retry", query);
195                let empty_response = json!({
196                    "query": query,
197                    "market": market,
198                    "status": "no_data",
199                    "reason": "brightdata_error",
200                    "execution_id": execution_id,
201                    "session_id": session_id
202                });
203                
204                Ok(ToolResult::success_with_raw(
205                    vec![McpContent::text("๐Ÿ“ˆ **No Data Available**\n\nPlease try again with a more specific index symbol.".to_string())],
206                    empty_response
207                ))
208            }
209        }
210    }
211}
212
213impl IndexDataTool {
214    /// ENHANCED: Check if data reduction is enabled via DEDUCT_DATA environment variable only
215    fn is_data_reduction_enabled(&self) -> bool {
216        std::env::var("DEDUCT_DATA")
217            .unwrap_or_else(|_| "false".to_string())
218            .to_lowercase() == "true"
219    }
220
221    /// ENHANCED: Create formatted response with DEDUCT_DATA control
222    fn create_formatted_index_response(
223        &self,
224        query: &str,
225        market: &str, 
226        content: &str,
227        source: &str,
228        method: &str,
229        data_type: &str,
230        timeframe: &str,
231        include_ratios: bool,
232        include_volume: bool,
233        execution_id: &str
234    ) -> String {
235        // If DEDUCT_DATA=false, return full content with basic formatting
236        if !self.is_data_reduction_enabled() {
237            return format!(
238                "๐Ÿ“ˆ **{}** | {} Market\n\n## Full Content\n{}\n\n*Source: {} via {} โ€ข Type: {} โ€ข Period: {}*",
239                query.to_uppercase(), 
240                market.to_uppercase(), 
241                content,
242                source, 
243                method, 
244                data_type, 
245                timeframe
246            );
247        }
248
249        // TODO: Add filtered data extraction logic when DEDUCT_DATA=true
250        // For now, return full content formatted
251        format!(
252            "๐Ÿ“ˆ **{}** | {} Market\n\n## Content (TODO: Add Filtering)\n{}\n\n*Source: {} via {} โ€ข Type: {} โ€ข Period: {}*",
253            query.to_uppercase(), 
254            market.to_uppercase(), 
255            content,
256            source, 
257            method, 
258            data_type, 
259            timeframe
260        )
261    }
262    
263    /// TODO: Extract essential index data using existing filter methods
264    fn extract_essential_index_data(&self, content: &str, query: &str) -> String {
265        // TODO: Add essential index data extraction logic
266        // For now, return original content
267        content.to_string()
268    }
269    
270    /// TODO: Extract financial lines when filtering is disabled
271    fn extract_financial_lines(&self, content: &str) -> String {
272        // TODO: Add financial lines extraction logic
273        // For now, return original content
274        content.to_string()
275    }
276    
277    /// TODO: Format financial metrics into clean markdown
278    fn format_financial_metrics(&self, data: &str) -> String {
279        // TODO: Add financial metrics formatting logic
280        // For now, return data as-is
281        data.to_string()
282    }
283
284    // ๐ŸŽฏ ADDED: Check Redis cache first
285    async fn check_cache_first(
286        &self,
287        query: &str,
288        session_id: &str,
289    ) -> Result<Option<Value>, BrightDataError> {
290        let cache_service = get_index_cache().await?;
291        cache_service.get_cached_index_data(session_id, query).await
292    }
293
294    // ๐Ÿ—„๏ธ ADDED: Store successful result in Redis cache
295    async fn store_in_cache(
296        &self,
297        query: &str,
298        session_id: &str,
299        data: &Value,
300    ) -> Result<(), BrightDataError> {
301        let cache_service = get_index_cache().await?;
302        cache_service.cache_index_data(session_id, query, data.clone()).await
303    }
304
305    // ๐Ÿ” ADDED: Get all cached symbols for session (useful for comparisons)
306    pub async fn get_session_cached_symbols(&self, session_id: &str) -> Result<Vec<String>, BrightDataError> {
307        let cache_service = get_index_cache().await?;
308        cache_service.get_session_index_symbols(session_id).await
309    }
310
311    // ๐Ÿ—‘๏ธ ADDED: Clear cache for specific symbol
312    pub async fn clear_symbol_cache(
313        &self,
314        symbol: &str,
315        session_id: &str,
316    ) -> Result<(), BrightDataError> {
317        let cache_service = get_index_cache().await?;
318        cache_service.clear_index_symbol_cache(session_id, symbol).await
319    }
320
321    // ๐Ÿ—‘๏ธ ADDED: Clear entire session cache
322    pub async fn clear_session_cache(&self, session_id: &str) -> Result<u32, BrightDataError> {
323        let cache_service = get_index_cache().await?;
324        cache_service.clear_session_index_cache(session_id).await
325    }
326
327    // ๐Ÿ“Š ADDED: Get cache statistics
328    pub async fn get_cache_stats(&self) -> Result<Value, BrightDataError> {
329        let cache_service = get_index_cache().await?;
330        cache_service.get_index_cache_stats().await
331    }
332
333    // ๐Ÿฅ ADDED: Enhanced connectivity test including cache
334    pub async fn test_connectivity_with_cache(&self) -> Result<String, BrightDataError> {
335        let mut results = Vec::new();
336        
337        // Test cache connectivity
338        info!("๐Ÿงช Testing Redis Cache...");
339        match get_index_cache().await {
340            Ok(cache_service) => {
341                match cache_service.health_check().await {
342                    Ok(_) => results.push("โœ… Redis Cache: SUCCESS".to_string()),
343                    Err(e) => results.push(format!("โŒ Redis Cache: FAILED - {}", e)),
344                }
345            }
346            Err(e) => results.push(format!("โŒ Redis Cache: FAILED - {}", e)),
347        }
348        
349        // Test existing connectivity
350        let api_test = self.test_connectivity().await?;
351        results.push(api_test);
352        
353        Ok(format!("๐Ÿ” Enhanced Connectivity Test Results:\n{}", results.join("\n")))
354    }
355
356    /// ENHANCED: Build URLs separated by method (proxy vs direct)
357    fn build_prioritized_urls_with_priority(
358        &self, 
359        query: &str, 
360        market: &str, 
361        data_type: &str,
362        priority: crate::filters::strategy::QueryPriority
363    ) -> MethodUrls {
364        let mut proxy_urls = Vec::new();
365        let mut direct_urls = Vec::new();
366        let clean_query = query.trim().to_uppercase();
367
368        // Add priority-based URL limiting
369        let max_sources = 3;
370
371        if self.is_likely_index_symbol(&clean_query) {
372            match market {
373                "usd" => {
374                    let symbols_to_try = vec![
375                        format!("^{}", clean_query),
376                        clean_query.clone(),
377                    ];
378                    
379                    for (i, symbol) in symbols_to_try.iter().enumerate() {
380                        if i >= max_sources { break; }
381                        
382                        let url = format!("https://finance.yahoo.com/quote/{}", symbol);
383                        let description = format!("Yahoo Finance ({})", symbol);
384
385                        let proxy_url = format!("https://finance.yahoo.com/quote/{}/", symbol);
386                        let proxy_description = format!("Yahoo Finance ({})", symbol);
387                        
388                        // Add to both proxy and direct (same URLs, different methods)
389                        proxy_urls.push((proxy_url, proxy_description));
390                        direct_urls.push((url, description));
391                    }
392                },
393                "inr" => {
394                    let symbols_to_try = vec![
395                        format!("^{}", clean_query),
396                        clean_query.clone(),
397                    ];
398                    
399                    for (i, symbol) in symbols_to_try.iter().enumerate() {
400                        if i >= max_sources { break; }
401                        
402                        let url = format!("https://finance.yahoo.com/quote/{}", symbol);
403                        let description = format!("Yahoo Finance ({})", symbol);
404
405                        let proxy_url = format!("https://finance.yahoo.com/quote/{}/", symbol);
406                        let proxy_description = format!("Yahoo Finance ({})", symbol);
407                        
408                        // Add to both proxy and direct (same URLs, different methods)
409                        proxy_urls.push((proxy_url, proxy_description));
410                        direct_urls.push((url, description));
411                    }
412                }
413
414                _ => {
415                    let symbols_to_try = vec![
416                        format!("^{}", clean_query),
417                        clean_query.clone(),
418                    ];
419                    
420                    for (i, symbol) in symbols_to_try.iter().enumerate() {
421                        if i >= max_sources { break; }
422                        
423                        let url = format!("https://finance.yahoo.com/quote/{}", symbol);
424                        let description = format!("Yahoo Finance ({})", symbol);
425
426                        let proxy_url = format!("https://finance.yahoo.com/quote/{}/", symbol);
427                        let proxy_description = format!("Yahoo Finance ({})", symbol);
428                        
429                        // Add to both proxy and direct (same URLs, different methods)
430                        proxy_urls.push((proxy_url, proxy_description));
431                        direct_urls.push((url, description));
432                    }
433                }
434
435            }
436        }
437
438        // Add search fallbacks (no restrictions when DEDUCT_DATA=false)
439        if proxy_urls.len() < max_sources {
440            let url = format!("https://finance.yahoo.com/quote/{}", urlencoding::encode(query));
441            let description = "Yahoo Finance Search".to_string();
442
443            let proxy_url = format!("https://finance.yahoo.com/quote/{}", urlencoding::encode(query));
444            let proxy_description = "Yahoo Finance Search".to_string();
445            
446            proxy_urls.push((proxy_url, proxy_description));
447            direct_urls.push((url, description));
448        }
449
450        info!("๐ŸŽฏ Generated {} proxy URLs and {} direct URLs for query '{}' (priority: {:?})", 
451              proxy_urls.len(), direct_urls.len(), query, priority);
452        
453        MethodUrls {
454            proxy: proxy_urls,
455            direct: direct_urls,
456        }
457    }
458
459    /// ENHANCED: Main fetch function with method-separated URL structure
460    async fn fetch_index_data_with_fallbacks_and_priority(
461        &self, 
462        query: &str, 
463        market: &str, 
464        data_type: &str,
465        timeframe: &str,
466        include_ratios: bool,
467        include_volume: bool,
468        query_priority: crate::filters::strategy::QueryPriority,
469        token_budget: usize,
470        execution_id: &str
471    ) -> Result<Value, BrightDataError> {
472        let method_urls = self.build_prioritized_urls_with_priority(query, market, data_type, query_priority);
473        let mut last_error = None;
474        let mut attempts = Vec::new();
475
476        // Define method priority: try direct first, then proxy
477        let methods_to_try = vec![
478            // ("direct", "Direct Call", &method_urls.direct),
479            ("proxy", "Proxy Fallback", &method_urls.proxy)
480        ];
481
482        for (method_sequence, (method_type, method_name, urls_for_method)) in methods_to_try.iter().enumerate() {
483            info!("๐Ÿ”„ Trying {} method with {} URLs", method_name, urls_for_method.len());
484            
485            for (url_sequence, (url, source_name)) in urls_for_method.iter().enumerate() {
486                let attempt_result = match *method_type {
487                    "direct" => {
488                        info!("๐ŸŒ Trying Direct BrightData API for {} (method: {}, url: {}/{})", 
489                              source_name, method_sequence + 1, url_sequence + 1, urls_for_method.len());
490                        self.try_fetch_url_direct_api(
491                            url, query, market, source_name, query_priority, token_budget, 
492                            execution_id, url_sequence as u64, method_sequence as u64
493                        ).await
494                    }
495                    "proxy" => {
496                        info!("๐Ÿ”„ Trying Proxy method for {} (method: {}, url: {}/{})", 
497                              source_name, method_sequence + 1, url_sequence + 1, urls_for_method.len());
498                        self.try_fetch_url_via_proxy(
499                            url, query, market, source_name, query_priority, token_budget, 
500                            execution_id, url_sequence as u64, method_sequence as u64
501                        ).await
502                    }
503                    _ => continue,
504                };
505
506                match attempt_result {
507                    Ok(mut result) => {
508                        let content = result.get("content").and_then(|c| c.as_str()).unwrap_or("");
509                        
510                        attempts.push(json!({
511                            "source": source_name,
512                            "url": url,
513                            "method": method_name,
514                            "status": "success",
515                            "content_length": content.len(),
516                            "method_sequence": method_sequence + 1,
517                            "url_sequence": url_sequence + 1
518                        }));
519                        
520                        // TODO: Add content quality check when DEDUCT_DATA=true
521                        let should_try_next = if self.is_data_reduction_enabled() {
522                            // TODO: Add quality-based next source logic
523                            false
524                        } else {
525                            false
526                        };
527                        
528                        if should_try_next && (url_sequence < urls_for_method.len() - 1 || method_sequence < methods_to_try.len() - 1) {
529                            if url_sequence < urls_for_method.len() - 1 {
530                                warn!("Content insufficient from {} via {}, trying next URL in same method", source_name, method_name);
531                                continue; // Try next URL in same method
532                            } else {
533                                warn!("Content insufficient from {} via {}, trying next method", source_name, method_name);
534                                break; // Try next method
535                            }
536                        }
537                        
538                        // SUCCESS - but validate data quality first only if DEDUCT_DATA=true
539                        if self.is_data_reduction_enabled() {
540                            // TODO: Add data quality validation when DEDUCT_DATA=true
541                            // For now, accept all content
542                        }
543                        
544                        // SUCCESS
545                        result["source_used"] = json!(source_name);
546                        result["url_used"] = json!(url);
547                        result["method_used"] = json!(method_name);
548                        result["execution_id"] = json!(execution_id);
549                        result["priority"] = json!(format!("{:?}", query_priority));
550                        result["token_budget"] = json!(token_budget);
551                        result["attempts"] = json!(attempts);
552                        result["successful_method_sequence"] = json!(method_sequence + 1);
553                        result["successful_url_sequence"] = json!(url_sequence + 1);
554                        
555                        info!("โœ… Successfully fetched index data from {} via {} (method: {}, url: {})", 
556                              source_name, method_name, method_sequence + 1, url_sequence + 1);
557                        
558                        return Ok(result);
559                    }
560                    Err(e) => {
561                        attempts.push(json!({
562                            "source": source_name,
563                            "url": url,
564                            "method": method_name,
565                            "status": "failed",
566                            "error": e.to_string(),
567                            "method_sequence": method_sequence + 1,
568                            "url_sequence": url_sequence + 1
569                        }));
570                        
571                        last_error = Some(e);
572                        warn!("โŒ Failed to fetch from {} via {} (method: {}, url: {}): {:?}", 
573                              source_name, method_name, method_sequence + 1, url_sequence + 1, last_error);
574                    }
575                }
576            }
577        }
578
579        // All methods and sources failed - return empty data instead of error
580        warn!("โŒ All sources and methods failed for query '{}'. Returning empty data for Anthropic retry", query);
581        
582        let empty_result = json!({
583            "query": query,
584            "market": market,
585            "status": "no_data_found",
586            "attempts": attempts,
587            "execution_id": execution_id,
588            "total_attempts": method_urls.direct.len() + method_urls.proxy.len(),
589            "reason": "all_sources_failed"
590        });
591        
592        Ok(empty_result)
593    }
594
595    // Direct BrightData API method (existing implementation)
596    async fn try_fetch_url_direct_api(
597        &self, 
598        url: &str, 
599        query: &str, 
600        market: &str, 
601        source_name: &str, 
602        priority: crate::filters::strategy::QueryPriority,
603        token_budget: usize,
604        execution_id: &str,
605        sequence: u64,
606        method_sequence: u64
607    ) -> Result<Value, BrightDataError> {
608        let max_retries = env::var("MAX_RETRIES")
609            .ok()
610            .and_then(|s| s.parse::<u32>().ok())
611            .unwrap_or(1);
612        
613        let mut last_error = None;
614        
615        for retry_attempt in 0..max_retries {
616            let start_time = Instant::now();
617            let attempt_id = format!("{}_direct_s{}_m{}_r{}", execution_id, sequence, method_sequence, retry_attempt);
618            
619            info!("๐ŸŒ Direct API: Fetching from {} (execution: {}, retry: {}/{})", 
620                  source_name, attempt_id, retry_attempt + 1, max_retries);
621            
622            let api_token = env::var("BRIGHTDATA_API_TOKEN")
623                .or_else(|_| env::var("API_TOKEN"))
624                .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_API_TOKEN environment variable".into()))?;
625
626            let base_url = env::var("BRIGHTDATA_BASE_URL")
627                .unwrap_or_else(|_| "https://api.brightdata.com".to_string());
628
629            let zone = env::var("WEB_UNLOCKER_ZONE")
630                .unwrap_or_else(|_| "mcp_unlocker".to_string());
631
632            let payload = json!({
633                "url": url,
634                "zone": zone,
635                "format": "raw",
636                // "data_format": "markdown"
637            });
638
639            if retry_attempt == 0 {
640                info!("๐Ÿ“ค Direct API Request:");
641                info!("   Endpoint: {}/request", base_url);
642                info!("   Zone: {}", zone);
643                info!("   Target: {}", url);
644            }
645
646            let client = Client::builder()
647                .timeout(Duration::from_secs(90))
648                .build()
649                .map_err(|e| BrightDataError::ToolError(format!("Failed to create HTTP client: {}", e)))?;
650
651            let response = client
652                .post(&format!("{}/request", base_url))
653                .header("Authorization", format!("Bearer {}", api_token))
654                .header("Content-Type", "application/json")
655                .json(&payload)
656                .send()
657                .await
658                .map_err(|e| BrightDataError::ToolError(format!("Direct API request failed to {}: {}", source_name, e)))?;
659
660            let duration = start_time.elapsed();
661            let status = response.status().as_u16();
662            let response_headers: HashMap<String, String> = response
663                .headers()
664                .iter()
665                .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
666                .collect();
667
668            info!("๐Ÿ“ฅ Direct API Response (retry {}):", retry_attempt + 1);
669            info!("   Status: {}", status);
670            info!("   Duration: {}ms", duration.as_millis());
671
672            let response_text = response.text().await
673                .map_err(|e| BrightDataError::ToolError(format!("Failed to read response body from {}: {}", source_name, e)))?;
674
675            // Handle server errors with retry
676            if matches!(status, 502 | 503 | 504) && retry_attempt < max_retries - 1 {
677                let wait_time = Duration::from_millis(1000 + (retry_attempt as u64 * 1000));
678                warn!("โณ Direct API: Server error {}, waiting {}ms before retry...", status, wait_time.as_millis());
679                tokio::time::sleep(wait_time).await;
680                last_error = Some(BrightDataError::ToolError(format!("Direct API server error: {}", status)));
681                continue;
682            }
683            
684            if !(200..300).contains(&status) {
685                let error_msg = format!("Direct API: {} returned HTTP {}: {}", source_name, status, 
686                                      &response_text[..response_text.len().min(500)]);
687                last_error = Some(BrightDataError::ToolError(error_msg));
688                if retry_attempt == max_retries - 1 {
689                    return Err(last_error.unwrap());
690                }
691                continue;
692            }
693
694            // SUCCESS - Process response (apply filtering only if DEDUCT_DATA=true)
695            let raw_content = response_text;
696            let filtered_content = if self.is_data_reduction_enabled() {
697                // TODO: Add content filtering logic when DEDUCT_DATA=true
698                raw_content.clone()
699            } else {
700                raw_content.clone()
701            };
702
703            info!("๐Ÿ“Š Direct API: Content processed: {} bytes -> {} bytes", 
704                  raw_content.len(), filtered_content.len());
705
706            // Log metrics
707            if let Err(e) = BRIGHTDATA_METRICS.log_call(
708                &attempt_id,
709                url,
710                &zone,
711                "raw",
712                None,
713                payload.clone(),
714                status,
715                response_headers.clone(),
716                &raw_content,
717                Some(&filtered_content),
718                duration.as_millis() as u64,
719                None,
720                None,
721            ).await {
722                warn!("Failed to log direct API metrics: {}", e);
723            }
724
725            return Ok(json!({
726                "content": filtered_content,
727                "raw_content": raw_content,
728                "query": query,
729                "market": market,
730                "source": source_name,
731                "method": "Direct BrightData API",
732                "priority": format!("{:?}", priority),
733                "token_budget": token_budget,
734                "execution_id": execution_id,
735                "sequence": sequence,
736                "method_sequence": method_sequence,
737                "success": true,
738                "url": url,
739                "zone": zone,
740                "format": "raw",
741                "status_code": status,
742                "response_size_bytes": raw_content.len(),
743                "filtered_size_bytes": filtered_content.len(),
744                "duration_ms": duration.as_millis(),
745                "timestamp": chrono::Utc::now().to_rfc3339(),
746                "retry_attempts": retry_attempt + 1,
747                "max_retries": max_retries,
748                "payload_used": payload
749            }));
750        }
751
752        Err(last_error.unwrap_or_else(|| BrightDataError::ToolError("Direct API: All retry attempts failed".into())))
753    }
754
755    // Proxy-based method
756    async fn try_fetch_url_via_proxy(
757        &self, 
758        url: &str, 
759        query: &str, 
760        market: &str, 
761        source_name: &str, 
762        priority: crate::filters::strategy::QueryPriority,
763        token_budget: usize,
764        execution_id: &str,
765        sequence: u64,
766        method_sequence: u64
767    ) -> Result<Value, BrightDataError> {
768        let max_retries = env::var("MAX_RETRIES")
769            .ok()
770            .and_then(|s| s.parse::<u32>().ok())
771            .unwrap_or(1);
772        
773        let mut last_error = None;
774        
775        // Get proxy configuration from environment
776        let proxy_host = env::var("BRIGHTDATA_PROXY_HOST")
777            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_HOST environment variable".into()))?;
778        let proxy_port = env::var("BRIGHTDATA_PROXY_PORT")
779            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_PORT environment variable".into()))?;
780        let proxy_username = env::var("BRIGHTDATA_PROXY_USERNAME")
781            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_USERNAME environment variable".into()))?;
782        let proxy_password = env::var("BRIGHTDATA_PROXY_PASSWORD")
783            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_PASSWORD environment variable".into()))?;
784
785        let proxy_url = format!("http://{}:{}@{}:{}", proxy_username, proxy_password, proxy_host, proxy_port);
786        
787        for retry_attempt in 0..max_retries {
788            let start_time = Instant::now();
789            let attempt_id = format!("{}_proxy_s{}_m{}_r{}", execution_id, sequence, method_sequence, retry_attempt);
790            
791            info!("๐Ÿ”„ Proxy: Fetching from {} via proxy (execution: {}, retry: {}/{})", 
792                  source_name, attempt_id, retry_attempt + 1, max_retries);
793            
794            if retry_attempt == 0 {
795                info!("๐Ÿ“ค Proxy Request:");
796                info!("   Proxy: {}:{}@{}:{}", proxy_username, "***", proxy_host, proxy_port);
797                info!("   Target: {}", url);
798            }
799
800            // Create client with proxy configuration
801            let proxy = reqwest::Proxy::all(&proxy_url)
802                .map_err(|e| BrightDataError::ToolError(format!("Failed to create proxy: {}", e)))?;
803
804            let client = Client::builder()
805                .proxy(proxy)
806                .timeout(Duration::from_secs(90))
807                .danger_accept_invalid_certs(true) // Often needed for proxy connections
808                .build()
809                .map_err(|e| BrightDataError::ToolError(format!("Failed to create proxy client: {}", e)))?;
810
811            let response = client
812                .get(url)
813                .header("x-unblock-data-format", "markdown")
814                .send()
815                .await
816                .map_err(|e| BrightDataError::ToolError(format!("Proxy request failed to {}: {}", source_name, e)))?;
817
818            let duration = start_time.elapsed();
819            let status = response.status().as_u16();
820            let response_headers: HashMap<String, String> = response
821                .headers()
822                .iter()
823                .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
824                .collect();
825
826            info!("๐Ÿ“ฅ Proxy Response (retry {}):", retry_attempt + 1);
827            info!("   Status: {}", status);
828            info!("   Duration: {}ms", duration.as_millis());
829
830            let response_text = response.text().await
831                .map_err(|e| BrightDataError::ToolError(format!("Failed to read proxy response body from {}: {}", source_name, e)))?;
832
833            // Handle server errors with retry
834            if matches!(status, 502 | 503 | 504) && retry_attempt < max_retries - 1 {
835                let wait_time = Duration::from_millis(1000 + (retry_attempt as u64 * 1000));
836                warn!("โณ Proxy: Server error {}, waiting {}ms before retry...", status, wait_time.as_millis());
837                tokio::time::sleep(wait_time).await;
838                last_error = Some(BrightDataError::ToolError(format!("Proxy server error: {}", status)));
839                continue;
840            }
841            
842            if !(200..300).contains(&status) {
843                println!("-----------------------------------------------------------------");
844                println!("MARKDOWN SUCCESS: {:?}", status.clone());
845                println!("-----------------------------------------------------------------");
846                let error_msg = format!("Proxy: {} returned HTTP {}: {}", source_name, status, 
847                                      &response_text[..response_text.len().min(200)]);
848                
849                warn!("Proxy HTTP error: {}", error_msg);
850                last_error = Some(BrightDataError::ToolError(error_msg));
851                
852                // Log error metrics for proxy
853                let proxy_payload = json!({
854                    "url": url,
855                    "method": "proxy",
856                    "proxy_host": proxy_host,
857                    "proxy_port": proxy_port,
858                    "error": format!("HTTP {}", status)
859                });
860
861                if let Err(e) = BRIGHTDATA_METRICS.log_call(
862                    &attempt_id,
863                    url,
864                    "proxy",
865                    "raw",
866                    None,
867                    proxy_payload,
868                    status,
869                    response_headers.clone(),
870                    &response_text,
871                    Some(&format!("Proxy HTTP {} Error", status)),
872                    duration.as_millis() as u64,
873                    None,
874                    None,
875                ).await {
876                    warn!("Failed to log proxy error metrics: {}", e);
877                }
878                
879                if retry_attempt == max_retries - 1 {
880                    return Err(last_error.unwrap());
881                }
882                continue;
883            }
884
885            // SUCCESS - Process response (apply filtering only if DEDUCT_DATA=true)
886            let raw_content = response_text;
887            let filtered_content = if self.is_data_reduction_enabled() {
888                // TODO: Add content filtering logic when DEDUCT_DATA=true
889                raw_content.clone()
890            } else {
891                raw_content.clone()
892            };
893
894            info!("๐Ÿ“Š Proxy: Content processed: {} bytes -> {} bytes", 
895                  raw_content.len(), filtered_content.len());
896
897            // Log metrics (using a simplified payload for proxy requests)
898            let proxy_payload = json!({
899                "url": url,
900                "method": "proxy",
901                "proxy_host": proxy_host,
902                "proxy_port": proxy_port
903            });
904
905            if let Err(e) = BRIGHTDATA_METRICS.log_call(
906                &attempt_id,
907                url,
908                "proxy",
909                "raw",
910                None,
911                proxy_payload.clone(),
912                status,
913                response_headers.clone(),
914                &raw_content,
915                Some(&filtered_content),
916                duration.as_millis() as u64,
917                None,
918                None,
919            ).await {
920                warn!("Failed to log proxy metrics: {}", e);
921            }
922
923            return Ok(json!({
924                "content": filtered_content,
925                "raw_content": raw_content,
926                "query": query,
927                "market": market,
928                "source": source_name,
929                "method": "BrightData Proxy",
930                "priority": format!("{:?}", priority),
931                "token_budget": token_budget,
932                "execution_id": execution_id,
933                "sequence": sequence,
934                "method_sequence": method_sequence,
935                "success": true,
936                "url": url,
937                "proxy_host": proxy_host,
938                "proxy_port": proxy_port,
939                "status_code": status,
940                "response_size_bytes": raw_content.len(),
941                "filtered_size_bytes": filtered_content.len(),
942                "duration_ms": duration.as_millis(),
943                "timestamp": chrono::Utc::now().to_rfc3339(),
944                "retry_attempts": retry_attempt + 1,
945                "max_retries": max_retries,
946                "payload_used": proxy_payload
947            }));
948        }
949
950        Err(last_error.unwrap_or_else(|| BrightDataError::ToolError("Proxy: All retry attempts failed".into())))
951    }
952
953    fn is_likely_index_symbol(&self, query: &str) -> bool {
954        let clean = query.trim();
955        
956        if clean.len() < 1 || clean.len() > 15 {
957            return false;
958        }
959
960        let valid_chars = clean.chars().all(|c| c.is_alphanumeric() || c == '.');
961        let has_letters = clean.chars().any(|c| c.is_alphabetic());
962        
963        valid_chars && has_letters
964    }
965
966    // ----------------- Pair helpers -----------------
967    fn normalize_pair(raw: &str) -> String {
968        // Accept "usd/inr", "usd-inr", "USDINR", "USDINR=X" -> return "USDINR" (without =X here)
969        let s = raw.trim().to_uppercase().replace(['/', '-', ' '], "");
970        s.trim().to_string()
971    }
972
973    /// Test both direct API and proxy connectivity
974    pub async fn test_connectivity(&self) -> Result<String, BrightDataError> {
975        let test_url = "https://finance.yahoo.com/quote/USDINR=X/";
976        let mut results = Vec::new();
977        
978        // Test Direct API
979        info!("Testing Direct BrightData API...");
980        match self.try_fetch_url_direct_api(
981            test_url, "BTC", "usd", "Yahoo Finance Test", 
982            crate::filters::strategy::QueryPriority::High, 1000, 
983            "connectivity_test", 0, 0
984        ).await {
985            Ok(_) => {
986                results.push("Direct API: SUCCESS".to_string());
987            }
988            Err(e) => {
989                results.push(format!("Direct API: FAILED - {}", e));
990            }
991        }
992        
993        // Test Proxy
994        info!("Testing Proxy method...");
995        match self.try_fetch_url_via_proxy(
996            test_url, "BTC", "usd", "Yahoo Finance Test", 
997            crate::filters::strategy::QueryPriority::High, 1000, 
998            "connectivity_test", 0, 1
999        ).await {
1000            Ok(_) => {
1001                results.push("Proxy: SUCCESS".to_string());
1002            }
1003            Err(e) => {
1004                results.push(format!("Proxy: FAILED - {}", e));
1005            }
1006        }
1007        
1008        Ok(format!("Connectivity Test Results:\n{}", results.join("\n")))
1009    }
1010}