snm_brightdata_client/tools/
crypto.rs

1// src/tools/crypto.rs - CRYPTO VERSION WITH REDIS CACHE AND DEDUCT_DATA SUPPORT
2use crate::tool::{Tool, ToolResult, McpContent};
3use crate::error::BrightDataError;
4use crate::filters::{ResponseFilter, ResponseStrategy, ResponseType};
5use crate::extras::logger::JSON_LOGGER;
6use crate::metrics::brightdata_logger::BRIGHTDATA_METRICS;
7use crate::services::cache::crypto_cache::get_crypto_cache;
8use async_trait::async_trait;
9use reqwest::Client;
10use serde_json::{json, Value};
11use std::env;
12use std::time::{Duration, Instant};
13use std::collections::HashMap;
14use log::{info, warn, error};
15use crate::symbols::crypto_symbol::match_symbol_from_query;
16
/// Candidate source URLs for one lookup, grouped by the fetch method that
/// should request them.
///
/// Every entry is a `(url, description)` pair; the description is the
/// human-readable source label that shows up in logs and attempt records.
#[derive(Clone, Debug)]
pub struct MethodUrls {
    /// URLs to request through the proxy method.
    pub proxy: Vec<(String, String)>,
    /// URLs to request through the direct API method.
    pub direct: Vec<(String, String)>,
}
23
24pub struct CryptoDataTool;
25
26#[async_trait]
27impl Tool for CryptoDataTool {
28    fn name(&self) -> &str {
29        "get_crypto_data"
30    }
31
32    fn description(&self) -> &str {
33        "Get comprehensive cryptocurrency data including prices, market cap, volumes with intelligent filtering and priority-based processing. Supports both direct BrightData API and proxy fallback. Source: Yahoo Finance https://finance.yahoo.com/quote/{}-USD/ (e.g., BTC-USD, ETH-USD, SQL-USD)."
34    }
35
36    fn input_schema(&self) -> Value {
37        json!({
38            "type": "object",
39            "properties": {
40                "query": {
41                    "type": "string",
42                    "description": "Cryptocurrency symbol (e.g. BTC, ETH, ADA), coin name, comparison query, or market overview request"
43                },
44                "symbol": {
45                    "type": "string",
46                    "description": "Cryptocurrency symbol (e.g. BTC, ETH, ADA), coin name, comparison query, or market overview request"
47                },
48                "market": {
49                    "type": "string",
50                    "enum": ["usd", "eur", "btc", "global"],
51                    "default": "usd",
52                    "description": "Market region - usd for USD pairs, eur for EUR pairs, btc for BTC pairs, global for all markets"
53                },
54                "data_type": {
55                    "type": "string",
56                    "enum": ["price", "fundamentals", "technical", "news", "all"],
57                    "default": "all",
58                    "description": "Type of crypto data to focus on"
59                },
60                "timeframe": {
61                    "type": "string",
62                    "enum": ["realtime", "day", "week", "month", "quarter", "year"],
63                    "default": "realtime",
64                    "description": "Time period for crypto data analysis"
65                },
66                "include_ratios": {
67                    "type": "boolean",
68                    "default": true,
69                    "description": "Include market ratios like market cap, volume ratios"
70                },
71                "include_volume": {
72                    "type": "boolean",
73                    "default": true,
74                    "description": "Include trading volume and liquidity data"
75                },
76                "session_id": {
77                    "type": "string",
78                    "description": "Session identifier for caching (optional, will use default if not provided)"
79                },
80            },
81            "required": ["query"]
82        })
83    }
84
85    async fn execute(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
86        self.execute_internal(parameters).await
87    }
88
89    async fn execute_internal(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
90        let raw_query = parameters
91            .get("symbol")
92            .and_then(|v| v.as_str())
93            .ok_or_else(|| BrightDataError::ToolError("Missing 'symbol' parameter".into()))?;
94
95        let session_id = parameters
96            .get("user_id")
97            .and_then(|v| v.as_str())
98            .ok_or_else(|| BrightDataError::ToolError("Missing 'user_id' parameter".into()))?;
99
100        // Step 1: Resolve known symbols (or fallback)
101        let matched_symbol = match_symbol_from_query(raw_query);
102
103        // Step 2: Strip trailing .com / .xyz etc.
104        let query = matched_symbol.split('.').next().unwrap_or(&matched_symbol);
105
106        let market = parameters
107            .get("market")
108            .and_then(|v| v.as_str())
109            .unwrap_or("usd");
110
111        let data_type = parameters
112            .get("data_type")
113            .and_then(|v| v.as_str())
114            .unwrap_or("all");
115
116        let timeframe = parameters
117            .get("timeframe")
118            .and_then(|v| v.as_str())
119            .unwrap_or("realtime");
120
121        let include_ratios = parameters
122            .get("include_ratios")
123            .and_then(|v| v.as_bool())
124            .unwrap_or(true);
125
126        let include_volume = parameters
127            .get("include_volume")
128            .and_then(|v| v.as_bool())
129            .unwrap_or(true);
130
131        let query_priority = ResponseStrategy::classify_query_priority(query);
132        let recommended_tokens = ResponseStrategy::get_recommended_token_allocation(query);
133
134        let execution_id = format!("crypto_{}", chrono::Utc::now().format("%Y%m%d_%H%M%S%.3f"));
135        
136        info!("📈 Crypto query: '{}' (market: {}, priority: {:?}, tokens: {}, session: {})", 
137              query, market, query_priority, recommended_tokens, session_id);
138        
139        // 🎯 CACHE CHECK - Check Redis cache first
140        match self.check_cache_first(query, session_id).await {
141            Ok(Some(cached_result)) => {
142                info!("🚀 Cache HIT: Returning cached data for {} in session {}", query, session_id);
143                
144                // Create tool result from cached data
145                let content = cached_result.get("content").and_then(|c| c.as_str()).unwrap_or("");
146                let source_used = cached_result.get("source_used").and_then(|s| s.as_str()).unwrap_or("Cache");
147                let method_used = "Redis Cache";
148                
149                let formatted_response = self.create_formatted_crypto_response(
150                    query, market, content, source_used, method_used, 
151                    data_type, timeframe, include_ratios, include_volume, &execution_id
152                );
153                
154                let tool_result = ToolResult::success_with_raw(
155                    vec![McpContent::text(formatted_response)], 
156                    cached_result
157                );
158                
159                // Apply filtering only if DEDUCT_DATA=true
160                if self.is_data_reduction_enabled() {
161                    return Ok(ResponseStrategy::apply_size_limits(tool_result));
162                } else {
163                    return Ok(tool_result);
164                }
165            }
166            Ok(None) => {
167                info!("💾 Cache MISS: Fetching fresh data for {} in session {}", query, session_id);
168            }
169            Err(e) => {
170                warn!("🚨 Cache error (continuing with fresh fetch): {}", e);
171            }
172        }
173
174        // 🌐 FRESH FETCH - Cache miss, fetch from sources
175        match self.fetch_crypto_data_with_fallbacks_and_priority(
176            query, market, data_type, timeframe, include_ratios, include_volume,
177            query_priority, recommended_tokens, &execution_id
178        ).await {
179            Ok(result) => {
180                // 🗄️ CACHE STORE - Store successful result in cache
181                if let Err(e) = self.store_in_cache(query, session_id, &result).await {
182                    warn!("Failed to store result in cache: {}", e);
183                }
184                
185                let content = result.get("content").and_then(|c| c.as_str()).unwrap_or("");
186                let source_used = result.get("source_used").and_then(|s| s.as_str()).unwrap_or("Unknown");
187                let method_used = result.get("method_used").and_then(|m| m.as_str()).unwrap_or("Unknown");
188                
189                // Create formatted response based on DEDUCT_DATA setting
190                let formatted_response = self.create_formatted_crypto_response(
191                    query, market, content, source_used, method_used, 
192                    data_type, timeframe, include_ratios, include_volume, &execution_id
193                );
194                
195                let tool_result = ToolResult::success_with_raw(
196                    vec![McpContent::text(formatted_response)], 
197                    result
198                );
199                
200                // Apply filtering only if DEDUCT_DATA=true
201                if self.is_data_reduction_enabled() {
202                    Ok(ResponseStrategy::apply_size_limits(tool_result))
203                } else {
204                    Ok(tool_result)
205                }
206            }
207            Err(_e) => {
208                // Return empty data for BrightData errors - Anthropic will retry
209                warn!("BrightData error for query '{}', returning empty data for retry", query);
210                let empty_response = json!({
211                    "query": query,
212                    "market": market,
213                    "status": "no_data",
214                    "reason": "brightdata_error",
215                    "execution_id": execution_id,
216                    "session_id": session_id
217                });
218                
219                Ok(ToolResult::success_with_raw(
220                    vec![McpContent::text("📈 **No Data Available**\n\nPlease try again with a more specific crypto symbol.".to_string())],
221                    empty_response
222                ))
223            }
224        }
225    }
226}
227
228impl CryptoDataTool {
229    /// ENHANCED: Check if data reduction is enabled via DEDUCT_DATA environment variable only
230    fn is_data_reduction_enabled(&self) -> bool {
231        std::env::var("DEDUCT_DATA")
232            .unwrap_or_else(|_| "false".to_string())
233            .to_lowercase() == "true"
234    }
235
236    /// ENHANCED: Create formatted response with DEDUCT_DATA control
237    fn create_formatted_crypto_response(
238        &self,
239        query: &str,
240        market: &str, 
241        content: &str,
242        source: &str,
243        method: &str,
244        data_type: &str,
245        timeframe: &str,
246        include_ratios: bool,
247        include_volume: bool,
248        execution_id: &str
249    ) -> String {
250        // If DEDUCT_DATA=false, return full content with basic formatting
251        if !self.is_data_reduction_enabled() {
252            return format!(
253                "📈 **{}** | {} Market\n\n## Full Content\n{}\n\n*Source: {} via {} • Type: {} • Period: {}*",
254                query.to_uppercase(), 
255                market.to_uppercase(), 
256                content,
257                source, 
258                method, 
259                data_type, 
260                timeframe
261            );
262        }
263
264        // TODO: Add filtered data extraction logic when DEDUCT_DATA=true
265        // For now, return full content formatted
266        format!(
267            "📈 **{}** | {} Market\n\n## Content (TODO: Add Filtering)\n{}\n\n*Source: {} via {} • Type: {} • Period: {}*",
268            query.to_uppercase(), 
269            market.to_uppercase(), 
270            content,
271            source, 
272            method, 
273            data_type, 
274            timeframe
275        )
276    }
277
278    // 🎯 ADDED: Check Redis cache first
279    async fn check_cache_first(
280        &self,
281        query: &str,
282        session_id: &str,
283    ) -> Result<Option<Value>, BrightDataError> {
284        let cache_service = get_crypto_cache().await?;
285        cache_service.get_cached_crypto_data(session_id, query).await
286    }
287
288    // 🗄️ ADDED: Store successful result in Redis cache
289    async fn store_in_cache(
290        &self,
291        query: &str,
292        session_id: &str,
293        data: &Value,
294    ) -> Result<(), BrightDataError> {
295        let cache_service = get_crypto_cache().await?;
296        cache_service.cache_crypto_data(session_id, query, data.clone()).await
297    }
298
299    /// ENHANCED: Build URLs separated by method (proxy vs direct)
300    fn build_prioritized_urls_with_priority(
301        &self, 
302        query: &str, 
303        market: &str, 
304        data_type: &str,
305        priority: crate::filters::strategy::QueryPriority
306    ) -> MethodUrls {
307        let mut proxy_urls = Vec::new();
308        let mut direct_urls = Vec::new();
309        let clean_query = query.trim().to_uppercase();
310
311        // Add priority-based URL limiting
312        let max_sources = 3;
313
314        if self.is_likely_crypto_symbol(&clean_query) {
315            match market {
316                "usd" => {
317                    // TODO: Add priority-based URL selection logic
318                    let symbols_to_try = vec![
319                        format!("{}-USD", clean_query),
320                        clean_query.clone(),
321                    ];
322                    
323                    for (i, symbol) in symbols_to_try.iter().enumerate() {
324                        if i >= max_sources { break; }
325                        
326                        let url = format!("https://finance.yahoo.com/quote/{}", symbol);
327                        let description = format!("Yahoo Finance ({})", symbol);
328
329                        let proxy_url = format!("https://finance.yahoo.com/quote/{}/", symbol);
330                        let proxy_description = format!("Yahoo Finance ({})", symbol);
331                        
332                        // Add to both proxy and direct (same URLs, different methods)
333                        proxy_urls.push((proxy_url, proxy_description));
334                        direct_urls.push((url, description));
335                    }
336                }
337                "eur" => {
338                    let url = format!("https://finance.yahoo.com/quote/{}-EUR/", clean_query);
339                    let description = format!("Yahoo Finance ({})", clean_query);
340
341                    let proxy_url = format!("https://finance.yahoo.com/quote/{}-EUR/", clean_query);
342                    let proxy_description = format!("Yahoo Finance ({})", clean_query);
343                    
344                    proxy_urls.push((proxy_url, proxy_description));
345                    direct_urls.push((url, description));
346                }
347                "btc" => {
348                    let url = format!("https://finance.yahoo.com/quote/{}-BTC/", clean_query);
349                    let description = format!("Yahoo Finance ({})", clean_query);
350
351                    let proxy_url = format!("https://finance.yahoo.com/quote/{}-BTC/", clean_query);
352                    let proxy_description = format!("Yahoo Finance ({})", clean_query);
353                    
354                    proxy_urls.push((proxy_url, proxy_description));
355                    direct_urls.push((url, description));
356                }
357                "global" => {
358                    let url = format!("https://finance.yahoo.com/quote/{}-USD/", clean_query);
359                    let description = format!("Yahoo Finance Global ({})", clean_query);
360
361                    let proxy_url = format!("https://finance.yahoo.com/quote/{}-USD/", clean_query);
362                    let proxy_description = format!("Yahoo Finance Global ({})", clean_query);
363                    
364                    proxy_urls.push((proxy_url, proxy_description));
365                    direct_urls.push((url, description));
366                }
367                _ => {}
368            }
369        }
370
371        // Add search fallbacks (no restrictions when DEDUCT_DATA=false)
372        if proxy_urls.len() < max_sources {
373            let url = format!("https://finance.yahoo.com/quote/{}", urlencoding::encode(query));
374            let description = "Yahoo Finance Search".to_string();
375
376            let proxy_url = format!("https://finance.yahoo.com/quote/{}", urlencoding::encode(query));
377            let proxy_description = "Yahoo Finance Search".to_string();
378            
379            proxy_urls.push((proxy_url, proxy_description));
380            direct_urls.push((url, description));
381        }
382
383        info!("🎯 Generated {} proxy URLs and {} direct URLs for query '{}' (priority: {:?})", 
384              proxy_urls.len(), direct_urls.len(), query, priority);
385        
386        MethodUrls {
387            proxy: proxy_urls,
388            direct: direct_urls,
389        }
390    }
391
392    /// ENHANCED: Main fetch function with method-separated URL structure
393    async fn fetch_crypto_data_with_fallbacks_and_priority(
394        &self, 
395        query: &str, 
396        market: &str, 
397        data_type: &str,
398        timeframe: &str,
399        include_ratios: bool,
400        include_volume: bool,
401        query_priority: crate::filters::strategy::QueryPriority,
402        token_budget: usize,
403        execution_id: &str
404    ) -> Result<Value, BrightDataError> {
405        let method_urls = self.build_prioritized_urls_with_priority(query, market, data_type, query_priority);
406        let mut last_error = None;
407        let mut attempts = Vec::new();
408
409        // Define method priority: try direct first, then proxy
410        let methods_to_try = vec![
411            // ("direct", "Direct Call", &method_urls.direct),
412            ("proxy", "Proxy Fallback", &method_urls.proxy)
413        ];
414
415        for (method_sequence, (method_type, method_name, urls_for_method)) in methods_to_try.iter().enumerate() {
416            info!("🔄 Trying {} method with {} URLs", method_name, urls_for_method.len());
417            
418            for (url_sequence, (url, source_name)) in urls_for_method.iter().enumerate() {
419                let attempt_result = match *method_type {
420                    "direct" => {
421                        info!("🌐 Trying Direct BrightData API for {} (method: {}, url: {}/{})", 
422                              source_name, method_sequence + 1, url_sequence + 1, urls_for_method.len());
423                        self.try_fetch_url_direct_api(
424                            url, query, market, source_name, query_priority, token_budget, 
425                            execution_id, url_sequence as u64, method_sequence as u64
426                        ).await
427                    }
428                    "proxy" => {
429                        info!("🔄 Trying Proxy method for {} (method: {}, url: {}/{})", 
430                              source_name, method_sequence + 1, url_sequence + 1, urls_for_method.len());
431                        self.try_fetch_url_via_proxy(
432                            url, query, market, source_name, query_priority, token_budget, 
433                            execution_id, url_sequence as u64, method_sequence as u64
434                        ).await
435                    }
436                    _ => continue,
437                };
438
439                match attempt_result {
440                    Ok(mut result) => {
441                        let content = result.get("content").and_then(|c| c.as_str()).unwrap_or("");
442                        
443                        attempts.push(json!({
444                            "source": source_name,
445                            "url": url,
446                            "method": method_name,
447                            "status": "success",
448                            "content_length": content.len(),
449                            "method_sequence": method_sequence + 1,
450                            "url_sequence": url_sequence + 1
451                        }));
452                        
453                        // SUCCESS
454                        result["source_used"] = json!(source_name);
455                        result["url_used"] = json!(url);
456                        result["method_used"] = json!(method_name);
457                        result["execution_id"] = json!(execution_id);
458                        result["priority"] = json!(format!("{:?}", query_priority));
459                        result["token_budget"] = json!(token_budget);
460                        result["attempts"] = json!(attempts);
461                        result["successful_method_sequence"] = json!(method_sequence + 1);
462                        result["successful_url_sequence"] = json!(url_sequence + 1);
463                        
464                        info!("✅ Successfully fetched crypto data from {} via {} (method: {}, url: {})", 
465                              source_name, method_name, method_sequence + 1, url_sequence + 1);
466                        
467                        return Ok(result);
468                    }
469                    Err(e) => {
470                        attempts.push(json!({
471                            "source": source_name,
472                            "url": url,
473                            "method": method_name,
474                            "status": "failed",
475                            "error": e.to_string(),
476                            "method_sequence": method_sequence + 1,
477                            "url_sequence": url_sequence + 1
478                        }));
479                        
480                        last_error = Some(e);
481                        warn!("❌ Failed to fetch from {} via {} (method: {}, url: {}): {:?}", 
482                              source_name, method_name, method_sequence + 1, url_sequence + 1, last_error);
483                    }
484                }
485            }
486        }
487
488        // All methods and sources failed - return empty data instead of error
489        warn!("❌ All sources and methods failed for query '{}'. Returning empty data for Anthropic retry", query);
490        
491        let empty_result = json!({
492            "query": query,
493            "market": market,
494            "status": "no_data_found",
495            "attempts": attempts,
496            "execution_id": execution_id,
497            "total_attempts": method_urls.direct.len() + method_urls.proxy.len(),
498            "reason": "all_sources_failed"
499        });
500        
501        Ok(empty_result)
502    }
503
504    // Direct BrightData API method (existing implementation)
505    async fn try_fetch_url_direct_api(
506        &self, 
507        url: &str, 
508        query: &str, 
509        market: &str, 
510        source_name: &str, 
511        priority: crate::filters::strategy::QueryPriority,
512        token_budget: usize,
513        execution_id: &str,
514        sequence: u64,
515        method_sequence: u64
516    ) -> Result<Value, BrightDataError> {
517        let max_retries = env::var("MAX_RETRIES")
518            .ok()
519            .and_then(|s| s.parse::<u32>().ok())
520            .unwrap_or(1);
521        
522        let mut last_error = None;
523        
524        for retry_attempt in 0..max_retries {
525            let start_time = Instant::now();
526            let attempt_id = format!("{}_direct_s{}_m{}_r{}", execution_id, sequence, method_sequence, retry_attempt);
527            
528            info!("🌐 Direct API: Fetching from {} (execution: {}, retry: {}/{})", 
529                  source_name, attempt_id, retry_attempt + 1, max_retries);
530            
531            let api_token = env::var("BRIGHTDATA_API_TOKEN")
532                .or_else(|_| env::var("API_TOKEN"))
533                .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_API_TOKEN environment variable".into()))?;
534
535            let base_url = env::var("BRIGHTDATA_BASE_URL")
536                .unwrap_or_else(|_| "https://api.brightdata.com".to_string());
537
538            let zone = env::var("WEB_UNLOCKER_ZONE")
539                .unwrap_or_else(|_| "mcp_unlocker".to_string());
540
541            let payload = json!({
542                "url": url,
543                "zone": zone,
544                "format": "raw",
545            });
546
547            let client = Client::builder()
548                .timeout(Duration::from_secs(90))
549                .build()
550                .map_err(|e| BrightDataError::ToolError(format!("Failed to create HTTP client: {}", e)))?;
551
552            let response = client
553                .post(&format!("{}/request", base_url))
554                .header("Authorization", format!("Bearer {}", api_token))
555                .header("Content-Type", "application/json")
556                .json(&payload)
557                .send()
558                .await
559                .map_err(|e| BrightDataError::ToolError(format!("Direct API request failed to {}: {}", source_name, e)))?;
560
561            let duration = start_time.elapsed();
562            let status = response.status().as_u16();
563            let response_headers: HashMap<String, String> = response
564                .headers()
565                .iter()
566                .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
567                .collect();
568
569            let response_text = response.text().await
570                .map_err(|e| BrightDataError::ToolError(format!("Failed to read response body from {}: {}", source_name, e)))?;
571
572            // Handle server errors with retry
573            if matches!(status, 502 | 503 | 504) && retry_attempt < max_retries - 1 {
574                let wait_time = Duration::from_millis(1000 + (retry_attempt as u64 * 1000));
575                warn!("⏳ Direct API: Server error {}, waiting {}ms before retry...", status, wait_time.as_millis());
576                tokio::time::sleep(wait_time).await;
577                last_error = Some(BrightDataError::ToolError(format!("Direct API server error: {}", status)));
578                continue;
579            }
580            
581            if !(200..300).contains(&status) {
582                let error_msg = format!("Direct API: {} returned HTTP {}: {}", source_name, status, 
583                                      &response_text[..response_text.len().min(500)]);
584                last_error = Some(BrightDataError::ToolError(error_msg));
585                if retry_attempt == max_retries - 1 {
586                    return Err(last_error.unwrap());
587                }
588                continue;
589            }
590
591            // SUCCESS - Process response (apply filtering only if DEDUCT_DATA=true)
592            let raw_content = response_text;
593            let filtered_content = if self.is_data_reduction_enabled() {
594                // TODO: Add content filtering logic when DEDUCT_DATA=true
595                raw_content.clone()
596            } else {
597                raw_content.clone()
598            };
599
600            // Log metrics
601            if let Err(e) = BRIGHTDATA_METRICS.log_call(
602                &attempt_id,
603                url,
604                &zone,
605                "raw",
606                None,
607                payload.clone(),
608                status,
609                response_headers.clone(),
610                &raw_content,
611                Some(&filtered_content),
612                duration.as_millis() as u64,
613                None,
614                None,
615            ).await {
616                warn!("Failed to log direct API metrics: {}", e);
617            }
618
619            return Ok(json!({
620                "content": filtered_content,
621                "raw_content": raw_content,
622                "query": query,
623                "market": market,
624                "source": source_name,
625                "method": "Direct BrightData API",
626                "priority": format!("{:?}", priority),
627                "token_budget": token_budget,
628                "execution_id": execution_id,
629                "sequence": sequence,
630                "method_sequence": method_sequence,
631                "success": true,
632                "url": url,
633                "zone": zone,
634                "format": "raw",
635                "status_code": status,
636                "response_size_bytes": raw_content.len(),
637                "filtered_size_bytes": filtered_content.len(),
638                "duration_ms": duration.as_millis(),
639                "timestamp": chrono::Utc::now().to_rfc3339(),
640                "retry_attempts": retry_attempt + 1,
641                "max_retries": max_retries,
642                "payload_used": payload
643            }));
644        }
645
646        Err(last_error.unwrap_or_else(|| BrightDataError::ToolError("Direct API: All retry attempts failed".into())))
647    }
648
649    // Proxy-based method
650    async fn try_fetch_url_via_proxy(
651        &self, 
652        url: &str, 
653        query: &str, 
654        market: &str, 
655        source_name: &str, 
656        priority: crate::filters::strategy::QueryPriority,
657        token_budget: usize,
658        execution_id: &str,
659        sequence: u64,
660        method_sequence: u64
661    ) -> Result<Value, BrightDataError> {
662        let max_retries = env::var("MAX_RETRIES")
663            .ok()
664            .and_then(|s| s.parse::<u32>().ok())
665            .unwrap_or(1);
666        
667        let mut last_error = None;
668        
669        // Get proxy configuration from environment
670        let proxy_host = env::var("BRIGHTDATA_PROXY_HOST")
671            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_HOST environment variable".into()))?;
672        let proxy_port = env::var("BRIGHTDATA_PROXY_PORT")
673            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_PORT environment variable".into()))?;
674        let proxy_username = env::var("BRIGHTDATA_PROXY_USERNAME")
675            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_USERNAME environment variable".into()))?;
676        let proxy_password = env::var("BRIGHTDATA_PROXY_PASSWORD")
677            .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_PASSWORD environment variable".into()))?;
678
679        let proxy_url = format!("http://{}:{}@{}:{}", proxy_username, proxy_password, proxy_host, proxy_port);
680        
681        for retry_attempt in 0..max_retries {
682            let start_time = Instant::now();
683            let attempt_id = format!("{}_proxy_s{}_m{}_r{}", execution_id, sequence, method_sequence, retry_attempt);
684            
685            info!("🔄 Proxy: Fetching from {} via proxy (execution: {}, retry: {}/{})", 
686                  source_name, attempt_id, retry_attempt + 1, max_retries);
687            
688            // Create client with proxy configuration
689            let proxy = reqwest::Proxy::all(&proxy_url)
690                .map_err(|e| BrightDataError::ToolError(format!("Failed to create proxy: {}", e)))?;
691
692            let client = Client::builder()
693                .proxy(proxy)
694                .timeout(Duration::from_secs(90))
695                .danger_accept_invalid_certs(true) // Often needed for proxy connections
696                .build()
697                .map_err(|e| BrightDataError::ToolError(format!("Failed to create proxy client: {}", e)))?;
698
699            let response = client
700                .get(url)
701                .header("x-unblock-data-format", "markdown")
702                .send()
703                .await
704                .map_err(|e| BrightDataError::ToolError(format!("Proxy request failed to {}: {}", source_name, e)))?;
705
706            let duration = start_time.elapsed();
707            let status = response.status().as_u16();
708            let response_headers: HashMap<String, String> = response
709                .headers()
710                .iter()
711                .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
712                .collect();
713
714            let response_text = response.text().await
715                .map_err(|e| BrightDataError::ToolError(format!("Failed to read proxy response body from {}: {}", source_name, e)))?;
716
717            // Handle server errors with retry
718            if matches!(status, 502 | 503 | 504) && retry_attempt < max_retries - 1 {
719                let wait_time = Duration::from_millis(1000 + (retry_attempt as u64 * 1000));
720                warn!("Proxy: Server error {}, waiting {}ms before retry...", status, wait_time.as_millis());
721                tokio::time::sleep(wait_time).await;
722                last_error = Some(BrightDataError::ToolError(format!("Proxy server error: {}", status)));
723                continue;
724            }
725            
726            if !(200..300).contains(&status) {
727                let error_msg = format!("Proxy: {} returned HTTP {}: {}", source_name, status, 
728                                      &response_text[..response_text.len().min(200)]);
729                
730                warn!("Proxy HTTP error: {}", error_msg);
731                last_error = Some(BrightDataError::ToolError(error_msg));
732                
733                // Log error metrics for proxy
734                let proxy_payload = json!({
735                    "url": url,
736                    "method": "proxy",
737                    "proxy_host": proxy_host,
738                    "proxy_port": proxy_port,
739                    "error": format!("HTTP {}", status)
740                });
741
742                if let Err(e) = BRIGHTDATA_METRICS.log_call(
743                    &attempt_id,
744                    url,
745                    "proxy",
746                    "raw",
747                    None,
748                    proxy_payload,
749                    status,
750                    response_headers.clone(),
751                    &response_text,
752                    Some(&format!("Proxy HTTP {} Error", status)),
753                    duration.as_millis() as u64,
754                    None,
755                    None,
756                ).await {
757                    warn!("Failed to log proxy error metrics: {}", e);
758                }
759                
760                if retry_attempt == max_retries - 1 {
761                    return Err(last_error.unwrap());
762                }
763                continue;
764            }
765
766            // SUCCESS - Process response (apply filtering only if DEDUCT_DATA=true)
767            let raw_content = response_text;
768            let filtered_content = if self.is_data_reduction_enabled() {
769                // TODO: Add content filtering logic when DEDUCT_DATA=true
770                raw_content.clone()
771            } else {
772                raw_content.clone()
773            };
774
775            // Log metrics (using a simplified payload for proxy requests)
776            let proxy_payload = json!({
777                "url": url,
778                "method": "proxy",
779                "proxy_host": proxy_host,
780                "proxy_port": proxy_port
781            });
782
783            if let Err(e) = BRIGHTDATA_METRICS.log_call(
784                &attempt_id,
785                url,
786                "proxy",
787                "raw",
788                None,
789                proxy_payload.clone(),
790                status,
791                response_headers.clone(),
792                &raw_content,
793                Some(&filtered_content),
794                duration.as_millis() as u64,
795                None,
796                None,
797            ).await {
798                warn!("Failed to log proxy metrics: {}", e);
799            }
800
801            return Ok(json!({
802                "content": filtered_content,
803                "raw_content": raw_content,
804                "query": query,
805                "market": market,
806                "source": source_name,
807                "method": "BrightData Proxy",
808                "priority": format!("{:?}", priority),
809                "token_budget": token_budget,
810                "execution_id": execution_id,
811                "sequence": sequence,
812                "method_sequence": method_sequence,
813                "success": true,
814                "url": url,
815                "proxy_host": proxy_host,
816                "proxy_port": proxy_port,
817                "status_code": status,
818                "response_size_bytes": raw_content.len(),
819                "filtered_size_bytes": filtered_content.len(),
820                "duration_ms": duration.as_millis(),
821                "timestamp": chrono::Utc::now().to_rfc3339(),
822                "retry_attempts": retry_attempt + 1,
823                "max_retries": max_retries,
824                "payload_used": proxy_payload
825            }));
826        }
827
828        Err(last_error.unwrap_or_else(|| BrightDataError::ToolError("Proxy: All retry attempts failed".into())))
829    }
830
831    fn is_likely_crypto_symbol(&self, query: &str) -> bool {
832        let clean = query.trim();
833        
834        if clean.len() < 1 || clean.len() > 15 {
835            return false;
836        }
837
838        let valid_chars = clean.chars().all(|c| c.is_alphanumeric() || c == '.');
839        let has_letters = clean.chars().any(|c| c.is_alphabetic());
840        
841        valid_chars && has_letters
842    }
843
844    /// Test both direct API and proxy connectivity
845    pub async fn test_connectivity(&self) -> Result<String, BrightDataError> {
846        let test_url = "https://finance.yahoo.com/quote/BTC-USD/";
847        let mut results = Vec::new();
848        
849        // Test Direct API
850        info!("Testing Direct BrightData API...");
851        match self.try_fetch_url_direct_api(
852            test_url, "BTC", "usd", "Yahoo Finance Test", 
853            crate::filters::strategy::QueryPriority::High, 1000, 
854            "connectivity_test", 0, 0
855        ).await {
856            Ok(_) => {
857                results.push("Direct API: SUCCESS".to_string());
858            }
859            Err(e) => {
860                results.push(format!("Direct API: FAILED - {}", e));
861            }
862        }
863        
864        // Test Proxy
865        info!("Testing Proxy method...");
866        match self.try_fetch_url_via_proxy(
867            test_url, "BTC", "usd", "Yahoo Finance Test", 
868            crate::filters::strategy::QueryPriority::High, 1000, 
869            "connectivity_test", 0, 1
870        ).await {
871            Ok(_) => {
872                results.push("Proxy: SUCCESS".to_string());
873            }
874            Err(e) => {
875                results.push(format!("Proxy: FAILED - {}", e));
876            }
877        }
878        
879        Ok(format!("Connectivity Test Results:\n{}", results.join("\n")))
880    }
881}
882
883