1use crate::tool::{Tool, ToolResult, McpContent};
3use crate::error::BrightDataError;
4use crate::filters::{ResponseFilter, ResponseStrategy, ResponseType};
5use crate::extras::logger::JSON_LOGGER;
6use crate::metrics::brightdata_logger::BRIGHTDATA_METRICS;
7use crate::services::cache::crypto_cache::get_crypto_cache;
8use async_trait::async_trait;
9use reqwest::Client;
10use serde_json::{json, Value};
11use std::env;
12use std::time::{Duration, Instant};
13use std::collections::HashMap;
14use log::{info, warn, error};
15use crate::symbols::crypto_symbol::match_symbol_from_query;
16
/// Candidate URLs for a single query, grouped by fetch method.
///
/// Each entry is a `(url, description)` pair; the description is the
/// human-readable source label that ends up in logs and result metadata.
#[derive(Debug, Clone)]
pub struct MethodUrls {
    /// URLs to request through the BrightData proxy.
    pub proxy: Vec<(String, String)>,
    /// URLs intended for the direct BrightData API.
    pub direct: Vec<(String, String)>,
}
23
/// Stateless tool (unit struct) implementing [`Tool`] for cryptocurrency
/// lookups; all configuration is read from environment variables at call time.
pub struct CryptoDataTool;
25
26#[async_trait]
27impl Tool for CryptoDataTool {
28 fn name(&self) -> &str {
29 "get_crypto_data"
30 }
31
32 fn description(&self) -> &str {
33 "Get comprehensive cryptocurrency data including prices, market cap, volumes with intelligent filtering and priority-based processing. Supports both direct BrightData API and proxy fallback. Source: Yahoo Finance https://finance.yahoo.com/quote/{}-USD/ (e.g., BTC-USD, ETH-USD, SQL-USD)."
34 }
35
36 fn input_schema(&self) -> Value {
37 json!({
38 "type": "object",
39 "properties": {
40 "query": {
41 "type": "string",
42 "description": "Cryptocurrency symbol (e.g. BTC, ETH, ADA), coin name, comparison query, or market overview request"
43 },
44 "symbol": {
45 "type": "string",
46 "description": "Cryptocurrency symbol (e.g. BTC, ETH, ADA), coin name, comparison query, or market overview request"
47 },
48 "market": {
49 "type": "string",
50 "enum": ["usd", "eur", "btc", "global"],
51 "default": "usd",
52 "description": "Market region - usd for USD pairs, eur for EUR pairs, btc for BTC pairs, global for all markets"
53 },
54 "data_type": {
55 "type": "string",
56 "enum": ["price", "fundamentals", "technical", "news", "all"],
57 "default": "all",
58 "description": "Type of crypto data to focus on"
59 },
60 "timeframe": {
61 "type": "string",
62 "enum": ["realtime", "day", "week", "month", "quarter", "year"],
63 "default": "realtime",
64 "description": "Time period for crypto data analysis"
65 },
66 "include_ratios": {
67 "type": "boolean",
68 "default": true,
69 "description": "Include market ratios like market cap, volume ratios"
70 },
71 "include_volume": {
72 "type": "boolean",
73 "default": true,
74 "description": "Include trading volume and liquidity data"
75 },
76 "session_id": {
77 "type": "string",
78 "description": "Session identifier for caching (optional, will use default if not provided)"
79 },
80 },
81 "required": ["query"]
82 })
83 }
84
85 async fn execute(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
86 self.execute_internal(parameters).await
87 }
88
89 async fn execute_internal(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
90 let raw_query = parameters
91 .get("symbol")
92 .and_then(|v| v.as_str())
93 .ok_or_else(|| BrightDataError::ToolError("Missing 'symbol' parameter".into()))?;
94
95 let session_id = parameters
96 .get("user_id")
97 .and_then(|v| v.as_str())
98 .ok_or_else(|| BrightDataError::ToolError("Missing 'user_id' parameter".into()))?;
99
100 let matched_symbol = match_symbol_from_query(raw_query);
102
103 let query = matched_symbol.split('.').next().unwrap_or(&matched_symbol);
105
106 let market = parameters
107 .get("market")
108 .and_then(|v| v.as_str())
109 .unwrap_or("usd");
110
111 let data_type = parameters
112 .get("data_type")
113 .and_then(|v| v.as_str())
114 .unwrap_or("all");
115
116 let timeframe = parameters
117 .get("timeframe")
118 .and_then(|v| v.as_str())
119 .unwrap_or("realtime");
120
121 let include_ratios = parameters
122 .get("include_ratios")
123 .and_then(|v| v.as_bool())
124 .unwrap_or(true);
125
126 let include_volume = parameters
127 .get("include_volume")
128 .and_then(|v| v.as_bool())
129 .unwrap_or(true);
130
131 let query_priority = ResponseStrategy::classify_query_priority(query);
132 let recommended_tokens = ResponseStrategy::get_recommended_token_allocation(query);
133
134 let execution_id = format!("crypto_{}", chrono::Utc::now().format("%Y%m%d_%H%M%S%.3f"));
135
136 info!("📈 Crypto query: '{}' (market: {}, priority: {:?}, tokens: {}, session: {})",
137 query, market, query_priority, recommended_tokens, session_id);
138
139 match self.check_cache_first(query, session_id).await {
141 Ok(Some(cached_result)) => {
142 info!("🚀 Cache HIT: Returning cached data for {} in session {}", query, session_id);
143
144 let content = cached_result.get("content").and_then(|c| c.as_str()).unwrap_or("");
146 let source_used = cached_result.get("source_used").and_then(|s| s.as_str()).unwrap_or("Cache");
147 let method_used = "Redis Cache";
148
149 let formatted_response = self.create_formatted_crypto_response(
150 query, market, content, source_used, method_used,
151 data_type, timeframe, include_ratios, include_volume, &execution_id
152 );
153
154 let tool_result = ToolResult::success_with_raw(
155 vec![McpContent::text(formatted_response)],
156 cached_result
157 );
158
159 if self.is_data_reduction_enabled() {
161 return Ok(ResponseStrategy::apply_size_limits(tool_result));
162 } else {
163 return Ok(tool_result);
164 }
165 }
166 Ok(None) => {
167 info!("💾 Cache MISS: Fetching fresh data for {} in session {}", query, session_id);
168 }
169 Err(e) => {
170 warn!("🚨 Cache error (continuing with fresh fetch): {}", e);
171 }
172 }
173
174 match self.fetch_crypto_data_with_fallbacks_and_priority(
176 query, market, data_type, timeframe, include_ratios, include_volume,
177 query_priority, recommended_tokens, &execution_id
178 ).await {
179 Ok(result) => {
180 if let Err(e) = self.store_in_cache(query, session_id, &result).await {
182 warn!("Failed to store result in cache: {}", e);
183 }
184
185 let content = result.get("content").and_then(|c| c.as_str()).unwrap_or("");
186 let source_used = result.get("source_used").and_then(|s| s.as_str()).unwrap_or("Unknown");
187 let method_used = result.get("method_used").and_then(|m| m.as_str()).unwrap_or("Unknown");
188
189 let formatted_response = self.create_formatted_crypto_response(
191 query, market, content, source_used, method_used,
192 data_type, timeframe, include_ratios, include_volume, &execution_id
193 );
194
195 let tool_result = ToolResult::success_with_raw(
196 vec![McpContent::text(formatted_response)],
197 result
198 );
199
200 if self.is_data_reduction_enabled() {
202 Ok(ResponseStrategy::apply_size_limits(tool_result))
203 } else {
204 Ok(tool_result)
205 }
206 }
207 Err(_e) => {
208 warn!("BrightData error for query '{}', returning empty data for retry", query);
210 let empty_response = json!({
211 "query": query,
212 "market": market,
213 "status": "no_data",
214 "reason": "brightdata_error",
215 "execution_id": execution_id,
216 "session_id": session_id
217 });
218
219 Ok(ToolResult::success_with_raw(
220 vec![McpContent::text("📈 **No Data Available**\n\nPlease try again with a more specific crypto symbol.".to_string())],
221 empty_response
222 ))
223 }
224 }
225 }
226}
227
228impl CryptoDataTool {
229 fn is_data_reduction_enabled(&self) -> bool {
231 std::env::var("DEDUCT_DATA")
232 .unwrap_or_else(|_| "false".to_string())
233 .to_lowercase() == "true"
234 }
235
236 fn create_formatted_crypto_response(
238 &self,
239 query: &str,
240 market: &str,
241 content: &str,
242 source: &str,
243 method: &str,
244 data_type: &str,
245 timeframe: &str,
246 include_ratios: bool,
247 include_volume: bool,
248 execution_id: &str
249 ) -> String {
250 if !self.is_data_reduction_enabled() {
252 return format!(
253 "📈 **{}** | {} Market\n\n## Full Content\n{}\n\n*Source: {} via {} • Type: {} • Period: {}*",
254 query.to_uppercase(),
255 market.to_uppercase(),
256 content,
257 source,
258 method,
259 data_type,
260 timeframe
261 );
262 }
263
264 format!(
267 "📈 **{}** | {} Market\n\n## Content (TODO: Add Filtering)\n{}\n\n*Source: {} via {} • Type: {} • Period: {}*",
268 query.to_uppercase(),
269 market.to_uppercase(),
270 content,
271 source,
272 method,
273 data_type,
274 timeframe
275 )
276 }
277
278 async fn check_cache_first(
280 &self,
281 query: &str,
282 session_id: &str,
283 ) -> Result<Option<Value>, BrightDataError> {
284 let cache_service = get_crypto_cache().await?;
285 cache_service.get_cached_crypto_data(session_id, query).await
286 }
287
288 async fn store_in_cache(
290 &self,
291 query: &str,
292 session_id: &str,
293 data: &Value,
294 ) -> Result<(), BrightDataError> {
295 let cache_service = get_crypto_cache().await?;
296 cache_service.cache_crypto_data(session_id, query, data.clone()).await
297 }
298
299 fn build_prioritized_urls_with_priority(
301 &self,
302 query: &str,
303 market: &str,
304 data_type: &str,
305 priority: crate::filters::strategy::QueryPriority
306 ) -> MethodUrls {
307 let mut proxy_urls = Vec::new();
308 let mut direct_urls = Vec::new();
309 let clean_query = query.trim().to_uppercase();
310
311 let max_sources = 3;
313
314 if self.is_likely_crypto_symbol(&clean_query) {
315 match market {
316 "usd" => {
317 let symbols_to_try = vec![
319 format!("{}-USD", clean_query),
320 clean_query.clone(),
321 ];
322
323 for (i, symbol) in symbols_to_try.iter().enumerate() {
324 if i >= max_sources { break; }
325
326 let url = format!("https://finance.yahoo.com/quote/{}", symbol);
327 let description = format!("Yahoo Finance ({})", symbol);
328
329 let proxy_url = format!("https://finance.yahoo.com/quote/{}/", symbol);
330 let proxy_description = format!("Yahoo Finance ({})", symbol);
331
332 proxy_urls.push((proxy_url, proxy_description));
334 direct_urls.push((url, description));
335 }
336 }
337 "eur" => {
338 let url = format!("https://finance.yahoo.com/quote/{}-EUR/", clean_query);
339 let description = format!("Yahoo Finance ({})", clean_query);
340
341 let proxy_url = format!("https://finance.yahoo.com/quote/{}-EUR/", clean_query);
342 let proxy_description = format!("Yahoo Finance ({})", clean_query);
343
344 proxy_urls.push((proxy_url, proxy_description));
345 direct_urls.push((url, description));
346 }
347 "btc" => {
348 let url = format!("https://finance.yahoo.com/quote/{}-BTC/", clean_query);
349 let description = format!("Yahoo Finance ({})", clean_query);
350
351 let proxy_url = format!("https://finance.yahoo.com/quote/{}-BTC/", clean_query);
352 let proxy_description = format!("Yahoo Finance ({})", clean_query);
353
354 proxy_urls.push((proxy_url, proxy_description));
355 direct_urls.push((url, description));
356 }
357 "global" => {
358 let url = format!("https://finance.yahoo.com/quote/{}-USD/", clean_query);
359 let description = format!("Yahoo Finance Global ({})", clean_query);
360
361 let proxy_url = format!("https://finance.yahoo.com/quote/{}-USD/", clean_query);
362 let proxy_description = format!("Yahoo Finance Global ({})", clean_query);
363
364 proxy_urls.push((proxy_url, proxy_description));
365 direct_urls.push((url, description));
366 }
367 _ => {}
368 }
369 }
370
371 if proxy_urls.len() < max_sources {
373 let url = format!("https://finance.yahoo.com/quote/{}", urlencoding::encode(query));
374 let description = "Yahoo Finance Search".to_string();
375
376 let proxy_url = format!("https://finance.yahoo.com/quote/{}", urlencoding::encode(query));
377 let proxy_description = "Yahoo Finance Search".to_string();
378
379 proxy_urls.push((proxy_url, proxy_description));
380 direct_urls.push((url, description));
381 }
382
383 info!("🎯 Generated {} proxy URLs and {} direct URLs for query '{}' (priority: {:?})",
384 proxy_urls.len(), direct_urls.len(), query, priority);
385
386 MethodUrls {
387 proxy: proxy_urls,
388 direct: direct_urls,
389 }
390 }
391
392 async fn fetch_crypto_data_with_fallbacks_and_priority(
394 &self,
395 query: &str,
396 market: &str,
397 data_type: &str,
398 timeframe: &str,
399 include_ratios: bool,
400 include_volume: bool,
401 query_priority: crate::filters::strategy::QueryPriority,
402 token_budget: usize,
403 execution_id: &str
404 ) -> Result<Value, BrightDataError> {
405 let method_urls = self.build_prioritized_urls_with_priority(query, market, data_type, query_priority);
406 let mut last_error = None;
407 let mut attempts = Vec::new();
408
409 let methods_to_try = vec![
411 ("proxy", "Proxy Fallback", &method_urls.proxy)
413 ];
414
415 for (method_sequence, (method_type, method_name, urls_for_method)) in methods_to_try.iter().enumerate() {
416 info!("🔄 Trying {} method with {} URLs", method_name, urls_for_method.len());
417
418 for (url_sequence, (url, source_name)) in urls_for_method.iter().enumerate() {
419 let attempt_result = match *method_type {
420 "direct" => {
421 info!("🌐 Trying Direct BrightData API for {} (method: {}, url: {}/{})",
422 source_name, method_sequence + 1, url_sequence + 1, urls_for_method.len());
423 self.try_fetch_url_direct_api(
424 url, query, market, source_name, query_priority, token_budget,
425 execution_id, url_sequence as u64, method_sequence as u64
426 ).await
427 }
428 "proxy" => {
429 info!("🔄 Trying Proxy method for {} (method: {}, url: {}/{})",
430 source_name, method_sequence + 1, url_sequence + 1, urls_for_method.len());
431 self.try_fetch_url_via_proxy(
432 url, query, market, source_name, query_priority, token_budget,
433 execution_id, url_sequence as u64, method_sequence as u64
434 ).await
435 }
436 _ => continue,
437 };
438
439 match attempt_result {
440 Ok(mut result) => {
441 let content = result.get("content").and_then(|c| c.as_str()).unwrap_or("");
442
443 attempts.push(json!({
444 "source": source_name,
445 "url": url,
446 "method": method_name,
447 "status": "success",
448 "content_length": content.len(),
449 "method_sequence": method_sequence + 1,
450 "url_sequence": url_sequence + 1
451 }));
452
453 result["source_used"] = json!(source_name);
455 result["url_used"] = json!(url);
456 result["method_used"] = json!(method_name);
457 result["execution_id"] = json!(execution_id);
458 result["priority"] = json!(format!("{:?}", query_priority));
459 result["token_budget"] = json!(token_budget);
460 result["attempts"] = json!(attempts);
461 result["successful_method_sequence"] = json!(method_sequence + 1);
462 result["successful_url_sequence"] = json!(url_sequence + 1);
463
464 info!("✅ Successfully fetched crypto data from {} via {} (method: {}, url: {})",
465 source_name, method_name, method_sequence + 1, url_sequence + 1);
466
467 return Ok(result);
468 }
469 Err(e) => {
470 attempts.push(json!({
471 "source": source_name,
472 "url": url,
473 "method": method_name,
474 "status": "failed",
475 "error": e.to_string(),
476 "method_sequence": method_sequence + 1,
477 "url_sequence": url_sequence + 1
478 }));
479
480 last_error = Some(e);
481 warn!("❌ Failed to fetch from {} via {} (method: {}, url: {}): {:?}",
482 source_name, method_name, method_sequence + 1, url_sequence + 1, last_error);
483 }
484 }
485 }
486 }
487
488 warn!("❌ All sources and methods failed for query '{}'. Returning empty data for Anthropic retry", query);
490
491 let empty_result = json!({
492 "query": query,
493 "market": market,
494 "status": "no_data_found",
495 "attempts": attempts,
496 "execution_id": execution_id,
497 "total_attempts": method_urls.direct.len() + method_urls.proxy.len(),
498 "reason": "all_sources_failed"
499 });
500
501 Ok(empty_result)
502 }
503
504 async fn try_fetch_url_direct_api(
506 &self,
507 url: &str,
508 query: &str,
509 market: &str,
510 source_name: &str,
511 priority: crate::filters::strategy::QueryPriority,
512 token_budget: usize,
513 execution_id: &str,
514 sequence: u64,
515 method_sequence: u64
516 ) -> Result<Value, BrightDataError> {
517 let max_retries = env::var("MAX_RETRIES")
518 .ok()
519 .and_then(|s| s.parse::<u32>().ok())
520 .unwrap_or(1);
521
522 let mut last_error = None;
523
524 for retry_attempt in 0..max_retries {
525 let start_time = Instant::now();
526 let attempt_id = format!("{}_direct_s{}_m{}_r{}", execution_id, sequence, method_sequence, retry_attempt);
527
528 info!("🌐 Direct API: Fetching from {} (execution: {}, retry: {}/{})",
529 source_name, attempt_id, retry_attempt + 1, max_retries);
530
531 let api_token = env::var("BRIGHTDATA_API_TOKEN")
532 .or_else(|_| env::var("API_TOKEN"))
533 .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_API_TOKEN environment variable".into()))?;
534
535 let base_url = env::var("BRIGHTDATA_BASE_URL")
536 .unwrap_or_else(|_| "https://api.brightdata.com".to_string());
537
538 let zone = env::var("WEB_UNLOCKER_ZONE")
539 .unwrap_or_else(|_| "mcp_unlocker".to_string());
540
541 let payload = json!({
542 "url": url,
543 "zone": zone,
544 "format": "raw",
545 });
546
547 let client = Client::builder()
548 .timeout(Duration::from_secs(90))
549 .build()
550 .map_err(|e| BrightDataError::ToolError(format!("Failed to create HTTP client: {}", e)))?;
551
552 let response = client
553 .post(&format!("{}/request", base_url))
554 .header("Authorization", format!("Bearer {}", api_token))
555 .header("Content-Type", "application/json")
556 .json(&payload)
557 .send()
558 .await
559 .map_err(|e| BrightDataError::ToolError(format!("Direct API request failed to {}: {}", source_name, e)))?;
560
561 let duration = start_time.elapsed();
562 let status = response.status().as_u16();
563 let response_headers: HashMap<String, String> = response
564 .headers()
565 .iter()
566 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
567 .collect();
568
569 let response_text = response.text().await
570 .map_err(|e| BrightDataError::ToolError(format!("Failed to read response body from {}: {}", source_name, e)))?;
571
572 if matches!(status, 502 | 503 | 504) && retry_attempt < max_retries - 1 {
574 let wait_time = Duration::from_millis(1000 + (retry_attempt as u64 * 1000));
575 warn!("⏳ Direct API: Server error {}, waiting {}ms before retry...", status, wait_time.as_millis());
576 tokio::time::sleep(wait_time).await;
577 last_error = Some(BrightDataError::ToolError(format!("Direct API server error: {}", status)));
578 continue;
579 }
580
581 if !(200..300).contains(&status) {
582 let error_msg = format!("Direct API: {} returned HTTP {}: {}", source_name, status,
583 &response_text[..response_text.len().min(500)]);
584 last_error = Some(BrightDataError::ToolError(error_msg));
585 if retry_attempt == max_retries - 1 {
586 return Err(last_error.unwrap());
587 }
588 continue;
589 }
590
591 let raw_content = response_text;
593 let filtered_content = if self.is_data_reduction_enabled() {
594 raw_content.clone()
596 } else {
597 raw_content.clone()
598 };
599
600 if let Err(e) = BRIGHTDATA_METRICS.log_call(
602 &attempt_id,
603 url,
604 &zone,
605 "raw",
606 None,
607 payload.clone(),
608 status,
609 response_headers.clone(),
610 &raw_content,
611 Some(&filtered_content),
612 duration.as_millis() as u64,
613 None,
614 None,
615 ).await {
616 warn!("Failed to log direct API metrics: {}", e);
617 }
618
619 return Ok(json!({
620 "content": filtered_content,
621 "raw_content": raw_content,
622 "query": query,
623 "market": market,
624 "source": source_name,
625 "method": "Direct BrightData API",
626 "priority": format!("{:?}", priority),
627 "token_budget": token_budget,
628 "execution_id": execution_id,
629 "sequence": sequence,
630 "method_sequence": method_sequence,
631 "success": true,
632 "url": url,
633 "zone": zone,
634 "format": "raw",
635 "status_code": status,
636 "response_size_bytes": raw_content.len(),
637 "filtered_size_bytes": filtered_content.len(),
638 "duration_ms": duration.as_millis(),
639 "timestamp": chrono::Utc::now().to_rfc3339(),
640 "retry_attempts": retry_attempt + 1,
641 "max_retries": max_retries,
642 "payload_used": payload
643 }));
644 }
645
646 Err(last_error.unwrap_or_else(|| BrightDataError::ToolError("Direct API: All retry attempts failed".into())))
647 }
648
649 async fn try_fetch_url_via_proxy(
651 &self,
652 url: &str,
653 query: &str,
654 market: &str,
655 source_name: &str,
656 priority: crate::filters::strategy::QueryPriority,
657 token_budget: usize,
658 execution_id: &str,
659 sequence: u64,
660 method_sequence: u64
661 ) -> Result<Value, BrightDataError> {
662 let max_retries = env::var("MAX_RETRIES")
663 .ok()
664 .and_then(|s| s.parse::<u32>().ok())
665 .unwrap_or(1);
666
667 let mut last_error = None;
668
669 let proxy_host = env::var("BRIGHTDATA_PROXY_HOST")
671 .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_HOST environment variable".into()))?;
672 let proxy_port = env::var("BRIGHTDATA_PROXY_PORT")
673 .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_PORT environment variable".into()))?;
674 let proxy_username = env::var("BRIGHTDATA_PROXY_USERNAME")
675 .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_USERNAME environment variable".into()))?;
676 let proxy_password = env::var("BRIGHTDATA_PROXY_PASSWORD")
677 .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_PASSWORD environment variable".into()))?;
678
679 let proxy_url = format!("http://{}:{}@{}:{}", proxy_username, proxy_password, proxy_host, proxy_port);
680
681 for retry_attempt in 0..max_retries {
682 let start_time = Instant::now();
683 let attempt_id = format!("{}_proxy_s{}_m{}_r{}", execution_id, sequence, method_sequence, retry_attempt);
684
685 info!("🔄 Proxy: Fetching from {} via proxy (execution: {}, retry: {}/{})",
686 source_name, attempt_id, retry_attempt + 1, max_retries);
687
688 let proxy = reqwest::Proxy::all(&proxy_url)
690 .map_err(|e| BrightDataError::ToolError(format!("Failed to create proxy: {}", e)))?;
691
692 let client = Client::builder()
693 .proxy(proxy)
694 .timeout(Duration::from_secs(90))
695 .danger_accept_invalid_certs(true) .build()
697 .map_err(|e| BrightDataError::ToolError(format!("Failed to create proxy client: {}", e)))?;
698
699 let response = client
700 .get(url)
701 .header("x-unblock-data-format", "markdown")
702 .send()
703 .await
704 .map_err(|e| BrightDataError::ToolError(format!("Proxy request failed to {}: {}", source_name, e)))?;
705
706 let duration = start_time.elapsed();
707 let status = response.status().as_u16();
708 let response_headers: HashMap<String, String> = response
709 .headers()
710 .iter()
711 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
712 .collect();
713
714 let response_text = response.text().await
715 .map_err(|e| BrightDataError::ToolError(format!("Failed to read proxy response body from {}: {}", source_name, e)))?;
716
717 if matches!(status, 502 | 503 | 504) && retry_attempt < max_retries - 1 {
719 let wait_time = Duration::from_millis(1000 + (retry_attempt as u64 * 1000));
720 warn!("Proxy: Server error {}, waiting {}ms before retry...", status, wait_time.as_millis());
721 tokio::time::sleep(wait_time).await;
722 last_error = Some(BrightDataError::ToolError(format!("Proxy server error: {}", status)));
723 continue;
724 }
725
726 if !(200..300).contains(&status) {
727 let error_msg = format!("Proxy: {} returned HTTP {}: {}", source_name, status,
728 &response_text[..response_text.len().min(200)]);
729
730 warn!("Proxy HTTP error: {}", error_msg);
731 last_error = Some(BrightDataError::ToolError(error_msg));
732
733 let proxy_payload = json!({
735 "url": url,
736 "method": "proxy",
737 "proxy_host": proxy_host,
738 "proxy_port": proxy_port,
739 "error": format!("HTTP {}", status)
740 });
741
742 if let Err(e) = BRIGHTDATA_METRICS.log_call(
743 &attempt_id,
744 url,
745 "proxy",
746 "raw",
747 None,
748 proxy_payload,
749 status,
750 response_headers.clone(),
751 &response_text,
752 Some(&format!("Proxy HTTP {} Error", status)),
753 duration.as_millis() as u64,
754 None,
755 None,
756 ).await {
757 warn!("Failed to log proxy error metrics: {}", e);
758 }
759
760 if retry_attempt == max_retries - 1 {
761 return Err(last_error.unwrap());
762 }
763 continue;
764 }
765
766 let raw_content = response_text;
768 let filtered_content = if self.is_data_reduction_enabled() {
769 raw_content.clone()
771 } else {
772 raw_content.clone()
773 };
774
775 let proxy_payload = json!({
777 "url": url,
778 "method": "proxy",
779 "proxy_host": proxy_host,
780 "proxy_port": proxy_port
781 });
782
783 if let Err(e) = BRIGHTDATA_METRICS.log_call(
784 &attempt_id,
785 url,
786 "proxy",
787 "raw",
788 None,
789 proxy_payload.clone(),
790 status,
791 response_headers.clone(),
792 &raw_content,
793 Some(&filtered_content),
794 duration.as_millis() as u64,
795 None,
796 None,
797 ).await {
798 warn!("Failed to log proxy metrics: {}", e);
799 }
800
801 return Ok(json!({
802 "content": filtered_content,
803 "raw_content": raw_content,
804 "query": query,
805 "market": market,
806 "source": source_name,
807 "method": "BrightData Proxy",
808 "priority": format!("{:?}", priority),
809 "token_budget": token_budget,
810 "execution_id": execution_id,
811 "sequence": sequence,
812 "method_sequence": method_sequence,
813 "success": true,
814 "url": url,
815 "proxy_host": proxy_host,
816 "proxy_port": proxy_port,
817 "status_code": status,
818 "response_size_bytes": raw_content.len(),
819 "filtered_size_bytes": filtered_content.len(),
820 "duration_ms": duration.as_millis(),
821 "timestamp": chrono::Utc::now().to_rfc3339(),
822 "retry_attempts": retry_attempt + 1,
823 "max_retries": max_retries,
824 "payload_used": proxy_payload
825 }));
826 }
827
828 Err(last_error.unwrap_or_else(|| BrightDataError::ToolError("Proxy: All retry attempts failed".into())))
829 }
830
831 fn is_likely_crypto_symbol(&self, query: &str) -> bool {
832 let clean = query.trim();
833
834 if clean.len() < 1 || clean.len() > 15 {
835 return false;
836 }
837
838 let valid_chars = clean.chars().all(|c| c.is_alphanumeric() || c == '.');
839 let has_letters = clean.chars().any(|c| c.is_alphabetic());
840
841 valid_chars && has_letters
842 }
843
844 pub async fn test_connectivity(&self) -> Result<String, BrightDataError> {
846 let test_url = "https://finance.yahoo.com/quote/BTC-USD/";
847 let mut results = Vec::new();
848
849 info!("Testing Direct BrightData API...");
851 match self.try_fetch_url_direct_api(
852 test_url, "BTC", "usd", "Yahoo Finance Test",
853 crate::filters::strategy::QueryPriority::High, 1000,
854 "connectivity_test", 0, 0
855 ).await {
856 Ok(_) => {
857 results.push("Direct API: SUCCESS".to_string());
858 }
859 Err(e) => {
860 results.push(format!("Direct API: FAILED - {}", e));
861 }
862 }
863
864 info!("Testing Proxy method...");
866 match self.try_fetch_url_via_proxy(
867 test_url, "BTC", "usd", "Yahoo Finance Test",
868 crate::filters::strategy::QueryPriority::High, 1000,
869 "connectivity_test", 0, 1
870 ).await {
871 Ok(_) => {
872 results.push("Proxy: SUCCESS".to_string());
873 }
874 Err(e) => {
875 results.push(format!("Proxy: FAILED - {}", e));
876 }
877 }
878
879 Ok(format!("Connectivity Test Results:\n{}", results.join("\n")))
880 }
881}
882
883