1use crate::tool::{Tool, ToolResult, McpContent};
3use crate::error::BrightDataError;
4use crate::filters::{ResponseFilter, ResponseStrategy, ResponseType};
5use crate::extras::logger::JSON_LOGGER;
6use crate::metrics::brightdata_logger::BRIGHTDATA_METRICS;
7use crate::services::cache::etf_cache::get_etf_cache;
8use async_trait::async_trait;
9use reqwest::Client;
10use serde_json::{json, Value};
11use std::env;
12use std::time::{Duration, Instant};
13use std::collections::HashMap;
14use log::{info, warn, error};
15use crate::symbols::etf_symbol::match_symbol_from_query;
16
/// Candidate fetch targets, grouped by the transport used to reach them.
///
/// Every entry is a `(url, description)` pair, where the description is the
/// human-readable source label that ends up in logs and result metadata.
#[derive(Debug, Clone)]
pub struct MethodUrls {
    /// `(url, description)` pairs intended for the proxy method.
    pub proxy: Vec<(String, String)>,
    /// `(url, description)` pairs intended for the direct API method.
    pub direct: Vec<(String, String)>,
}
23
/// Stateless tool that retrieves an ETF snapshot page, using the session cache
/// first and BrightData-backed fetching otherwise.
pub struct ETFDataTool;
25
26#[async_trait]
27impl Tool for ETFDataTool {
28 fn name(&self) -> &str {
29 "get_etf_data"
30 }
31
32 fn description(&self) -> &str {
33 "Get comprehensive ETF snapshot (price, summary, metrics) with cache, BrightData direct API and proxy fallback. Source: Yahoo Finance https://finance.yahoo.com/quote/{}.NS/ (e.g., NIFTYBEES)."
34 }
35
36 fn input_schema(&self) -> Value {
37 json!({
38 "type": "object",
39 "properties": {
40 "query": {
41 "type": "string",
42 "description": "ETF ticker or name (e.g., NIFTYBEES, JUNIORBEES). If provided, used when 'symbol' missing."
43 },
44 "symbol": {
45 "type": "string",
46 "description": "ETF ticker (e.g., NIFTYBEES, JUNIORBEES, GOLDBEES)"
47 },
48 "data_type": {
49 "type": "string",
50 "enum": ["price", "fundamentals", "technical", "news", "all"],
51 "default": "all",
52 "description": "Focus slice for parsing/formatting"
53 },
54 "timeframe": {
55 "type": "string",
56 "enum": ["realtime", "day", "week", "month", "quarter", "year"],
57 "default": "realtime",
58 "description": "Time period context for analysis"
59 },
60 "include_ratios": {
61 "type": "boolean",
62 "default": true,
63 "description": "Include ratios/metrics if available"
64 },
65 "include_volume": {
66 "type": "boolean",
67 "default": true,
68 "description": "Include volume/liquidity info if available"
69 },
70 "user_id": {
71 "type": "string",
72 "description": "Session/user id for cache scoping"
73 }
74 },
75 "required": ["symbol", "user_id"]
76 })
77 }
78
79 async fn execute(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
80 self.execute_internal(parameters).await
81 }
82
83 async fn execute_internal(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
84 let raw_query = parameters
85 .get("symbol")
86 .and_then(|v| v.as_str())
87 .ok_or_else(|| BrightDataError::ToolError("Missing 'symbol' parameter".into()))?;
88
89 let session_id = parameters
90 .get("user_id")
91 .and_then(|v| v.as_str())
92 .ok_or_else(|| BrightDataError::ToolError("Missing 'user_id' parameter".into()))?;
93
94 let matched_symbol = match_symbol_from_query(raw_query);
96
97 let query = matched_symbol.split('.').next().unwrap_or(&matched_symbol);
99
100 let market = parameters
101 .get("market")
102 .and_then(|v| v.as_str())
103 .unwrap_or("usd");
104
105 let data_type = parameters
106 .get("data_type")
107 .and_then(|v| v.as_str())
108 .unwrap_or("all");
109
110 let timeframe = parameters
111 .get("timeframe")
112 .and_then(|v| v.as_str())
113 .unwrap_or("realtime");
114
115 let include_ratios = parameters
116 .get("include_ratios")
117 .and_then(|v| v.as_bool())
118 .unwrap_or(true);
119
120 let include_volume = parameters
121 .get("include_volume")
122 .and_then(|v| v.as_bool())
123 .unwrap_or(true);
124
125 let query_priority = ResponseStrategy::classify_query_priority(query);
126 let recommended_tokens = ResponseStrategy::get_recommended_token_allocation(query);
127
128 let execution_id = format!("etf_{}", chrono::Utc::now().format("%Y%m%d_%H%M%S%.3f"));
129
130 info!("📈 ETF query: '{}' (market: {}, priority: {:?}, tokens: {}, session: {})",
131 query, market, query_priority, recommended_tokens, session_id);
132
133 match self.check_cache_first(query, session_id).await {
135 Ok(Some(cached_result)) => {
136 info!("🚀 Cache HIT: Returning cached data for {} in session {}", query, session_id);
137
138 let content = cached_result.get("content").and_then(|c| c.as_str()).unwrap_or("");
140 let source_used = cached_result.get("source_used").and_then(|s| s.as_str()).unwrap_or("Cache");
141 let method_used = "Redis Cache";
142
143 let formatted_response = self.create_formatted_etf_response(
144 query, market, content, source_used, method_used,
145 data_type, timeframe, include_ratios, include_volume, &execution_id
146 );
147
148 let tool_result = ToolResult::success_with_raw(
149 vec![McpContent::text(formatted_response)],
150 cached_result
151 );
152
153 if self.is_data_reduction_enabled() {
155 return Ok(ResponseStrategy::apply_size_limits(tool_result));
156 } else {
157 return Ok(tool_result);
158 }
159 }
160 Ok(None) => {
161 info!("💾 Cache MISS: Fetching fresh data for {} in session {}", query, session_id);
162 }
163 Err(e) => {
164 warn!("🚨 Cache error (continuing with fresh fetch): {}", e);
165 }
166 }
167
168 match self.fetch_etf_data_with_fallbacks_and_priority(
170 query, market, data_type, timeframe, include_ratios, include_volume,
171 query_priority, recommended_tokens, &execution_id
172 ).await {
173 Ok(result) => {
174 if let Err(e) = self.store_in_cache(query, session_id, &result).await {
176 warn!("Failed to store result in cache: {}", e);
177 }
178
179 let content = result.get("content").and_then(|c| c.as_str()).unwrap_or("");
180 let source_used = result.get("source_used").and_then(|s| s.as_str()).unwrap_or("Unknown");
181 let method_used = result.get("method_used").and_then(|m| m.as_str()).unwrap_or("Unknown");
182
183 let formatted_response = self.create_formatted_etf_response(
185 query, market, content, source_used, method_used,
186 data_type, timeframe, include_ratios, include_volume, &execution_id
187 );
188
189 let tool_result = ToolResult::success_with_raw(
190 vec![McpContent::text(formatted_response)],
191 result
192 );
193
194 if self.is_data_reduction_enabled() {
196 Ok(ResponseStrategy::apply_size_limits(tool_result))
197 } else {
198 Ok(tool_result)
199 }
200 }
201 Err(_e) => {
202 warn!("BrightData error for query '{}', returning empty data for retry", query);
204 let empty_response = json!({
205 "query": query,
206 "market": market,
207 "status": "no_data",
208 "reason": "brightdata_error",
209 "execution_id": execution_id,
210 "session_id": session_id
211 });
212
213 Ok(ToolResult::success_with_raw(
214 vec![McpContent::text("📈 **No Data Available**\n\nPlease try again with a more specific ETF symbol.".to_string())],
215 empty_response
216 ))
217 }
218 }
219 }
220}
221
222impl ETFDataTool {
223 fn is_data_reduction_enabled(&self) -> bool {
225 std::env::var("DEDUCT_DATA")
226 .unwrap_or_else(|_| "false".to_string())
227 .to_lowercase() == "true"
228 }
229
230 fn create_formatted_etf_response(
232 &self,
233 query: &str,
234 market: &str,
235 content: &str,
236 source: &str,
237 method: &str,
238 data_type: &str,
239 timeframe: &str,
240 include_ratios: bool,
241 include_volume: bool,
242 execution_id: &str
243 ) -> String {
244 if !self.is_data_reduction_enabled() {
246 return format!(
247 "📈 **{}** | {} Market\n\n## Full Content\n{}\n\n*Source: {} via {} • Type: {} • Period: {}*",
248 query.to_uppercase(),
249 market.to_uppercase(),
250 content,
251 source,
252 method,
253 data_type,
254 timeframe
255 );
256 }
257
258 format!(
261 "📈 **{}** | {} Market\n\n## Content (TODO: Add Filtering)\n{}\n\n*Source: {} via {} • Type: {} • Period: {}*",
262 query.to_uppercase(),
263 market.to_uppercase(),
264 content,
265 source,
266 method,
267 data_type,
268 timeframe
269 )
270 }
271
272 async fn check_cache_first(
274 &self,
275 query: &str,
276 session_id: &str,
277 ) -> Result<Option<Value>, BrightDataError> {
278 let cache_service = get_etf_cache().await?;
279 cache_service.get_cached_etf_data(session_id, query).await
280 }
281
282 async fn store_in_cache(
284 &self,
285 query: &str,
286 session_id: &str,
287 data: &Value,
288 ) -> Result<(), BrightDataError> {
289 let cache_service = get_etf_cache().await?;
290 cache_service.cache_etf_data(session_id, query, data.clone()).await
291 }
292
293 fn build_prioritized_urls_with_priority(
295 &self,
296 query: &str,
297 market: &str,
298 data_type: &str,
299 priority: crate::filters::strategy::QueryPriority
300 ) -> MethodUrls {
301 let mut proxy_urls = Vec::new();
302 let mut direct_urls = Vec::new();
303 let clean_query = query.trim().to_uppercase();
304
305 let max_sources = 3;
307
308 if self.is_likely_etf_symbol(&clean_query) {
309 match market {
310 "usd" => {
311 let symbols_to_try = vec![
312 format!("{}.NS", clean_query),
313 clean_query.clone(),
314 ];
315
316 for (i, symbol) in symbols_to_try.iter().enumerate() {
317 if i >= max_sources { break; }
318
319 let url = format!("https://finance.yahoo.com/quote/{}", symbol);
320 let description = format!("Yahoo Finance ({})", symbol);
321
322 let proxy_url = format!("https://finance.yahoo.com/quote/{}/", symbol);
323 let proxy_description = format!("Yahoo Finance ({})", symbol);
324
325 proxy_urls.push((proxy_url, proxy_description));
327 direct_urls.push((url, description));
328 }
329 },
330 "inr" => {
331 let symbols_to_try = vec![
332 format!("{}.NS", clean_query),
333 clean_query.clone(),
334 ];
335
336 for (i, symbol) in symbols_to_try.iter().enumerate() {
337 if i >= max_sources { break; }
338
339 let url = format!("https://finance.yahoo.com/quote/{}", symbol);
340 let description = format!("Yahoo Finance ({})", symbol);
341
342 let proxy_url = format!("https://finance.yahoo.com/quote/{}/", symbol);
343 let proxy_description = format!("Yahoo Finance ({})", symbol);
344
345 proxy_urls.push((proxy_url, proxy_description));
347 direct_urls.push((url, description));
348 }
349 }
350
351 _ => {
352 let symbols_to_try = vec![
353 format!("{}.NS", clean_query),
354 clean_query.clone(),
355 ];
356
357 for (i, symbol) in symbols_to_try.iter().enumerate() {
358 if i >= max_sources { break; }
359
360 let url = format!("https://finance.yahoo.com/quote/{}", symbol);
361 let description = format!("Yahoo Finance ({})", symbol);
362
363 let proxy_url = format!("https://finance.yahoo.com/quote/{}/", symbol);
364 let proxy_description = format!("Yahoo Finance ({})", symbol);
365
366 proxy_urls.push((proxy_url, proxy_description));
368 direct_urls.push((url, description));
369 }
370 }
371
372 }
373 }
374
375 if proxy_urls.len() < max_sources {
377 let url = format!("https://finance.yahoo.com/quote/{}", urlencoding::encode(query));
378 let description = "Yahoo Finance Search".to_string();
379
380 let proxy_url = format!("https://finance.yahoo.com/quote/{}", urlencoding::encode(query));
381 let proxy_description = "Yahoo Finance Search".to_string();
382
383 proxy_urls.push((proxy_url, proxy_description));
384 direct_urls.push((url, description));
385 }
386
387 info!("🎯 Generated {} proxy URLs and {} direct URLs for query '{}' (priority: {:?})",
388 proxy_urls.len(), direct_urls.len(), query, priority);
389
390 MethodUrls {
391 proxy: proxy_urls,
392 direct: direct_urls,
393 }
394 }
395
396 async fn fetch_etf_data_with_fallbacks_and_priority(
398 &self,
399 query: &str,
400 market: &str,
401 data_type: &str,
402 timeframe: &str,
403 include_ratios: bool,
404 include_volume: bool,
405 query_priority: crate::filters::strategy::QueryPriority,
406 token_budget: usize,
407 execution_id: &str
408 ) -> Result<Value, BrightDataError> {
409 let method_urls = self.build_prioritized_urls_with_priority(query, market, data_type, query_priority);
410 let mut last_error = None;
411 let mut attempts = Vec::new();
412
413 let methods_to_try = vec![
415 ("proxy", "Proxy Fallback", &method_urls.proxy)
417 ];
418
419 for (method_sequence, (method_type, method_name, urls_for_method)) in methods_to_try.iter().enumerate() {
420 info!("🔄 Trying {} method with {} URLs", method_name, urls_for_method.len());
421
422 for (url_sequence, (url, source_name)) in urls_for_method.iter().enumerate() {
423 let attempt_result = match *method_type {
424 "direct" => {
425 info!("🌐 Trying Direct BrightData API for {} (method: {}, url: {}/{})",
426 source_name, method_sequence + 1, url_sequence + 1, urls_for_method.len());
427 self.try_fetch_url_direct_api(
428 url, query, market, source_name, query_priority, token_budget,
429 execution_id, url_sequence as u64, method_sequence as u64
430 ).await
431 }
432 "proxy" => {
433 info!("🔄 Trying Proxy method for {} (method: {}, url: {}/{})",
434 source_name, method_sequence + 1, url_sequence + 1, urls_for_method.len());
435 self.try_fetch_url_via_proxy(
436 url, query, market, source_name, query_priority, token_budget,
437 execution_id, url_sequence as u64, method_sequence as u64
438 ).await
439 }
440 _ => continue,
441 };
442
443 match attempt_result {
444 Ok(mut result) => {
445 let content = result.get("content").and_then(|c| c.as_str()).unwrap_or("");
446
447 attempts.push(json!({
448 "source": source_name,
449 "url": url,
450 "method": method_name,
451 "status": "success",
452 "content_length": content.len(),
453 "method_sequence": method_sequence + 1,
454 "url_sequence": url_sequence + 1
455 }));
456
457 result["source_used"] = json!(source_name);
459 result["url_used"] = json!(url);
460 result["method_used"] = json!(method_name);
461 result["execution_id"] = json!(execution_id);
462 result["priority"] = json!(format!("{:?}", query_priority));
463 result["token_budget"] = json!(token_budget);
464 result["attempts"] = json!(attempts);
465 result["successful_method_sequence"] = json!(method_sequence + 1);
466 result["successful_url_sequence"] = json!(url_sequence + 1);
467
468 info!("✅ Successfully fetched ETF data from {} via {} (method: {}, url: {})",
469 source_name, method_name, method_sequence + 1, url_sequence + 1);
470
471 return Ok(result);
472 }
473 Err(e) => {
474 attempts.push(json!({
475 "source": source_name,
476 "url": url,
477 "method": method_name,
478 "status": "failed",
479 "error": e.to_string(),
480 "method_sequence": method_sequence + 1,
481 "url_sequence": url_sequence + 1
482 }));
483
484 last_error = Some(e);
485 warn!("❌ Failed to fetch from {} via {} (method: {}, url: {}): {:?}",
486 source_name, method_name, method_sequence + 1, url_sequence + 1, last_error);
487 }
488 }
489 }
490 }
491
492 warn!("❌ All sources and methods failed for query '{}'. Returning empty data for Anthropic retry", query);
494
495 let empty_result = json!({
496 "query": query,
497 "market": market,
498 "status": "no_data_found",
499 "attempts": attempts,
500 "execution_id": execution_id,
501 "total_attempts": method_urls.direct.len() + method_urls.proxy.len(),
502 "reason": "all_sources_failed"
503 });
504
505 Ok(empty_result)
506 }
507
508 async fn try_fetch_url_direct_api(
510 &self,
511 url: &str,
512 query: &str,
513 market: &str,
514 source_name: &str,
515 priority: crate::filters::strategy::QueryPriority,
516 token_budget: usize,
517 execution_id: &str,
518 sequence: u64,
519 method_sequence: u64
520 ) -> Result<Value, BrightDataError> {
521 let max_retries = env::var("MAX_RETRIES")
522 .ok()
523 .and_then(|s| s.parse::<u32>().ok())
524 .unwrap_or(1);
525
526 let mut last_error = None;
527
528 for retry_attempt in 0..max_retries {
529 let start_time = Instant::now();
530 let attempt_id = format!("{}_direct_s{}_m{}_r{}", execution_id, sequence, method_sequence, retry_attempt);
531
532 info!("🌐 Direct API: Fetching from {} (execution: {}, retry: {}/{})",
533 source_name, attempt_id, retry_attempt + 1, max_retries);
534
535 let api_token = env::var("BRIGHTDATA_API_TOKEN")
536 .or_else(|_| env::var("API_TOKEN"))
537 .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_API_TOKEN environment variable".into()))?;
538
539 let base_url = env::var("BRIGHTDATA_BASE_URL")
540 .unwrap_or_else(|_| "https://api.brightdata.com".to_string());
541
542 let zone = env::var("WEB_UNLOCKER_ZONE")
543 .unwrap_or_else(|_| "mcp_unlocker".to_string());
544
545 let payload = json!({
546 "url": url,
547 "zone": zone,
548 "format": "raw",
549 });
550
551 let client = Client::builder()
552 .timeout(Duration::from_secs(90))
553 .build()
554 .map_err(|e| BrightDataError::ToolError(format!("Failed to create HTTP client: {}", e)))?;
555
556 let response = client
557 .post(&format!("{}/request", base_url))
558 .header("Authorization", format!("Bearer {}", api_token))
559 .header("Content-Type", "application/json")
560 .json(&payload)
561 .send()
562 .await
563 .map_err(|e| BrightDataError::ToolError(format!("Direct API request failed to {}: {}", source_name, e)))?;
564
565 let duration = start_time.elapsed();
566 let status = response.status().as_u16();
567 let response_headers: HashMap<String, String> = response
568 .headers()
569 .iter()
570 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
571 .collect();
572
573 let response_text = response.text().await
574 .map_err(|e| BrightDataError::ToolError(format!("Failed to read response body from {}: {}", source_name, e)))?;
575
576 if matches!(status, 502 | 503 | 504) && retry_attempt < max_retries - 1 {
578 let wait_time = Duration::from_millis(1000 + (retry_attempt as u64 * 1000));
579 warn!("⏳ Direct API: Server error {}, waiting {}ms before retry...", status, wait_time.as_millis());
580 tokio::time::sleep(wait_time).await;
581 last_error = Some(BrightDataError::ToolError(format!("Direct API server error: {}", status)));
582 continue;
583 }
584
585 if !(200..300).contains(&status) {
586 let error_msg = format!("Direct API: {} returned HTTP {}: {}", source_name, status,
587 &response_text[..response_text.len().min(500)]);
588 last_error = Some(BrightDataError::ToolError(error_msg));
589 if retry_attempt == max_retries - 1 {
590 return Err(last_error.unwrap());
591 }
592 continue;
593 }
594
595 let raw_content = response_text;
597 let filtered_content = if self.is_data_reduction_enabled() {
598 raw_content.clone()
600 } else {
601 raw_content.clone()
602 };
603
604 if let Err(e) = BRIGHTDATA_METRICS.log_call(
606 &attempt_id,
607 url,
608 &zone,
609 "raw",
610 None,
611 payload.clone(),
612 status,
613 response_headers.clone(),
614 &raw_content,
615 Some(&filtered_content),
616 duration.as_millis() as u64,
617 None,
618 None,
619 ).await {
620 warn!("Failed to log direct API metrics: {}", e);
621 }
622
623 return Ok(json!({
624 "content": filtered_content,
625 "raw_content": raw_content,
626 "query": query,
627 "market": market,
628 "source": source_name,
629 "method": "Direct BrightData API",
630 "priority": format!("{:?}", priority),
631 "token_budget": token_budget,
632 "execution_id": execution_id,
633 "sequence": sequence,
634 "method_sequence": method_sequence,
635 "success": true,
636 "url": url,
637 "zone": zone,
638 "format": "raw",
639 "status_code": status,
640 "response_size_bytes": raw_content.len(),
641 "filtered_size_bytes": filtered_content.len(),
642 "duration_ms": duration.as_millis(),
643 "timestamp": chrono::Utc::now().to_rfc3339(),
644 "retry_attempts": retry_attempt + 1,
645 "max_retries": max_retries,
646 "payload_used": payload
647 }));
648 }
649
650 Err(last_error.unwrap_or_else(|| BrightDataError::ToolError("Direct API: All retry attempts failed".into())))
651 }
652
653 async fn try_fetch_url_via_proxy(
655 &self,
656 url: &str,
657 query: &str,
658 market: &str,
659 source_name: &str,
660 priority: crate::filters::strategy::QueryPriority,
661 token_budget: usize,
662 execution_id: &str,
663 sequence: u64,
664 method_sequence: u64
665 ) -> Result<Value, BrightDataError> {
666 let max_retries = env::var("MAX_RETRIES")
667 .ok()
668 .and_then(|s| s.parse::<u32>().ok())
669 .unwrap_or(1);
670
671 let mut last_error = None;
672
673 let proxy_host = env::var("BRIGHTDATA_PROXY_HOST")
675 .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_HOST environment variable".into()))?;
676 let proxy_port = env::var("BRIGHTDATA_PROXY_PORT")
677 .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_PORT environment variable".into()))?;
678 let proxy_username = env::var("BRIGHTDATA_PROXY_USERNAME")
679 .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_USERNAME environment variable".into()))?;
680 let proxy_password = env::var("BRIGHTDATA_PROXY_PASSWORD")
681 .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_PASSWORD environment variable".into()))?;
682
683 let proxy_url = format!("http://{}:{}@{}:{}", proxy_username, proxy_password, proxy_host, proxy_port);
684
685 for retry_attempt in 0..max_retries {
686 let start_time = Instant::now();
687 let attempt_id = format!("{}_proxy_s{}_m{}_r{}", execution_id, sequence, method_sequence, retry_attempt);
688
689 info!("🔄 Proxy: Fetching from {} via proxy (execution: {}, retry: {}/{})",
690 source_name, attempt_id, retry_attempt + 1, max_retries);
691
692 let proxy = reqwest::Proxy::all(&proxy_url)
694 .map_err(|e| BrightDataError::ToolError(format!("Failed to create proxy: {}", e)))?;
695
696 let client = Client::builder()
697 .proxy(proxy)
698 .timeout(Duration::from_secs(90))
699 .danger_accept_invalid_certs(true) .build()
701 .map_err(|e| BrightDataError::ToolError(format!("Failed to create proxy client: {}", e)))?;
702
703 let response = client
704 .get(url)
705 .header("x-unblock-data-format", "markdown")
706 .send()
707 .await
708 .map_err(|e| BrightDataError::ToolError(format!("Proxy request failed to {}: {}", source_name, e)))?;
709
710 let duration = start_time.elapsed();
711 let status = response.status().as_u16();
712 let response_headers: HashMap<String, String> = response
713 .headers()
714 .iter()
715 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
716 .collect();
717
718 let response_text = response.text().await
719 .map_err(|e| BrightDataError::ToolError(format!("Failed to read proxy response body from {}: {}", source_name, e)))?;
720
721 if matches!(status, 502 | 503 | 504) && retry_attempt < max_retries - 1 {
723 let wait_time = Duration::from_millis(1000 + (retry_attempt as u64 * 1000));
724 warn!("Proxy: Server error {}, waiting {}ms before retry...", status, wait_time.as_millis());
725 tokio::time::sleep(wait_time).await;
726 last_error = Some(BrightDataError::ToolError(format!("Proxy server error: {}", status)));
727 continue;
728 }
729
730 if !(200..300).contains(&status) {
731 let error_msg = format!("Proxy: {} returned HTTP {}: {}", source_name, status,
732 &response_text[..response_text.len().min(200)]);
733
734 warn!("Proxy HTTP error: {}", error_msg);
735 last_error = Some(BrightDataError::ToolError(error_msg));
736
737 let proxy_payload = json!({
739 "url": url,
740 "method": "proxy",
741 "proxy_host": proxy_host,
742 "proxy_port": proxy_port,
743 "error": format!("HTTP {}", status)
744 });
745
746 if let Err(e) = BRIGHTDATA_METRICS.log_call(
747 &attempt_id,
748 url,
749 "proxy",
750 "raw",
751 None,
752 proxy_payload,
753 status,
754 response_headers.clone(),
755 &response_text,
756 Some(&format!("Proxy HTTP {} Error", status)),
757 duration.as_millis() as u64,
758 None,
759 None,
760 ).await {
761 warn!("Failed to log proxy error metrics: {}", e);
762 }
763
764 if retry_attempt == max_retries - 1 {
765 return Err(last_error.unwrap());
766 }
767 continue;
768 }
769
770 let raw_content = response_text;
772 let filtered_content = if self.is_data_reduction_enabled() {
773 raw_content.clone()
775 } else {
776 raw_content.clone()
777 };
778
779 let proxy_payload = json!({
781 "url": url,
782 "method": "proxy",
783 "proxy_host": proxy_host,
784 "proxy_port": proxy_port
785 });
786
787 if let Err(e) = BRIGHTDATA_METRICS.log_call(
788 &attempt_id,
789 url,
790 "proxy",
791 "raw",
792 None,
793 proxy_payload.clone(),
794 status,
795 response_headers.clone(),
796 &raw_content,
797 Some(&filtered_content),
798 duration.as_millis() as u64,
799 None,
800 None,
801 ).await {
802 warn!("Failed to log proxy metrics: {}", e);
803 }
804
805 return Ok(json!({
806 "content": filtered_content,
807 "raw_content": raw_content,
808 "query": query,
809 "market": market,
810 "source": source_name,
811 "method": "BrightData Proxy",
812 "priority": format!("{:?}", priority),
813 "token_budget": token_budget,
814 "execution_id": execution_id,
815 "sequence": sequence,
816 "method_sequence": method_sequence,
817 "success": true,
818 "url": url,
819 "proxy_host": proxy_host,
820 "proxy_port": proxy_port,
821 "status_code": status,
822 "response_size_bytes": raw_content.len(),
823 "filtered_size_bytes": filtered_content.len(),
824 "duration_ms": duration.as_millis(),
825 "timestamp": chrono::Utc::now().to_rfc3339(),
826 "retry_attempts": retry_attempt + 1,
827 "max_retries": max_retries,
828 "payload_used": proxy_payload
829 }));
830 }
831
832 Err(last_error.unwrap_or_else(|| BrightDataError::ToolError("Proxy: All retry attempts failed".into())))
833 }
834
835 fn is_likely_etf_symbol(&self, query: &str) -> bool {
836 let clean = query.trim();
837
838 if clean.len() < 1 || clean.len() > 15 {
839 return false;
840 }
841
842 let valid_chars = clean.chars().all(|c| c.is_alphanumeric() || c == '.');
843 let has_letters = clean.chars().any(|c| c.is_alphabetic());
844
845 valid_chars && has_letters
846 }
847
848 pub async fn test_connectivity(&self) -> Result<String, BrightDataError> {
850 let test_url = "https://finance.yahoo.com/quote/BTC-USD/";
851 let mut results = Vec::new();
852
853 info!("Testing Direct BrightData API...");
855 match self.try_fetch_url_direct_api(
856 test_url, "BTC", "usd", "Yahoo Finance Test",
857 crate::filters::strategy::QueryPriority::High, 1000,
858 "connectivity_test", 0, 0
859 ).await {
860 Ok(_) => {
861 results.push("Direct API: SUCCESS".to_string());
862 }
863 Err(e) => {
864 results.push(format!("Direct API: FAILED - {}", e));
865 }
866 }
867
868 info!("Testing Proxy method...");
870 match self.try_fetch_url_via_proxy(
871 test_url, "BTC", "usd", "Yahoo Finance Test",
872 crate::filters::strategy::QueryPriority::High, 1000,
873 "connectivity_test", 0, 1
874 ).await {
875 Ok(_) => {
876 results.push("Proxy: SUCCESS".to_string());
877 }
878 Err(e) => {
879 results.push(format!("Proxy: FAILED - {}", e));
880 }
881 }
882
883 Ok(format!("Connectivity Test Results:\n{}", results.join("\n")))
884 }
885}
886
887