snm_brightdata_client/tools/
bond.rs

1// src/tools/bond.rs
2use crate::tool::{Tool, ToolResult, McpContent};
3use crate::error::BrightDataError;
4use crate::filters::{ResponseFilter, ResponseStrategy, ResponseType};
5use crate::extras::logger::JSON_LOGGER;
6use crate::metrics::brightdata_logger::BRIGHTDATA_METRICS;
7use crate::services::cache::bond_cache::get_bond_cache;
8use async_trait::async_trait;
9use reqwest::Client;
10use serde_json::{json, Value};
11use std::env;
12use std::time::{Duration, Instant};
13use std::collections::HashMap;
14use log::{info, warn, error};
15use crate::symbols::bond_symbol::match_symbol_from_query;
16
/// Candidate URLs for one bond query, grouped by the fetch method that uses them.
///
/// Every entry is a `(url, description)` pair. The description is the
/// human-readable source label that shows up in log lines and in the per-attempt
/// records attached to fetch results.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MethodUrls {
    /// URLs to be fetched through the proxy method, in the order they should be tried.
    pub proxy: Vec<(String, String)>,
    /// URLs to be fetched through the direct API method, in the order they should be tried.
    pub direct: Vec<(String, String)>,
}
23
/// Stateless tool that returns bond snapshots (registered as `get_bond_data`).
///
/// The type carries no data; all configuration is read from parameters and
/// environment variables at call time.
#[derive(Debug, Clone, Copy, Default)]
pub struct BondDataTool;
25
26#[async_trait]
27impl Tool for BondDataTool {
28    fn name(&self) -> &str {
29        "get_bond_data"
30    }
31
32    fn description(&self) -> &str {
33        "Get bond/fund snapshot (price, change, ranges) with cache, BrightData direct API and proxy fallback. Source: Yahoo Finance https://finance.yahoo.com/quote/^{SYMBOL}/ (e.g., ^TNX, ^IRX)."
34    }
35
36    fn input_schema(&self) -> Value {
37        json!({
38            "type": "object",
39            "properties": {
40                "query": {
41                    "type": "string",
42                    "description": "Bond symbol (e.g., ^TNX, ^IRX, ^TYX, ^FVX). Used if 'symbol' missing.",
43                },
44                "symbol": {
45                    "type": "string",
46                    "description": "Bond symbol (e.g., ^TNX, ^IRX, ^TYX, ^FVX). Used if 'symbol' missing.",
47                },
48                "data_type": {
49                    "type": "string",
50                    "enum": ["price", "technical", "news", "all"],
51                    "default": "all",
52                    "description": "Focus slice for parsing/formatting"
53                },
54                "timeframe": {
55                    "type": "string",
56                    "enum": ["realtime", "day", "week", "month", "year"],
57                    "default": "realtime",
58                    "description": "Time period context for analysis"
59                },
60                "user_id": {
61                    "type": "string",
62                    "description": "Session/user id for cache scoping"
63                }
64            },
65            "required": ["symbol", "user_id"]
66        })
67    }
68
69
70    async fn execute(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
71        self.execute_internal(parameters).await
72    }
73
74    async fn execute_internal(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
75        let raw_query = parameters
76            .get("symbol")
77            .and_then(|v| v.as_str())
78            .ok_or_else(|| BrightDataError::ToolError("Missing 'symbol' parameter".into()))?;
79
80        let session_id = parameters
81            .get("user_id")
82            .and_then(|v| v.as_str())
83            .ok_or_else(|| BrightDataError::ToolError("Missing 'user_id' parameter".into()))?;
84
85        // Step 1: Resolve known symbols (or fallback)
86        let matched_symbol = match_symbol_from_query(raw_query);
87
88        // Step 2: Handle different symbol types properly
89        let query = if matched_symbol.contains('.') {
90            // International symbols like TIGG.L - preserve as-is
91            matched_symbol.as_str()
92        } else if matched_symbol.starts_with('^') {
93            // Bond symbols like ^TNX - preserve as-is
94            matched_symbol.as_str()
95        } else {
96            // Regular symbols - strip .com/.xyz suffixes if present
97            matched_symbol.split('.').next().unwrap_or(&matched_symbol)
98        };
99
100        let market = parameters
101            .get("market")
102            .and_then(|v| v.as_str())
103            .unwrap_or("indian");
104
105        let data_type = parameters
106            .get("data_type")
107            .and_then(|v| v.as_str())
108            .unwrap_or("all");
109
110        let timeframe = parameters
111            .get("timeframe")
112            .and_then(|v| v.as_str())
113            .unwrap_or("realtime");
114
115        let include_ratios = parameters
116            .get("include_ratios")
117            .and_then(|v| v.as_bool())
118            .unwrap_or(true);
119
120        let include_volume = parameters
121            .get("include_volume")
122            .and_then(|v| v.as_bool())
123            .unwrap_or(true);
124
125        let query_priority = ResponseStrategy::classify_query_priority(query);
126        let recommended_tokens = ResponseStrategy::get_recommended_token_allocation(query);
127
128        let execution_id = format!("bond_{}", chrono::Utc::now().format("%Y%m%d_%H%M%S%.3f"));
129        
130        info!("๐Ÿ“ˆ Stock query: '{}' (market: {}, priority: {:?}, tokens: {}, session: {})", 
131              query, market, query_priority, recommended_tokens, session_id);
132        
133        // ๐ŸŽฏ CACHE CHECK - Check Redis cache first
134        match self.check_cache_first(query, session_id).await {
135            Ok(Some(cached_result)) => {
136                info!("๐Ÿš€ Cache HIT: Returning cached data for {} in session {}", query, session_id);
137                
138                // Create tool result from cached data
139                let content = cached_result.get("content").and_then(|c| c.as_str()).unwrap_or("");
140                let source_used = cached_result.get("source_used").and_then(|s| s.as_str()).unwrap_or("Cache");
141                let method_used = "Redis Cache";
142                
143                let formatted_response = self.create_formatted_bond_response(
144                    query, market, content, source_used, method_used, 
145                    data_type, timeframe, include_ratios, include_volume, &execution_id
146                );
147                
148                let tool_result = ToolResult::success_with_raw(
149                    vec![McpContent::text(formatted_response)], 
150                    cached_result
151                );
152                
153                // Apply filtering only if DEDUCT_DATA=true
154                if self.is_data_reduction_enabled() {
155                    return Ok(ResponseStrategy::apply_size_limits(tool_result));
156                } else {
157                    return Ok(tool_result);
158                }
159            }
160            Ok(None) => {
161                info!("๐Ÿ’พ Cache MISS: Fetching fresh data for {} in session {}", query, session_id);
162            }
163            Err(e) => {
164                warn!("๐Ÿšจ Cache error (continuing with fresh fetch): {}", e);
165            }
166        }
167
168        // ๐ŸŒ FRESH FETCH - Cache miss, fetch from sources
169        match self.fetch_bond_data_with_fallbacks_and_priority(
170            query, market, data_type, timeframe, include_ratios, include_volume,
171            query_priority, recommended_tokens, &execution_id
172        ).await {
173            Ok(result) => {
174                // ๐Ÿ—„๏ธ CACHE STORE - Store successful result in cache
175                if let Err(e) = self.store_in_cache(query, session_id, &result).await {
176                    warn!("Failed to store result in cache: {}", e);
177                }
178                
179                let content = result.get("content").and_then(|c| c.as_str()).unwrap_or("");
180                let source_used = result.get("source_used").and_then(|s| s.as_str()).unwrap_or("Unknown");
181                let method_used = result.get("method_used").and_then(|m| m.as_str()).unwrap_or("Unknown");
182                
183                // Create formatted response based on DEDUCT_DATA setting
184                let formatted_response = self.create_formatted_bond_response(
185                    query, market, content, source_used, method_used, 
186                    data_type, timeframe, include_ratios, include_volume, &execution_id
187                );
188                
189                let tool_result = ToolResult::success_with_raw(
190                    vec![McpContent::text(formatted_response)], 
191                    result
192                );
193                
194                // Apply filtering only if DEDUCT_DATA=true
195                if self.is_data_reduction_enabled() {
196                    Ok(ResponseStrategy::apply_size_limits(tool_result))
197                } else {
198                    Ok(tool_result)
199                }
200            }
201            Err(_e) => {
202                // Return empty data for BrightData errors - Anthropic will retry
203                warn!("BrightData error for query '{}', returning empty data for retry", query);
204                let empty_response = json!({
205                    "query": query,
206                    "market": market,
207                    "status": "no_data",
208                    "reason": "brightdata_error",
209                    "execution_id": execution_id,
210                    "session_id": session_id
211                });
212                
213                Ok(ToolResult::success_with_raw(
214                    vec![McpContent::text("๐Ÿ“ˆ **No Data Available**\n\nPlease try again with a more specific bond symbol.".to_string())],
215                    empty_response
216                ))
217            }
218        }
219    }
220}
221
222impl BondDataTool {
223    /// ENHANCED: Check if data reduction is enabled via DEDUCT_DATA environment variable only
224    fn is_data_reduction_enabled(&self) -> bool {
225        std::env::var("DEDUCT_DATA")
226            .unwrap_or_else(|_| "false".to_string())
227            .to_lowercase() == "true"
228    }
229
230    /// ENHANCED: Create formatted response with DEDUCT_DATA control
231    fn create_formatted_bond_response(
232        &self,
233        query: &str,
234        market: &str, 
235        content: &str,
236        source: &str,
237        method: &str,
238        data_type: &str,
239        timeframe: &str,
240        include_ratios: bool,
241        include_volume: bool,
242        execution_id: &str
243    ) -> String {
244        // If DEDUCT_DATA=false, return full content with basic formatting
245        if !self.is_data_reduction_enabled() {
246            return format!(
247                "๐Ÿ“ˆ **{}** | {} Market\n\n## Full Content\n{}\n\n*Source: {} via {} โ€ข Type: {} โ€ข Period: {}*",
248                query.to_uppercase(), 
249                market.to_uppercase(), 
250                content,
251                source, 
252                method, 
253                data_type, 
254                timeframe
255            );
256        }
257
258        // TODO: Add filtered data extraction logic when DEDUCT_DATA=true
259        // For now, return full content formatted
260        format!(
261            "๐Ÿ“ˆ **{}** | {} Market\n\n## Content (TODO: Add Filtering)\n{}\n\n*Source: {} via {} โ€ข Type: {} โ€ข Period: {}*",
262            query.to_uppercase(), 
263            market.to_uppercase(), 
264            content,
265            source, 
266            method, 
267            data_type, 
268            timeframe
269        )
270    }
271    
272    /// TODO: Extract essential bond data using existing filter methods
273    fn extract_essential_bond_data(&self, content: &str, query: &str) -> String {
274        // TODO: Add essential bond data extraction logic
275        // For now, return original content
276        content.to_string()
277    }
278    
279    /// TODO: Extract financial lines when filtering is disabled
280    fn extract_financial_lines(&self, content: &str) -> String {
281        // TODO: Add financial lines extraction logic
282        // For now, return original content
283        content.to_string()
284    }
285    
286    /// TODO: Format financial metrics into clean markdown
287    fn format_financial_metrics(&self, data: &str) -> String {
288        // TODO: Add financial metrics formatting logic
289        // For now, return data as-is
290        data.to_string()
291    }
292
293    // ๐ŸŽฏ ADDED: Check Redis cache first
294    async fn check_cache_first(
295        &self,
296        query: &str,
297        session_id: &str,
298    ) -> Result<Option<Value>, BrightDataError> {
299        let cache_service = get_bond_cache().await?;
300        cache_service.get_cached_bond_data(session_id, query).await
301    }
302
303    // ๐Ÿ—„๏ธ ADDED: Store successful result in Redis cache
304    async fn store_in_cache(
305        &self,
306        query: &str,
307        session_id: &str,
308        data: &Value,
309    ) -> Result<(), BrightDataError> {
310        let cache_service = get_bond_cache().await?;
311        cache_service.cache_bond_data(session_id, query, data.clone()).await
312    }
313
314    // ๐Ÿ” ADDED: Get all cached symbols for session (useful for comparisons)
315    pub async fn get_session_cached_symbols(&self, session_id: &str) -> Result<Vec<String>, BrightDataError> {
316        let cache_service = get_bond_cache().await?;
317        cache_service.get_session_bond_symbols(session_id).await
318    }
319
320    // ๐Ÿ—‘๏ธ ADDED: Clear cache for specific symbol
321    pub async fn clear_symbol_cache(
322        &self,
323        symbol: &str,
324        session_id: &str,
325    ) -> Result<(), BrightDataError> {
326        let cache_service = get_bond_cache().await?;
327        cache_service.clear_bond_symbol_cache(session_id, symbol).await
328    }
329
330    // ๐Ÿ—‘๏ธ ADDED: Clear entire session cache
331    pub async fn clear_session_cache(&self, session_id: &str) -> Result<u32, BrightDataError> {
332        let cache_service = get_bond_cache().await?;
333        cache_service.clear_session_bond_cache(session_id).await
334    }
335
336    // ๐Ÿ“Š ADDED: Get cache statistics
337    pub async fn get_cache_stats(&self) -> Result<Value, BrightDataError> {
338        let cache_service = get_bond_cache().await?;
339        cache_service.get_bond_cache_stats().await
340    }
341
342    // ๐Ÿฅ ADDED: Enhanced connectivity test including cache
343    pub async fn test_connectivity_with_cache(&self) -> Result<String, BrightDataError> {
344        let mut results = Vec::new();
345        
346        // Test cache connectivity
347        info!("๐Ÿงช Testing Redis Cache...");
348        match get_bond_cache().await {
349            Ok(cache_service) => {
350                match cache_service.health_check().await {
351                    Ok(_) => results.push("โœ… Redis Cache: SUCCESS".to_string()),
352                    Err(e) => results.push(format!("โŒ Redis Cache: FAILED - {}", e)),
353                }
354            }
355            Err(e) => results.push(format!("โŒ Redis Cache: FAILED - {}", e)),
356        }
357        
358        // Test existing connectivity
359        let api_test = self.test_connectivity().await?;
360        results.push(api_test);
361        
362        Ok(format!("๐Ÿ” Enhanced Connectivity Test Results:\n{}", results.join("\n")))
363    }
364
365    /// ENHANCED: Build URLs separated by method (proxy vs direct)
366    fn build_prioritized_urls_with_priority(
367        &self, 
368        query: &str, 
369        market: &str, 
370        data_type: &str,
371        priority: crate::filters::strategy::QueryPriority
372    ) -> MethodUrls {
373        let mut proxy_urls = Vec::new();
374        let mut direct_urls = Vec::new();
375        let clean_query = query.trim();  // Don't convert to uppercase - preserve case for international symbols
376
377        let max_sources = 3;
378
379        if self.is_likely_bond_symbol(&clean_query) {
380            match market {
381                "usd" => {
382                    let symbols_to_try = if clean_query.contains('.') {
383                        // International symbol like TIGG.L or INGB.AS - use as-is
384                        vec![clean_query.to_string()]
385                    } else if clean_query.starts_with('^') {
386                        // Bond symbols like ^TNX - use as-is
387                        vec![clean_query.to_string()]
388                    } else {
389                        // Regular symbols - try with currency suffix
390                        vec![
391                            format!("{}=X", clean_query.to_uppercase()),
392                            clean_query.to_uppercase(),
393                        ]
394                    };
395                    
396                    for (i, symbol) in symbols_to_try.iter().enumerate() {
397                        if i >= max_sources { break; }
398                        
399                        let url = format!("https://finance.yahoo.com/quote/{}", symbol);
400                        let description = format!("Yahoo Finance ({})", symbol);
401
402                        let proxy_url = format!("https://finance.yahoo.com/quote/{}/", symbol);
403                        let proxy_description = format!("Yahoo Finance ({})", symbol);
404                        
405                        proxy_urls.push((proxy_url, proxy_description));
406                        direct_urls.push((url, description));
407                    }
408                },
409                "inr" | _ => {
410                    let symbols_to_try = if clean_query.contains('.') {
411                        // International symbol - use as-is
412                        vec![clean_query.to_string()]
413                    } else {
414                        // Regular symbol - try variations
415                        vec![
416                            clean_query.to_string(),
417                            clean_query.to_uppercase(),
418                        ]
419                    };
420                    
421                    for (i, symbol) in symbols_to_try.iter().enumerate() {
422                        if i >= max_sources { break; }
423                        
424                        let url = format!("https://finance.yahoo.com/quote/{}", symbol);
425                        let description = format!("Yahoo Finance ({})", symbol);
426
427                        let proxy_url = format!("https://finance.yahoo.com/quote/{}/", symbol);
428                        let proxy_description = format!("Yahoo Finance ({})", symbol);
429                        
430                        proxy_urls.push((proxy_url, proxy_description));
431                        direct_urls.push((url, description));
432                    }
433                }
434            }
435        }
436
437        // Add search fallback if needed
438        if proxy_urls.len() < max_sources {
439            let url = format!("https://finance.yahoo.com/quote/{}", urlencoding::encode(query));
440            let description = "Yahoo Finance Search".to_string();
441
442            let proxy_url = format!("https://finance.yahoo.com/quote/{}", urlencoding::encode(query));
443            let proxy_description = "Yahoo Finance Search".to_string();
444            
445            proxy_urls.push((proxy_url, proxy_description));
446            direct_urls.push((url, description));
447        }
448
449        info!("๐ŸŽฏ Generated {} proxy URLs and {} direct URLs for query '{}' (priority: {:?})", 
450              proxy_urls.len(), direct_urls.len(), query, priority);
451        
452        MethodUrls {
453            proxy: proxy_urls,
454            direct: direct_urls,
455        }
456    }
457
458    /// ENHANCED: Main fetch function with method-separated URL structure
459    async fn fetch_bond_data_with_fallbacks_and_priority(
460        &self, 
461        query: &str, 
462        market: &str, 
463        data_type: &str,
464        timeframe: &str,
465        include_ratios: bool,
466        include_volume: bool,
467        query_priority: crate::filters::strategy::QueryPriority,
468        token_budget: usize,
469        execution_id: &str
470    ) -> Result<Value, BrightDataError> {
471        let method_urls = self.build_prioritized_urls_with_priority(query, market, data_type, query_priority);
472        let mut last_error = None;
473        let mut attempts = Vec::new();
474
475        // Define method priority: try direct first, then proxy
476        let methods_to_try = vec![
477            // ("direct", "Direct Call", &method_urls.direct),
478            ("proxy", "Proxy Fallback", &method_urls.proxy)
479        ];
480
481        for (method_sequence, (method_type, method_name, urls_for_method)) in methods_to_try.iter().enumerate() {
482            info!("๐Ÿ”„ Trying {} method with {} URLs", method_name, urls_for_method.len());
483            
484            for (url_sequence, (url, source_name)) in urls_for_method.iter().enumerate() {
485                let attempt_result = match *method_type {
486                    "direct" => {
487                        info!("๐ŸŒ Trying Direct BrightData API for {} (method: {}, url: {}/{})", 
488                              source_name, method_sequence + 1, url_sequence + 1, urls_for_method.len());
489                        self.try_fetch_url_direct_api(
490                            url, query, market, source_name, query_priority, token_budget, 
491                            execution_id, url_sequence as u64, method_sequence as u64
492                        ).await
493                    }
494                    "proxy" => {
495                        info!("๐Ÿ”„ Trying Proxy method for {} (method: {}, url: {}/{})", 
496                              source_name, method_sequence + 1, url_sequence + 1, urls_for_method.len());
497                        self.try_fetch_url_via_proxy(
498                            url, query, market, source_name, query_priority, token_budget, 
499                            execution_id, url_sequence as u64, method_sequence as u64
500                        ).await
501                    }
502                    _ => continue,
503                };
504
505                match attempt_result {
506                    Ok(mut result) => {
507                        let content = result.get("content").and_then(|c| c.as_str()).unwrap_or("");
508                        
509                        attempts.push(json!({
510                            "source": source_name,
511                            "url": url,
512                            "method": method_name,
513                            "status": "success",
514                            "content_length": content.len(),
515                            "method_sequence": method_sequence + 1,
516                            "url_sequence": url_sequence + 1
517                        }));
518                        
519                        // TODO: Add content quality check when DEDUCT_DATA=true
520                        let should_try_next = if self.is_data_reduction_enabled() {
521                            // TODO: Add quality-based next source logic
522                            false
523                        } else {
524                            false
525                        };
526                        
527                        if should_try_next && (url_sequence < urls_for_method.len() - 1 || method_sequence < methods_to_try.len() - 1) {
528                            if url_sequence < urls_for_method.len() - 1 {
529                                warn!("Content insufficient from {} via {}, trying next URL in same method", source_name, method_name);
530                                continue; // Try next URL in same method
531                            } else {
532                                warn!("Content insufficient from {} via {}, trying next method", source_name, method_name);
533                                break; // Try next method
534                            }
535                        }
536                        
537                        // SUCCESS - but validate data quality first only if DEDUCT_DATA=true
538                        if self.is_data_reduction_enabled() {
539                            // TODO: Add data quality validation when DEDUCT_DATA=true
540                            // For now, accept all content
541                        }
542                        
543                        // SUCCESS
544                        result["source_used"] = json!(source_name);
545                        result["url_used"] = json!(url);
546                        result["method_used"] = json!(method_name);
547                        result["execution_id"] = json!(execution_id);
548                        result["priority"] = json!(format!("{:?}", query_priority));
549                        result["token_budget"] = json!(token_budget);
550                        result["attempts"] = json!(attempts);
551                        result["successful_method_sequence"] = json!(method_sequence + 1);
552                        result["successful_url_sequence"] = json!(url_sequence + 1);
553                        
554                        info!("โœ… Successfully fetched bond data from {} via {} (method: {}, url: {})", 
555                              source_name, method_name, method_sequence + 1, url_sequence + 1);
556                        
557                        return Ok(result);
558                    }
559                    Err(e) => {
560                        attempts.push(json!({
561                            "source": source_name,
562                            "url": url,
563                            "method": method_name,
564                            "status": "failed",
565                            "error": e.to_string(),
566                            "method_sequence": method_sequence + 1,
567                            "url_sequence": url_sequence + 1
568                        }));
569                        
570                        last_error = Some(e);
571                        warn!("โŒ Failed to fetch from {} via {} (method: {}, url: {}): {:?}", 
572                              source_name, method_name, method_sequence + 1, url_sequence + 1, last_error);
573                    }
574                }
575            }
576        }
577
578        // All methods and sources failed - return empty data instead of error
579        warn!("โŒ All sources and methods failed for query '{}'. Returning empty data for Anthropic retry", query);
580        
581        let empty_result = json!({
582            "query": query,
583            "market": market,
584            "status": "no_data_found",
585            "attempts": attempts,
586            "execution_id": execution_id,
587            "total_attempts": method_urls.direct.len() + method_urls.proxy.len(),
588            "reason": "all_sources_failed"
589        });
590        
591        Ok(empty_result)
592    }
593
594    // Direct BrightData API method (existing implementation)
595    async fn try_fetch_url_direct_api(
596        &self, 
597        url: &str, 
598        query: &str, 
599        market: &str, 
600        source_name: &str, 
601        priority: crate::filters::strategy::QueryPriority,
602        token_budget: usize,
603        execution_id: &str,
604        sequence: u64,
605        method_sequence: u64
606    ) -> Result<Value, BrightDataError> {
607        let max_retries = env::var("MAX_RETRIES")
608            .ok()
609            .and_then(|s| s.parse::<u32>().ok())
610            .unwrap_or(1);
611        
612        let mut last_error = None;
613        
614        for retry_attempt in 0..max_retries {
615            let start_time = Instant::now();
616            let attempt_id = format!("{}_direct_s{}_m{}_r{}", execution_id, sequence, method_sequence, retry_attempt);
617            
618            info!("๐ŸŒ Direct API: Fetching from {} (execution: {}, retry: {}/{})", 
619                  source_name, attempt_id, retry_attempt + 1, max_retries);
620            
621            let api_token = env::var("BRIGHTDATA_API_TOKEN")
622                .or_else(|_| env::var("API_TOKEN"))
623                .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_API_TOKEN environment variable".into()))?;
624
625            let base_url = env::var("BRIGHTDATA_BASE_URL")
626                .unwrap_or_else(|_| "https://api.brightdata.com".to_string());
627
628            let zone = env::var("WEB_UNLOCKER_ZONE")
629                .unwrap_or_else(|_| "mcp_unlocker".to_string());
630
631            let payload = json!({
632                "url": url,
633                "zone": zone,
634                "format": "raw",
635                // "data_format": "markdown"
636            });
637
638            if retry_attempt == 0 {
639                info!("๐Ÿ“ค Direct API Request:");
640                info!("   Endpoint: {}/request", base_url);
641                info!("   Zone: {}", zone);
642                info!("   Target: {}", url);
643            }
644
645            let client = Client::builder()
646                .timeout(Duration::from_secs(90))
647                .build()
648                .map_err(|e| BrightDataError::ToolError(format!("Failed to create HTTP client: {}", e)))?;
649
650            let response = client
651                .post(&format!("{}/request", base_url))
652                .header("Authorization", format!("Bearer {}", api_token))
653                .header("Content-Type", "application/json")
654                .json(&payload)
655                .send()
656                .await
657                .map_err(|e| BrightDataError::ToolError(format!("Direct API request failed to {}: {}", source_name, e)))?;
658
659            let duration = start_time.elapsed();
660            let status = response.status().as_u16();
661            let response_headers: HashMap<String, String> = response
662                .headers()
663                .iter()
664                .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
665                .collect();
666
667            info!("๐Ÿ“ฅ Direct API Response (retry {}):", retry_attempt + 1);
668            info!("   Status: {}", status);
669            info!("   Duration: {}ms", duration.as_millis());
670
671            let response_text = response.text().await
672                .map_err(|e| BrightDataError::ToolError(format!("Failed to read response body from {}: {}", source_name, e)))?;
673
674            // Handle server errors with retry
675            if matches!(status, 502 | 503 | 504) && retry_attempt < max_retries - 1 {
676                let wait_time = Duration::from_millis(1000 + (retry_attempt as u64 * 1000));
677                warn!("โณ Direct API: Server error {}, waiting {}ms before retry...", status, wait_time.as_millis());
678                tokio::time::sleep(wait_time).await;
679                last_error = Some(BrightDataError::ToolError(format!("Direct API server error: {}", status)));
680                continue;
681            }
682            
683            if !(200..300).contains(&status) {
684                let error_msg = format!("Direct API: {} returned HTTP {}: {}", source_name, status, 
685                                      &response_text[..response_text.len().min(500)]);
686                last_error = Some(BrightDataError::ToolError(error_msg));
687                if retry_attempt == max_retries - 1 {
688                    return Err(last_error.unwrap());
689                }
690                continue;
691            }
692
693            // SUCCESS - Process response (apply filtering only if DEDUCT_DATA=true)
694            let raw_content = response_text;
695            let filtered_content = if self.is_data_reduction_enabled() {
696                // TODO: Add content filtering logic when DEDUCT_DATA=true
697                raw_content.clone()
698            } else {
699                raw_content.clone()
700            };
701
702            info!("๐Ÿ“Š Direct API: Content processed: {} bytes -> {} bytes", 
703                  raw_content.len(), filtered_content.len());
704
705            // Log metrics
706            if let Err(e) = BRIGHTDATA_METRICS.log_call(
707                &attempt_id,
708                url,
709                &zone,
710                "raw",
711                None,
712                payload.clone(),
713                status,
714                response_headers.clone(),
715                &raw_content,
716                Some(&filtered_content),
717                duration.as_millis() as u64,
718                None,
719                None,
720            ).await {
721                warn!("Failed to log direct API metrics: {}", e);
722            }
723
724            return Ok(json!({
725                "content": filtered_content,
726                "raw_content": raw_content,
727                "query": query,
728                "market": market,
729                "source": source_name,
730                "method": "Direct BrightData API",
731                "priority": format!("{:?}", priority),
732                "token_budget": token_budget,
733                "execution_id": execution_id,
734                "sequence": sequence,
735                "method_sequence": method_sequence,
736                "success": true,
737                "url": url,
738                "zone": zone,
739                "format": "raw",
740                "status_code": status,
741                "response_size_bytes": raw_content.len(),
742                "filtered_size_bytes": filtered_content.len(),
743                "duration_ms": duration.as_millis(),
744                "timestamp": chrono::Utc::now().to_rfc3339(),
745                "retry_attempts": retry_attempt + 1,
746                "max_retries": max_retries,
747                "payload_used": payload
748            }));
749        }
750
751        Err(last_error.unwrap_or_else(|| BrightDataError::ToolError("Direct API: All retry attempts failed".into())))
752    }
753
754    // Proxy-based method
755    async fn try_fetch_url_via_proxy(
756        &self, 
757        url: &str, 
758        query: &str, 
759        market: &str, 
760        source_name: &str, 
761        priority: crate::filters::strategy::QueryPriority,
762        token_budget: usize,
763        execution_id: &str,
764        sequence: u64,
765        method_sequence: u64
766    ) -> Result<Value, BrightDataError> {
767        let max_retries = env::var("MAX_RETRIES")
768            .ok()
769            .and_then(|s| s.parse::<u32>().ok())
770            .unwrap_or(1);
771        
772        let mut last_error = None;
773        
774        // Get proxy configuration from environment
775        let proxy_host = env::var("BRIGHTDATA_PROXY_HOST")
776            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_HOST environment variable".into()))?;
777        let proxy_port = env::var("BRIGHTDATA_PROXY_PORT")
778            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_PORT environment variable".into()))?;
779        let proxy_username = env::var("BRIGHTDATA_PROXY_USERNAME")
780            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_USERNAME environment variable".into()))?;
781        let proxy_password = env::var("BRIGHTDATA_PROXY_PASSWORD")
782            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_PASSWORD environment variable".into()))?;
783
784        let proxy_url = format!("http://{}:{}@{}:{}", proxy_username, proxy_password, proxy_host, proxy_port);
785        
786        for retry_attempt in 0..max_retries {
787            let start_time = Instant::now();
788            let attempt_id = format!("{}_proxy_s{}_m{}_r{}", execution_id, sequence, method_sequence, retry_attempt);
789            
790            info!("๐Ÿ”„ Proxy: Fetching from {} via proxy (execution: {}, retry: {}/{})", 
791                  source_name, attempt_id, retry_attempt + 1, max_retries);
792            
793            if retry_attempt == 0 {
794                info!("๐Ÿ“ค Proxy Request:");
795                info!("   Proxy: {}:{}@{}:{}", proxy_username, "***", proxy_host, proxy_port);
796                info!("   Target: {}", url);
797            }
798
799            // Create client with proxy configuration
800            let proxy = reqwest::Proxy::all(&proxy_url)
801                .map_err(|e| BrightDataError::ToolError(format!("Failed to create proxy: {}", e)))?;
802
803            let client = Client::builder()
804                .proxy(proxy)
805                .timeout(Duration::from_secs(90))
806                .danger_accept_invalid_certs(true) // Often needed for proxy connections
807                .build()
808                .map_err(|e| BrightDataError::ToolError(format!("Failed to create proxy client: {}", e)))?;
809
810            let response = client
811                .get(url)
812                .header("x-unblock-data-format", "markdown")
813                .send()
814                .await
815                .map_err(|e| BrightDataError::ToolError(format!("Proxy request failed to {}: {}", source_name, e)))?;
816
817            let duration = start_time.elapsed();
818            let status = response.status().as_u16();
819            let response_headers: HashMap<String, String> = response
820                .headers()
821                .iter()
822                .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
823                .collect();
824
825            info!("๐Ÿ“ฅ Proxy Response (retry {}):", retry_attempt + 1);
826            info!("   Status: {}", status);
827            info!("   Duration: {}ms", duration.as_millis());
828
829            let response_text = response.text().await
830                .map_err(|e| BrightDataError::ToolError(format!("Failed to read proxy response body from {}: {}", source_name, e)))?;
831
832            // Handle server errors with retry
833            if matches!(status, 502 | 503 | 504) && retry_attempt < max_retries - 1 {
834                let wait_time = Duration::from_millis(1000 + (retry_attempt as u64 * 1000));
835                warn!("โณ Proxy: Server error {}, waiting {}ms before retry...", status, wait_time.as_millis());
836                tokio::time::sleep(wait_time).await;
837                last_error = Some(BrightDataError::ToolError(format!("Proxy server error: {}", status)));
838                continue;
839            }
840            
841            if !(200..300).contains(&status) {
842                println!("-----------------------------------------------------------------");
843                println!("MARKDOWN SUCCESS: {:?}", status.clone());
844                println!("-----------------------------------------------------------------");
845                let error_msg = format!("Proxy: {} returned HTTP {}: {}", source_name, status, 
846                                      &response_text[..response_text.len().min(200)]);
847                
848                warn!("Proxy HTTP error: {}", error_msg);
849                last_error = Some(BrightDataError::ToolError(error_msg));
850                
851                // Log error metrics for proxy
852                let proxy_payload = json!({
853                    "url": url,
854                    "method": "proxy",
855                    "proxy_host": proxy_host,
856                    "proxy_port": proxy_port,
857                    "error": format!("HTTP {}", status)
858                });
859
860                if let Err(e) = BRIGHTDATA_METRICS.log_call(
861                    &attempt_id,
862                    url,
863                    "proxy",
864                    "raw",
865                    None,
866                    proxy_payload,
867                    status,
868                    response_headers.clone(),
869                    &response_text,
870                    Some(&format!("Proxy HTTP {} Error", status)),
871                    duration.as_millis() as u64,
872                    None,
873                    None,
874                ).await {
875                    warn!("Failed to log proxy error metrics: {}", e);
876                }
877                
878                if retry_attempt == max_retries - 1 {
879                    return Err(last_error.unwrap());
880                }
881                continue;
882            }
883
884            // SUCCESS - Process response (apply filtering only if DEDUCT_DATA=true)
885            let raw_content = response_text;
886            let filtered_content = if self.is_data_reduction_enabled() {
887                // TODO: Add content filtering logic when DEDUCT_DATA=true
888                raw_content.clone()
889            } else {
890                raw_content.clone()
891            };
892
893            info!("๐Ÿ“Š Proxy: Content processed: {} bytes -> {} bytes", 
894                  raw_content.len(), filtered_content.len());
895
896            // Log metrics (using a simplified payload for proxy requests)
897            let proxy_payload = json!({
898                "url": url,
899                "method": "proxy",
900                "proxy_host": proxy_host,
901                "proxy_port": proxy_port
902            });
903
904            if let Err(e) = BRIGHTDATA_METRICS.log_call(
905                &attempt_id,
906                url,
907                "proxy",
908                "raw",
909                None,
910                proxy_payload.clone(),
911                status,
912                response_headers.clone(),
913                &raw_content,
914                Some(&filtered_content),
915                duration.as_millis() as u64,
916                None,
917                None,
918            ).await {
919                warn!("Failed to log proxy metrics: {}", e);
920            }
921
922            return Ok(json!({
923                "content": filtered_content,
924                "raw_content": raw_content,
925                "query": query,
926                "market": market,
927                "source": source_name,
928                "method": "BrightData Proxy",
929                "priority": format!("{:?}", priority),
930                "token_budget": token_budget,
931                "execution_id": execution_id,
932                "sequence": sequence,
933                "method_sequence": method_sequence,
934                "success": true,
935                "url": url,
936                "proxy_host": proxy_host,
937                "proxy_port": proxy_port,
938                "status_code": status,
939                "response_size_bytes": raw_content.len(),
940                "filtered_size_bytes": filtered_content.len(),
941                "duration_ms": duration.as_millis(),
942                "timestamp": chrono::Utc::now().to_rfc3339(),
943                "retry_attempts": retry_attempt + 1,
944                "max_retries": max_retries,
945                "payload_used": proxy_payload
946            }));
947        }
948
949        Err(last_error.unwrap_or_else(|| BrightDataError::ToolError("Proxy: All retry attempts failed".into())))
950    }
951
952    fn is_likely_bond_symbol(&self, query: &str) -> bool {
953        let clean = query.trim();
954        
955        if clean.len() < 1 || clean.len() > 15 {
956            return false;
957        }
958
959        // Allow alphanumeric characters, dots, and common symbol prefixes like ^
960        let valid_chars = clean.chars().all(|c| c.is_alphanumeric() || c == '.' || c == '^' || c == '=');
961        let has_letters = clean.chars().any(|c| c.is_alphabetic());
962        
963        valid_chars && has_letters
964    }
965
966    // ----------------- Pair helpers -----------------
967    fn normalize_pair(raw: &str) -> String {
968        // Accept "usd/inr", "usd-inr", "USDINR", "USDINR=X" -> return "USDINR" (without =X here)
969        let s = raw.trim().to_uppercase().replace(['/', '-', ' '], "");
970        s.trim().to_string()
971    }
972
973    /// Test both direct API and proxy connectivity
974    pub async fn test_connectivity(&self) -> Result<String, BrightDataError> {
975        let test_url = "https://finance.yahoo.com/quote/USDINR=X/";
976        let mut results = Vec::new();
977        
978        // Test Direct API
979        info!("Testing Direct BrightData API...");
980        match self.try_fetch_url_direct_api(
981            test_url, "BTC", "usd", "Yahoo Finance Test", 
982            crate::filters::strategy::QueryPriority::High, 1000, 
983            "connectivity_test", 0, 0
984        ).await {
985            Ok(_) => {
986                results.push("Direct API: SUCCESS".to_string());
987            }
988            Err(e) => {
989                results.push(format!("Direct API: FAILED - {}", e));
990            }
991        }
992        
993        // Test Proxy
994        info!("Testing Proxy method...");
995        match self.try_fetch_url_via_proxy(
996            test_url, "BTC", "usd", "Yahoo Finance Test", 
997            crate::filters::strategy::QueryPriority::High, 1000, 
998            "connectivity_test", 0, 1
999        ).await {
1000            Ok(_) => {
1001                results.push("Proxy: SUCCESS".to_string());
1002            }
1003            Err(e) => {
1004                results.push(format!("Proxy: FAILED - {}", e));
1005            }
1006        }
1007        
1008        Ok(format!("Connectivity Test Results:\n{}", results.join("\n")))
1009    }
1010}