1use crate::tool::{Tool, ToolResult, McpContent};
3use crate::error::BrightDataError;
4use crate::filters::{ResponseFilter, ResponseStrategy, ResponseType};
5use crate::extras::logger::JSON_LOGGER;
6use crate::metrics::brightdata_logger::BRIGHTDATA_METRICS;
7use crate::services::cache::bond_cache::get_bond_cache;
8use async_trait::async_trait;
9use reqwest::Client;
10use serde_json::{json, Value};
11use std::env;
12use std::time::{Duration, Instant};
13use std::collections::HashMap;
14use log::{info, warn, error};
15use crate::symbols::bond_symbol::match_symbol_from_query;
16
/// Candidate URLs for one query, grouped by the fetch method that will use them.
///
/// Every entry is a `(url, description)` pair; the description is the
/// human-readable source label that ends up in logs and result metadata.
#[derive(Clone, Debug)]
pub struct MethodUrls {
    /// `(url, description)` pairs intended for the proxy fetch path.
    pub proxy: Vec<(String, String)>,
    /// `(url, description)` pairs intended for the direct API fetch path.
    pub direct: Vec<(String, String)>,
}
23
/// Stateless tool that serves bond quote snapshots (registered as `get_bond_data`).
///
/// Carries no fields; configuration is read from environment variables at call time.
pub struct BondDataTool;
25
26#[async_trait]
27impl Tool for BondDataTool {
28 fn name(&self) -> &str {
29 "get_bond_data"
30 }
31
32 fn description(&self) -> &str {
33 "Get bond/fund snapshot (price, change, ranges) with cache, BrightData direct API and proxy fallback. Source: Yahoo Finance https://finance.yahoo.com/quote/^{SYMBOL}/ (e.g., ^TNX, ^IRX)."
34 }
35
36 fn input_schema(&self) -> Value {
37 json!({
38 "type": "object",
39 "properties": {
40 "query": {
41 "type": "string",
42 "description": "Bond symbol (e.g., ^TNX, ^IRX, ^TYX, ^FVX). Used if 'symbol' missing.",
43 },
44 "symbol": {
45 "type": "string",
46 "description": "Bond symbol (e.g., ^TNX, ^IRX, ^TYX, ^FVX). Used if 'symbol' missing.",
47 },
48 "data_type": {
49 "type": "string",
50 "enum": ["price", "technical", "news", "all"],
51 "default": "all",
52 "description": "Focus slice for parsing/formatting"
53 },
54 "timeframe": {
55 "type": "string",
56 "enum": ["realtime", "day", "week", "month", "year"],
57 "default": "realtime",
58 "description": "Time period context for analysis"
59 },
60 "user_id": {
61 "type": "string",
62 "description": "Session/user id for cache scoping"
63 }
64 },
65 "required": ["symbol", "user_id"]
66 })
67 }
68
69
70 async fn execute(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
71 self.execute_internal(parameters).await
72 }
73
74 async fn execute_internal(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
75 let raw_query = parameters
76 .get("symbol")
77 .and_then(|v| v.as_str())
78 .ok_or_else(|| BrightDataError::ToolError("Missing 'symbol' parameter".into()))?;
79
80 let session_id = parameters
81 .get("user_id")
82 .and_then(|v| v.as_str())
83 .ok_or_else(|| BrightDataError::ToolError("Missing 'user_id' parameter".into()))?;
84
85 let matched_symbol = match_symbol_from_query(raw_query);
87
88 let query = if matched_symbol.contains('.') {
90 matched_symbol.as_str()
92 } else if matched_symbol.starts_with('^') {
93 matched_symbol.as_str()
95 } else {
96 matched_symbol.split('.').next().unwrap_or(&matched_symbol)
98 };
99
100 let market = parameters
101 .get("market")
102 .and_then(|v| v.as_str())
103 .unwrap_or("indian");
104
105 let data_type = parameters
106 .get("data_type")
107 .and_then(|v| v.as_str())
108 .unwrap_or("all");
109
110 let timeframe = parameters
111 .get("timeframe")
112 .and_then(|v| v.as_str())
113 .unwrap_or("realtime");
114
115 let include_ratios = parameters
116 .get("include_ratios")
117 .and_then(|v| v.as_bool())
118 .unwrap_or(true);
119
120 let include_volume = parameters
121 .get("include_volume")
122 .and_then(|v| v.as_bool())
123 .unwrap_or(true);
124
125 let query_priority = ResponseStrategy::classify_query_priority(query);
126 let recommended_tokens = ResponseStrategy::get_recommended_token_allocation(query);
127
128 let execution_id = format!("bond_{}", chrono::Utc::now().format("%Y%m%d_%H%M%S%.3f"));
129
130 info!("๐ Stock query: '{}' (market: {}, priority: {:?}, tokens: {}, session: {})",
131 query, market, query_priority, recommended_tokens, session_id);
132
133 match self.check_cache_first(query, session_id).await {
135 Ok(Some(cached_result)) => {
136 info!("๐ Cache HIT: Returning cached data for {} in session {}", query, session_id);
137
138 let content = cached_result.get("content").and_then(|c| c.as_str()).unwrap_or("");
140 let source_used = cached_result.get("source_used").and_then(|s| s.as_str()).unwrap_or("Cache");
141 let method_used = "Redis Cache";
142
143 let formatted_response = self.create_formatted_bond_response(
144 query, market, content, source_used, method_used,
145 data_type, timeframe, include_ratios, include_volume, &execution_id
146 );
147
148 let tool_result = ToolResult::success_with_raw(
149 vec![McpContent::text(formatted_response)],
150 cached_result
151 );
152
153 if self.is_data_reduction_enabled() {
155 return Ok(ResponseStrategy::apply_size_limits(tool_result));
156 } else {
157 return Ok(tool_result);
158 }
159 }
160 Ok(None) => {
161 info!("๐พ Cache MISS: Fetching fresh data for {} in session {}", query, session_id);
162 }
163 Err(e) => {
164 warn!("๐จ Cache error (continuing with fresh fetch): {}", e);
165 }
166 }
167
168 match self.fetch_bond_data_with_fallbacks_and_priority(
170 query, market, data_type, timeframe, include_ratios, include_volume,
171 query_priority, recommended_tokens, &execution_id
172 ).await {
173 Ok(result) => {
174 if let Err(e) = self.store_in_cache(query, session_id, &result).await {
176 warn!("Failed to store result in cache: {}", e);
177 }
178
179 let content = result.get("content").and_then(|c| c.as_str()).unwrap_or("");
180 let source_used = result.get("source_used").and_then(|s| s.as_str()).unwrap_or("Unknown");
181 let method_used = result.get("method_used").and_then(|m| m.as_str()).unwrap_or("Unknown");
182
183 let formatted_response = self.create_formatted_bond_response(
185 query, market, content, source_used, method_used,
186 data_type, timeframe, include_ratios, include_volume, &execution_id
187 );
188
189 let tool_result = ToolResult::success_with_raw(
190 vec![McpContent::text(formatted_response)],
191 result
192 );
193
194 if self.is_data_reduction_enabled() {
196 Ok(ResponseStrategy::apply_size_limits(tool_result))
197 } else {
198 Ok(tool_result)
199 }
200 }
201 Err(_e) => {
202 warn!("BrightData error for query '{}', returning empty data for retry", query);
204 let empty_response = json!({
205 "query": query,
206 "market": market,
207 "status": "no_data",
208 "reason": "brightdata_error",
209 "execution_id": execution_id,
210 "session_id": session_id
211 });
212
213 Ok(ToolResult::success_with_raw(
214 vec![McpContent::text("๐ **No Data Available**\n\nPlease try again with a more specific bond symbol.".to_string())],
215 empty_response
216 ))
217 }
218 }
219 }
220}
221
222impl BondDataTool {
223 fn is_data_reduction_enabled(&self) -> bool {
225 std::env::var("DEDUCT_DATA")
226 .unwrap_or_else(|_| "false".to_string())
227 .to_lowercase() == "true"
228 }
229
230 fn create_formatted_bond_response(
232 &self,
233 query: &str,
234 market: &str,
235 content: &str,
236 source: &str,
237 method: &str,
238 data_type: &str,
239 timeframe: &str,
240 include_ratios: bool,
241 include_volume: bool,
242 execution_id: &str
243 ) -> String {
244 if !self.is_data_reduction_enabled() {
246 return format!(
247 "๐ **{}** | {} Market\n\n## Full Content\n{}\n\n*Source: {} via {} โข Type: {} โข Period: {}*",
248 query.to_uppercase(),
249 market.to_uppercase(),
250 content,
251 source,
252 method,
253 data_type,
254 timeframe
255 );
256 }
257
258 format!(
261 "๐ **{}** | {} Market\n\n## Content (TODO: Add Filtering)\n{}\n\n*Source: {} via {} โข Type: {} โข Period: {}*",
262 query.to_uppercase(),
263 market.to_uppercase(),
264 content,
265 source,
266 method,
267 data_type,
268 timeframe
269 )
270 }
271
272 fn extract_essential_bond_data(&self, content: &str, query: &str) -> String {
274 content.to_string()
277 }
278
279 fn extract_financial_lines(&self, content: &str) -> String {
281 content.to_string()
284 }
285
286 fn format_financial_metrics(&self, data: &str) -> String {
288 data.to_string()
291 }
292
293 async fn check_cache_first(
295 &self,
296 query: &str,
297 session_id: &str,
298 ) -> Result<Option<Value>, BrightDataError> {
299 let cache_service = get_bond_cache().await?;
300 cache_service.get_cached_bond_data(session_id, query).await
301 }
302
303 async fn store_in_cache(
305 &self,
306 query: &str,
307 session_id: &str,
308 data: &Value,
309 ) -> Result<(), BrightDataError> {
310 let cache_service = get_bond_cache().await?;
311 cache_service.cache_bond_data(session_id, query, data.clone()).await
312 }
313
314 pub async fn get_session_cached_symbols(&self, session_id: &str) -> Result<Vec<String>, BrightDataError> {
316 let cache_service = get_bond_cache().await?;
317 cache_service.get_session_bond_symbols(session_id).await
318 }
319
320 pub async fn clear_symbol_cache(
322 &self,
323 symbol: &str,
324 session_id: &str,
325 ) -> Result<(), BrightDataError> {
326 let cache_service = get_bond_cache().await?;
327 cache_service.clear_bond_symbol_cache(session_id, symbol).await
328 }
329
330 pub async fn clear_session_cache(&self, session_id: &str) -> Result<u32, BrightDataError> {
332 let cache_service = get_bond_cache().await?;
333 cache_service.clear_session_bond_cache(session_id).await
334 }
335
336 pub async fn get_cache_stats(&self) -> Result<Value, BrightDataError> {
338 let cache_service = get_bond_cache().await?;
339 cache_service.get_bond_cache_stats().await
340 }
341
342 pub async fn test_connectivity_with_cache(&self) -> Result<String, BrightDataError> {
344 let mut results = Vec::new();
345
346 info!("๐งช Testing Redis Cache...");
348 match get_bond_cache().await {
349 Ok(cache_service) => {
350 match cache_service.health_check().await {
351 Ok(_) => results.push("โ
Redis Cache: SUCCESS".to_string()),
352 Err(e) => results.push(format!("โ Redis Cache: FAILED - {}", e)),
353 }
354 }
355 Err(e) => results.push(format!("โ Redis Cache: FAILED - {}", e)),
356 }
357
358 let api_test = self.test_connectivity().await?;
360 results.push(api_test);
361
362 Ok(format!("๐ Enhanced Connectivity Test Results:\n{}", results.join("\n")))
363 }
364
365 fn build_prioritized_urls_with_priority(
367 &self,
368 query: &str,
369 market: &str,
370 data_type: &str,
371 priority: crate::filters::strategy::QueryPriority
372 ) -> MethodUrls {
373 let mut proxy_urls = Vec::new();
374 let mut direct_urls = Vec::new();
375 let clean_query = query.trim(); let max_sources = 3;
378
379 if self.is_likely_bond_symbol(&clean_query) {
380 match market {
381 "usd" => {
382 let symbols_to_try = if clean_query.contains('.') {
383 vec![clean_query.to_string()]
385 } else if clean_query.starts_with('^') {
386 vec![clean_query.to_string()]
388 } else {
389 vec![
391 format!("{}=X", clean_query.to_uppercase()),
392 clean_query.to_uppercase(),
393 ]
394 };
395
396 for (i, symbol) in symbols_to_try.iter().enumerate() {
397 if i >= max_sources { break; }
398
399 let url = format!("https://finance.yahoo.com/quote/{}", symbol);
400 let description = format!("Yahoo Finance ({})", symbol);
401
402 let proxy_url = format!("https://finance.yahoo.com/quote/{}/", symbol);
403 let proxy_description = format!("Yahoo Finance ({})", symbol);
404
405 proxy_urls.push((proxy_url, proxy_description));
406 direct_urls.push((url, description));
407 }
408 },
409 "inr" | _ => {
410 let symbols_to_try = if clean_query.contains('.') {
411 vec![clean_query.to_string()]
413 } else {
414 vec![
416 clean_query.to_string(),
417 clean_query.to_uppercase(),
418 ]
419 };
420
421 for (i, symbol) in symbols_to_try.iter().enumerate() {
422 if i >= max_sources { break; }
423
424 let url = format!("https://finance.yahoo.com/quote/{}", symbol);
425 let description = format!("Yahoo Finance ({})", symbol);
426
427 let proxy_url = format!("https://finance.yahoo.com/quote/{}/", symbol);
428 let proxy_description = format!("Yahoo Finance ({})", symbol);
429
430 proxy_urls.push((proxy_url, proxy_description));
431 direct_urls.push((url, description));
432 }
433 }
434 }
435 }
436
437 if proxy_urls.len() < max_sources {
439 let url = format!("https://finance.yahoo.com/quote/{}", urlencoding::encode(query));
440 let description = "Yahoo Finance Search".to_string();
441
442 let proxy_url = format!("https://finance.yahoo.com/quote/{}", urlencoding::encode(query));
443 let proxy_description = "Yahoo Finance Search".to_string();
444
445 proxy_urls.push((proxy_url, proxy_description));
446 direct_urls.push((url, description));
447 }
448
449 info!("๐ฏ Generated {} proxy URLs and {} direct URLs for query '{}' (priority: {:?})",
450 proxy_urls.len(), direct_urls.len(), query, priority);
451
452 MethodUrls {
453 proxy: proxy_urls,
454 direct: direct_urls,
455 }
456 }
457
458 async fn fetch_bond_data_with_fallbacks_and_priority(
460 &self,
461 query: &str,
462 market: &str,
463 data_type: &str,
464 timeframe: &str,
465 include_ratios: bool,
466 include_volume: bool,
467 query_priority: crate::filters::strategy::QueryPriority,
468 token_budget: usize,
469 execution_id: &str
470 ) -> Result<Value, BrightDataError> {
471 let method_urls = self.build_prioritized_urls_with_priority(query, market, data_type, query_priority);
472 let mut last_error = None;
473 let mut attempts = Vec::new();
474
475 let methods_to_try = vec![
477 ("proxy", "Proxy Fallback", &method_urls.proxy)
479 ];
480
481 for (method_sequence, (method_type, method_name, urls_for_method)) in methods_to_try.iter().enumerate() {
482 info!("๐ Trying {} method with {} URLs", method_name, urls_for_method.len());
483
484 for (url_sequence, (url, source_name)) in urls_for_method.iter().enumerate() {
485 let attempt_result = match *method_type {
486 "direct" => {
487 info!("๐ Trying Direct BrightData API for {} (method: {}, url: {}/{})",
488 source_name, method_sequence + 1, url_sequence + 1, urls_for_method.len());
489 self.try_fetch_url_direct_api(
490 url, query, market, source_name, query_priority, token_budget,
491 execution_id, url_sequence as u64, method_sequence as u64
492 ).await
493 }
494 "proxy" => {
495 info!("๐ Trying Proxy method for {} (method: {}, url: {}/{})",
496 source_name, method_sequence + 1, url_sequence + 1, urls_for_method.len());
497 self.try_fetch_url_via_proxy(
498 url, query, market, source_name, query_priority, token_budget,
499 execution_id, url_sequence as u64, method_sequence as u64
500 ).await
501 }
502 _ => continue,
503 };
504
505 match attempt_result {
506 Ok(mut result) => {
507 let content = result.get("content").and_then(|c| c.as_str()).unwrap_or("");
508
509 attempts.push(json!({
510 "source": source_name,
511 "url": url,
512 "method": method_name,
513 "status": "success",
514 "content_length": content.len(),
515 "method_sequence": method_sequence + 1,
516 "url_sequence": url_sequence + 1
517 }));
518
519 let should_try_next = if self.is_data_reduction_enabled() {
521 false
523 } else {
524 false
525 };
526
527 if should_try_next && (url_sequence < urls_for_method.len() - 1 || method_sequence < methods_to_try.len() - 1) {
528 if url_sequence < urls_for_method.len() - 1 {
529 warn!("Content insufficient from {} via {}, trying next URL in same method", source_name, method_name);
530 continue; } else {
532 warn!("Content insufficient from {} via {}, trying next method", source_name, method_name);
533 break; }
535 }
536
537 if self.is_data_reduction_enabled() {
539 }
542
543 result["source_used"] = json!(source_name);
545 result["url_used"] = json!(url);
546 result["method_used"] = json!(method_name);
547 result["execution_id"] = json!(execution_id);
548 result["priority"] = json!(format!("{:?}", query_priority));
549 result["token_budget"] = json!(token_budget);
550 result["attempts"] = json!(attempts);
551 result["successful_method_sequence"] = json!(method_sequence + 1);
552 result["successful_url_sequence"] = json!(url_sequence + 1);
553
554 info!("โ
Successfully fetched bond data from {} via {} (method: {}, url: {})",
555 source_name, method_name, method_sequence + 1, url_sequence + 1);
556
557 return Ok(result);
558 }
559 Err(e) => {
560 attempts.push(json!({
561 "source": source_name,
562 "url": url,
563 "method": method_name,
564 "status": "failed",
565 "error": e.to_string(),
566 "method_sequence": method_sequence + 1,
567 "url_sequence": url_sequence + 1
568 }));
569
570 last_error = Some(e);
571 warn!("โ Failed to fetch from {} via {} (method: {}, url: {}): {:?}",
572 source_name, method_name, method_sequence + 1, url_sequence + 1, last_error);
573 }
574 }
575 }
576 }
577
578 warn!("โ All sources and methods failed for query '{}'. Returning empty data for Anthropic retry", query);
580
581 let empty_result = json!({
582 "query": query,
583 "market": market,
584 "status": "no_data_found",
585 "attempts": attempts,
586 "execution_id": execution_id,
587 "total_attempts": method_urls.direct.len() + method_urls.proxy.len(),
588 "reason": "all_sources_failed"
589 });
590
591 Ok(empty_result)
592 }
593
594 async fn try_fetch_url_direct_api(
596 &self,
597 url: &str,
598 query: &str,
599 market: &str,
600 source_name: &str,
601 priority: crate::filters::strategy::QueryPriority,
602 token_budget: usize,
603 execution_id: &str,
604 sequence: u64,
605 method_sequence: u64
606 ) -> Result<Value, BrightDataError> {
607 let max_retries = env::var("MAX_RETRIES")
608 .ok()
609 .and_then(|s| s.parse::<u32>().ok())
610 .unwrap_or(1);
611
612 let mut last_error = None;
613
614 for retry_attempt in 0..max_retries {
615 let start_time = Instant::now();
616 let attempt_id = format!("{}_direct_s{}_m{}_r{}", execution_id, sequence, method_sequence, retry_attempt);
617
618 info!("๐ Direct API: Fetching from {} (execution: {}, retry: {}/{})",
619 source_name, attempt_id, retry_attempt + 1, max_retries);
620
621 let api_token = env::var("BRIGHTDATA_API_TOKEN")
622 .or_else(|_| env::var("API_TOKEN"))
623 .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_API_TOKEN environment variable".into()))?;
624
625 let base_url = env::var("BRIGHTDATA_BASE_URL")
626 .unwrap_or_else(|_| "https://api.brightdata.com".to_string());
627
628 let zone = env::var("WEB_UNLOCKER_ZONE")
629 .unwrap_or_else(|_| "mcp_unlocker".to_string());
630
631 let payload = json!({
632 "url": url,
633 "zone": zone,
634 "format": "raw",
635 });
637
638 if retry_attempt == 0 {
639 info!("๐ค Direct API Request:");
640 info!(" Endpoint: {}/request", base_url);
641 info!(" Zone: {}", zone);
642 info!(" Target: {}", url);
643 }
644
645 let client = Client::builder()
646 .timeout(Duration::from_secs(90))
647 .build()
648 .map_err(|e| BrightDataError::ToolError(format!("Failed to create HTTP client: {}", e)))?;
649
650 let response = client
651 .post(&format!("{}/request", base_url))
652 .header("Authorization", format!("Bearer {}", api_token))
653 .header("Content-Type", "application/json")
654 .json(&payload)
655 .send()
656 .await
657 .map_err(|e| BrightDataError::ToolError(format!("Direct API request failed to {}: {}", source_name, e)))?;
658
659 let duration = start_time.elapsed();
660 let status = response.status().as_u16();
661 let response_headers: HashMap<String, String> = response
662 .headers()
663 .iter()
664 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
665 .collect();
666
667 info!("๐ฅ Direct API Response (retry {}):", retry_attempt + 1);
668 info!(" Status: {}", status);
669 info!(" Duration: {}ms", duration.as_millis());
670
671 let response_text = response.text().await
672 .map_err(|e| BrightDataError::ToolError(format!("Failed to read response body from {}: {}", source_name, e)))?;
673
674 if matches!(status, 502 | 503 | 504) && retry_attempt < max_retries - 1 {
676 let wait_time = Duration::from_millis(1000 + (retry_attempt as u64 * 1000));
677 warn!("โณ Direct API: Server error {}, waiting {}ms before retry...", status, wait_time.as_millis());
678 tokio::time::sleep(wait_time).await;
679 last_error = Some(BrightDataError::ToolError(format!("Direct API server error: {}", status)));
680 continue;
681 }
682
683 if !(200..300).contains(&status) {
684 let error_msg = format!("Direct API: {} returned HTTP {}: {}", source_name, status,
685 &response_text[..response_text.len().min(500)]);
686 last_error = Some(BrightDataError::ToolError(error_msg));
687 if retry_attempt == max_retries - 1 {
688 return Err(last_error.unwrap());
689 }
690 continue;
691 }
692
693 let raw_content = response_text;
695 let filtered_content = if self.is_data_reduction_enabled() {
696 raw_content.clone()
698 } else {
699 raw_content.clone()
700 };
701
702 info!("๐ Direct API: Content processed: {} bytes -> {} bytes",
703 raw_content.len(), filtered_content.len());
704
705 if let Err(e) = BRIGHTDATA_METRICS.log_call(
707 &attempt_id,
708 url,
709 &zone,
710 "raw",
711 None,
712 payload.clone(),
713 status,
714 response_headers.clone(),
715 &raw_content,
716 Some(&filtered_content),
717 duration.as_millis() as u64,
718 None,
719 None,
720 ).await {
721 warn!("Failed to log direct API metrics: {}", e);
722 }
723
724 return Ok(json!({
725 "content": filtered_content,
726 "raw_content": raw_content,
727 "query": query,
728 "market": market,
729 "source": source_name,
730 "method": "Direct BrightData API",
731 "priority": format!("{:?}", priority),
732 "token_budget": token_budget,
733 "execution_id": execution_id,
734 "sequence": sequence,
735 "method_sequence": method_sequence,
736 "success": true,
737 "url": url,
738 "zone": zone,
739 "format": "raw",
740 "status_code": status,
741 "response_size_bytes": raw_content.len(),
742 "filtered_size_bytes": filtered_content.len(),
743 "duration_ms": duration.as_millis(),
744 "timestamp": chrono::Utc::now().to_rfc3339(),
745 "retry_attempts": retry_attempt + 1,
746 "max_retries": max_retries,
747 "payload_used": payload
748 }));
749 }
750
751 Err(last_error.unwrap_or_else(|| BrightDataError::ToolError("Direct API: All retry attempts failed".into())))
752 }
753
754 async fn try_fetch_url_via_proxy(
756 &self,
757 url: &str,
758 query: &str,
759 market: &str,
760 source_name: &str,
761 priority: crate::filters::strategy::QueryPriority,
762 token_budget: usize,
763 execution_id: &str,
764 sequence: u64,
765 method_sequence: u64
766 ) -> Result<Value, BrightDataError> {
767 let max_retries = env::var("MAX_RETRIES")
768 .ok()
769 .and_then(|s| s.parse::<u32>().ok())
770 .unwrap_or(1);
771
772 let mut last_error = None;
773
774 let proxy_host = env::var("BRIGHTDATA_PROXY_HOST")
776 .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_HOST environment variable".into()))?;
777 let proxy_port = env::var("BRIGHTDATA_PROXY_PORT")
778 .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_PORT environment variable".into()))?;
779 let proxy_username = env::var("BRIGHTDATA_PROXY_USERNAME")
780 .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_USERNAME environment variable".into()))?;
781 let proxy_password = env::var("BRIGHTDATA_PROXY_PASSWORD")
782 .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_PASSWORD environment variable".into()))?;
783
784 let proxy_url = format!("http://{}:{}@{}:{}", proxy_username, proxy_password, proxy_host, proxy_port);
785
786 for retry_attempt in 0..max_retries {
787 let start_time = Instant::now();
788 let attempt_id = format!("{}_proxy_s{}_m{}_r{}", execution_id, sequence, method_sequence, retry_attempt);
789
790 info!("๐ Proxy: Fetching from {} via proxy (execution: {}, retry: {}/{})",
791 source_name, attempt_id, retry_attempt + 1, max_retries);
792
793 if retry_attempt == 0 {
794 info!("๐ค Proxy Request:");
795 info!(" Proxy: {}:{}@{}:{}", proxy_username, "***", proxy_host, proxy_port);
796 info!(" Target: {}", url);
797 }
798
799 let proxy = reqwest::Proxy::all(&proxy_url)
801 .map_err(|e| BrightDataError::ToolError(format!("Failed to create proxy: {}", e)))?;
802
803 let client = Client::builder()
804 .proxy(proxy)
805 .timeout(Duration::from_secs(90))
806 .danger_accept_invalid_certs(true) .build()
808 .map_err(|e| BrightDataError::ToolError(format!("Failed to create proxy client: {}", e)))?;
809
810 let response = client
811 .get(url)
812 .header("x-unblock-data-format", "markdown")
813 .send()
814 .await
815 .map_err(|e| BrightDataError::ToolError(format!("Proxy request failed to {}: {}", source_name, e)))?;
816
817 let duration = start_time.elapsed();
818 let status = response.status().as_u16();
819 let response_headers: HashMap<String, String> = response
820 .headers()
821 .iter()
822 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
823 .collect();
824
825 info!("๐ฅ Proxy Response (retry {}):", retry_attempt + 1);
826 info!(" Status: {}", status);
827 info!(" Duration: {}ms", duration.as_millis());
828
829 let response_text = response.text().await
830 .map_err(|e| BrightDataError::ToolError(format!("Failed to read proxy response body from {}: {}", source_name, e)))?;
831
832 if matches!(status, 502 | 503 | 504) && retry_attempt < max_retries - 1 {
834 let wait_time = Duration::from_millis(1000 + (retry_attempt as u64 * 1000));
835 warn!("โณ Proxy: Server error {}, waiting {}ms before retry...", status, wait_time.as_millis());
836 tokio::time::sleep(wait_time).await;
837 last_error = Some(BrightDataError::ToolError(format!("Proxy server error: {}", status)));
838 continue;
839 }
840
841 if !(200..300).contains(&status) {
842 println!("-----------------------------------------------------------------");
843 println!("MARKDOWN SUCCESS: {:?}", status.clone());
844 println!("-----------------------------------------------------------------");
845 let error_msg = format!("Proxy: {} returned HTTP {}: {}", source_name, status,
846 &response_text[..response_text.len().min(200)]);
847
848 warn!("Proxy HTTP error: {}", error_msg);
849 last_error = Some(BrightDataError::ToolError(error_msg));
850
851 let proxy_payload = json!({
853 "url": url,
854 "method": "proxy",
855 "proxy_host": proxy_host,
856 "proxy_port": proxy_port,
857 "error": format!("HTTP {}", status)
858 });
859
860 if let Err(e) = BRIGHTDATA_METRICS.log_call(
861 &attempt_id,
862 url,
863 "proxy",
864 "raw",
865 None,
866 proxy_payload,
867 status,
868 response_headers.clone(),
869 &response_text,
870 Some(&format!("Proxy HTTP {} Error", status)),
871 duration.as_millis() as u64,
872 None,
873 None,
874 ).await {
875 warn!("Failed to log proxy error metrics: {}", e);
876 }
877
878 if retry_attempt == max_retries - 1 {
879 return Err(last_error.unwrap());
880 }
881 continue;
882 }
883
884 let raw_content = response_text;
886 let filtered_content = if self.is_data_reduction_enabled() {
887 raw_content.clone()
889 } else {
890 raw_content.clone()
891 };
892
893 info!("๐ Proxy: Content processed: {} bytes -> {} bytes",
894 raw_content.len(), filtered_content.len());
895
896 let proxy_payload = json!({
898 "url": url,
899 "method": "proxy",
900 "proxy_host": proxy_host,
901 "proxy_port": proxy_port
902 });
903
904 if let Err(e) = BRIGHTDATA_METRICS.log_call(
905 &attempt_id,
906 url,
907 "proxy",
908 "raw",
909 None,
910 proxy_payload.clone(),
911 status,
912 response_headers.clone(),
913 &raw_content,
914 Some(&filtered_content),
915 duration.as_millis() as u64,
916 None,
917 None,
918 ).await {
919 warn!("Failed to log proxy metrics: {}", e);
920 }
921
922 return Ok(json!({
923 "content": filtered_content,
924 "raw_content": raw_content,
925 "query": query,
926 "market": market,
927 "source": source_name,
928 "method": "BrightData Proxy",
929 "priority": format!("{:?}", priority),
930 "token_budget": token_budget,
931 "execution_id": execution_id,
932 "sequence": sequence,
933 "method_sequence": method_sequence,
934 "success": true,
935 "url": url,
936 "proxy_host": proxy_host,
937 "proxy_port": proxy_port,
938 "status_code": status,
939 "response_size_bytes": raw_content.len(),
940 "filtered_size_bytes": filtered_content.len(),
941 "duration_ms": duration.as_millis(),
942 "timestamp": chrono::Utc::now().to_rfc3339(),
943 "retry_attempts": retry_attempt + 1,
944 "max_retries": max_retries,
945 "payload_used": proxy_payload
946 }));
947 }
948
949 Err(last_error.unwrap_or_else(|| BrightDataError::ToolError("Proxy: All retry attempts failed".into())))
950 }
951
952 fn is_likely_bond_symbol(&self, query: &str) -> bool {
953 let clean = query.trim();
954
955 if clean.len() < 1 || clean.len() > 15 {
956 return false;
957 }
958
959 let valid_chars = clean.chars().all(|c| c.is_alphanumeric() || c == '.' || c == '^' || c == '=');
961 let has_letters = clean.chars().any(|c| c.is_alphabetic());
962
963 valid_chars && has_letters
964 }
965
/// Canonicalises a pair-style symbol: trims it, upper-cases it, removes every
/// `/`, `-` and space, then trims any whitespace that removal exposed.
fn normalize_pair(raw: &str) -> String {
    let upper = raw.trim().to_uppercase();
    let compact: String = upper
        .chars()
        .filter(|ch| !matches!(ch, '/' | '-' | ' '))
        .collect();
    compact.trim().to_owned()
}
972
973 pub async fn test_connectivity(&self) -> Result<String, BrightDataError> {
975 let test_url = "https://finance.yahoo.com/quote/USDINR=X/";
976 let mut results = Vec::new();
977
978 info!("Testing Direct BrightData API...");
980 match self.try_fetch_url_direct_api(
981 test_url, "BTC", "usd", "Yahoo Finance Test",
982 crate::filters::strategy::QueryPriority::High, 1000,
983 "connectivity_test", 0, 0
984 ).await {
985 Ok(_) => {
986 results.push("Direct API: SUCCESS".to_string());
987 }
988 Err(e) => {
989 results.push(format!("Direct API: FAILED - {}", e));
990 }
991 }
992
993 info!("Testing Proxy method...");
995 match self.try_fetch_url_via_proxy(
996 test_url, "BTC", "usd", "Yahoo Finance Test",
997 crate::filters::strategy::QueryPriority::High, 1000,
998 "connectivity_test", 0, 1
999 ).await {
1000 Ok(_) => {
1001 results.push("Proxy: SUCCESS".to_string());
1002 }
1003 Err(e) => {
1004 results.push(format!("Proxy: FAILED - {}", e));
1005 }
1006 }
1007
1008 Ok(format!("Connectivity Test Results:\n{}", results.join("\n")))
1009 }
1010}