1use crate::tool::{Tool, ToolResult, McpContent};
3use crate::error::BrightDataError;
4use crate::filters::{ResponseFilter, ResponseStrategy, ResponseType};
5use crate::extras::logger::JSON_LOGGER;
6use crate::metrics::brightdata_logger::BRIGHTDATA_METRICS;
7use crate::services::cache::commodity_cache::get_commodity_cache;
8use async_trait::async_trait;
9use reqwest::Client;
10use serde_json::{json, Value};
11use std::env;
12use std::time::{Duration, Instant};
13use std::collections::HashMap;
14use log::{info, warn, error};
15use crate::symbols::commodity_symbol::match_symbol_from_query;
16
/// Candidate URLs for a single lookup, grouped by the fetch method they are meant for.
///
/// Every entry is a `(url, source description)` pair.
#[derive(Debug, Clone)]
pub struct MethodUrls {
    /// URL/description pairs intended for the proxy fetch method.
    pub proxy: Vec<(String, String)>,
    /// URL/description pairs intended for the direct API fetch method.
    pub direct: Vec<(String, String)>,
}
23
/// Stateless tool type; its `Tool` implementation is registered under the name
/// `get_commodity_data`.
pub struct CommodityDataTool;
25
26#[async_trait]
27impl Tool for CommodityDataTool {
28 fn name(&self) -> &str {
29 "get_commodity_data"
30 }
31
32 fn description(&self) -> &str {
33 "Get commodity (futures) snapshot (price, change, ranges) with cache, BrightData direct API and proxy fallback. Source: Yahoo Finance https://in.tradingview.com/symbols/MCX-{}!/ (e.g., MCX-NATURALGAS1, MCX-CRUDEOIL1)."
34 }
35
36 fn input_schema(&self) -> Value {
37 json!({
38 "type": "object",
39 "properties": {
40 "query": {
41 "type": "string",
42 "description": "Commodity/futures symbol (e.g., CRUDEOIL, CRUDEOIL, NATURALGAS). Used if 'symbol' missing."
43 },
44 "symbol": {
45 "type": "string",
46 "description": "Commodity/futures symbol (e.g., CRUDEOIL, CRUDEOIL, NATURALGAS). Used if 'symbol' missing."
47 },
48 "data_type": {
49 "type": "string",
50 "enum": ["price", "technical", "news", "all"],
51 "default": "all",
52 "description": "Focus slice for parsing/formatting"
53 },
54 "timeframe": {
55 "type": "string",
56 "enum": ["realtime", "day", "week", "month", "year"],
57 "default": "realtime",
58 "description": "Time period context for analysis"
59 },
60 "user_id": {
61 "type": "string",
62 "description": "Session/user id for cache scoping"
63 }
64 },
65 "required": ["symbol", "user_id"]
66 })
67 }
68
69
70 async fn execute(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
71 self.execute_internal(parameters).await
72 }
73
74 async fn execute_internal(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
75 let raw_query = parameters
76 .get("symbol")
77 .and_then(|v| v.as_str())
78 .ok_or_else(|| BrightDataError::ToolError("Missing 'symbol' parameter".into()))?;
79
80 let session_id = parameters
81 .get("user_id")
82 .and_then(|v| v.as_str())
83 .ok_or_else(|| BrightDataError::ToolError("Missing 'user_id' parameter".into()))?;
84
85 let matched_symbol = match_symbol_from_query(raw_query);
87
88 let query = matched_symbol.split('.').next().unwrap_or(&matched_symbol);
90
91 let market = parameters
92 .get("market")
93 .and_then(|v| v.as_str())
94 .unwrap_or("indian");
95
96 let data_type = parameters
97 .get("data_type")
98 .and_then(|v| v.as_str())
99 .unwrap_or("all");
100
101 let timeframe = parameters
102 .get("timeframe")
103 .and_then(|v| v.as_str())
104 .unwrap_or("realtime");
105
106 let include_ratios = parameters
107 .get("include_ratios")
108 .and_then(|v| v.as_bool())
109 .unwrap_or(true);
110
111 let include_volume = parameters
112 .get("include_volume")
113 .and_then(|v| v.as_bool())
114 .unwrap_or(true);
115
116 let query_priority = ResponseStrategy::classify_query_priority(query);
117 let recommended_tokens = ResponseStrategy::get_recommended_token_allocation(query);
118
119 let execution_id = format!("commodity_{}", chrono::Utc::now().format("%Y%m%d_%H%M%S%.3f"));
120
121 info!("๐ Stock query: '{}' (market: {}, priority: {:?}, tokens: {}, session: {})",
122 query, market, query_priority, recommended_tokens, session_id);
123
124 match self.check_cache_first(query, session_id).await {
126 Ok(Some(cached_result)) => {
127 info!("๐ Cache HIT: Returning cached data for {} in session {}", query, session_id);
128
129 let content = cached_result.get("content").and_then(|c| c.as_str()).unwrap_or("");
131 let source_used = cached_result.get("source_used").and_then(|s| s.as_str()).unwrap_or("Cache");
132 let method_used = "Redis Cache";
133
134 let formatted_response = self.create_formatted_commodity_response(
135 query, market, content, source_used, method_used,
136 data_type, timeframe, include_ratios, include_volume, &execution_id
137 );
138
139 let tool_result = ToolResult::success_with_raw(
140 vec![McpContent::text(formatted_response)],
141 cached_result
142 );
143
144 if self.is_data_reduction_enabled() {
146 return Ok(ResponseStrategy::apply_size_limits(tool_result));
147 } else {
148 return Ok(tool_result);
149 }
150 }
151 Ok(None) => {
152 info!("๐พ Cache MISS: Fetching fresh data for {} in session {}", query, session_id);
153 }
154 Err(e) => {
155 warn!("๐จ Cache error (continuing with fresh fetch): {}", e);
156 }
157 }
158
159 match self.fetch_commodity_data_with_fallbacks_and_priority(
161 query, market, data_type, timeframe, include_ratios, include_volume,
162 query_priority, recommended_tokens, &execution_id
163 ).await {
164 Ok(result) => {
165 if let Err(e) = self.store_in_cache(query, session_id, &result).await {
167 warn!("Failed to store result in cache: {}", e);
168 }
169
170 let content = result.get("content").and_then(|c| c.as_str()).unwrap_or("");
171 let source_used = result.get("source_used").and_then(|s| s.as_str()).unwrap_or("Unknown");
172 let method_used = result.get("method_used").and_then(|m| m.as_str()).unwrap_or("Unknown");
173
174 let formatted_response = self.create_formatted_commodity_response(
176 query, market, content, source_used, method_used,
177 data_type, timeframe, include_ratios, include_volume, &execution_id
178 );
179
180 let tool_result = ToolResult::success_with_raw(
181 vec![McpContent::text(formatted_response)],
182 result
183 );
184
185 if self.is_data_reduction_enabled() {
187 Ok(ResponseStrategy::apply_size_limits(tool_result))
188 } else {
189 Ok(tool_result)
190 }
191 }
192 Err(_e) => {
193 warn!("BrightData error for query '{}', returning empty data for retry", query);
195 let empty_response = json!({
196 "query": query,
197 "market": market,
198 "status": "no_data",
199 "reason": "brightdata_error",
200 "execution_id": execution_id,
201 "session_id": session_id
202 });
203
204 Ok(ToolResult::success_with_raw(
205 vec![McpContent::text("๐ **No Data Available**\n\nPlease try again with a more specific commodity symbol.".to_string())],
206 empty_response
207 ))
208 }
209 }
210 }
211}
212
213impl CommodityDataTool {
214 fn is_data_reduction_enabled(&self) -> bool {
216 std::env::var("DEDUCT_DATA")
217 .unwrap_or_else(|_| "false".to_string())
218 .to_lowercase() == "true"
219 }
220
221 fn create_formatted_commodity_response(
223 &self,
224 query: &str,
225 market: &str,
226 content: &str,
227 source: &str,
228 method: &str,
229 data_type: &str,
230 timeframe: &str,
231 include_ratios: bool,
232 include_volume: bool,
233 execution_id: &str
234 ) -> String {
235 if !self.is_data_reduction_enabled() {
237 return format!(
238 "๐ **{}** | {} Market\n\n## Full Content\n{}\n\n*Source: {} via {} โข Type: {} โข Period: {}*",
239 query.to_uppercase(),
240 market.to_uppercase(),
241 content,
242 source,
243 method,
244 data_type,
245 timeframe
246 );
247 }
248
249 format!(
252 "๐ **{}** | {} Market\n\n## Content (TODO: Add Filtering)\n{}\n\n*Source: {} via {} โข Type: {} โข Period: {}*",
253 query.to_uppercase(),
254 market.to_uppercase(),
255 content,
256 source,
257 method,
258 data_type,
259 timeframe
260 )
261 }
262
263 fn extract_essential_commodity_data(&self, content: &str, query: &str) -> String {
265 content.to_string()
268 }
269
270 fn extract_financial_lines(&self, content: &str) -> String {
272 content.to_string()
275 }
276
277 fn format_financial_metrics(&self, data: &str) -> String {
279 data.to_string()
282 }
283
284 async fn check_cache_first(
286 &self,
287 query: &str,
288 session_id: &str,
289 ) -> Result<Option<Value>, BrightDataError> {
290 let cache_service = get_commodity_cache().await?;
291 cache_service.get_cached_commodity_data(session_id, query).await
292 }
293
294 async fn store_in_cache(
296 &self,
297 query: &str,
298 session_id: &str,
299 data: &Value,
300 ) -> Result<(), BrightDataError> {
301 let cache_service = get_commodity_cache().await?;
302 cache_service.cache_commodity_data(session_id, query, data.clone()).await
303 }
304
305 pub async fn get_session_cached_symbols(&self, session_id: &str) -> Result<Vec<String>, BrightDataError> {
307 let cache_service = get_commodity_cache().await?;
308 cache_service.get_session_commodity_symbols(session_id).await
309 }
310
311 pub async fn clear_symbol_cache(
313 &self,
314 symbol: &str,
315 session_id: &str,
316 ) -> Result<(), BrightDataError> {
317 let cache_service = get_commodity_cache().await?;
318 cache_service.clear_commodity_symbol_cache(session_id, symbol).await
319 }
320
321 pub async fn clear_session_cache(&self, session_id: &str) -> Result<u32, BrightDataError> {
323 let cache_service = get_commodity_cache().await?;
324 cache_service.clear_session_commodity_cache(session_id).await
325 }
326
327 pub async fn get_cache_stats(&self) -> Result<Value, BrightDataError> {
329 let cache_service = get_commodity_cache().await?;
330 cache_service.get_commodity_cache_stats().await
331 }
332
333 pub async fn test_connectivity_with_cache(&self) -> Result<String, BrightDataError> {
335 let mut results = Vec::new();
336
337 info!("๐งช Testing Redis Cache...");
339 match get_commodity_cache().await {
340 Ok(cache_service) => {
341 match cache_service.health_check().await {
342 Ok(_) => results.push("โ
Redis Cache: SUCCESS".to_string()),
343 Err(e) => results.push(format!("โ Redis Cache: FAILED - {}", e)),
344 }
345 }
346 Err(e) => results.push(format!("โ Redis Cache: FAILED - {}", e)),
347 }
348
349 let api_test = self.test_connectivity().await?;
351 results.push(api_test);
352
353 Ok(format!("๐ Enhanced Connectivity Test Results:\n{}", results.join("\n")))
354 }
355
356 fn build_prioritized_urls_with_priority(
358 &self,
359 query: &str,
360 market: &str,
361 data_type: &str,
362 priority: crate::filters::strategy::QueryPriority
363 ) -> MethodUrls {
364 let mut proxy_urls = Vec::new();
365 let mut direct_urls = Vec::new();
366 let clean_query = query.trim().to_uppercase();
367
368 let max_sources = 3;
370
371 if self.is_likely_commodity_symbol(&clean_query) {
372 match market {
373 "usd" => {
374 let symbols_to_try = vec![
375 format!("MCX-{}1!/technicals", clean_query),
376 clean_query.clone(),
377 ];
378
379 for (i, symbol) in symbols_to_try.iter().enumerate() {
380 if i >= max_sources { break; }
381
382 let url = format!("https://in.tradingview.com/symbols/{}", symbol);
383 let description = format!("Yahoo Finance ({})", symbol);
384
385 let proxy_url = format!("https://in.tradingview.com/symbols/{}/", symbol);
386 let proxy_description = format!("Yahoo Finance ({})", symbol);
387
388 proxy_urls.push((proxy_url, proxy_description));
390 direct_urls.push((url, description));
391 }
392 },
393 "inr" => {
394 let symbols_to_try = vec![
395 format!("MCX-{}1!/technicals", clean_query),
396 clean_query.clone(),
397 ];
398
399 for (i, symbol) in symbols_to_try.iter().enumerate() {
400 if i >= max_sources { break; }
401
402 let url = format!("https://in.tradingview.com/symbols/{}", symbol);
403 let description = format!("Yahoo Finance ({})", symbol);
404
405 let proxy_url = format!("https://in.tradingview.com/symbols/{}/", symbol);
406 let proxy_description = format!("Yahoo Finance ({})", symbol);
407
408 proxy_urls.push((proxy_url, proxy_description));
410 direct_urls.push((url, description));
411 }
412 }
413
414 _ => {
415 let symbols_to_try = vec![
416 format!("MCX-{}1!/technicals", clean_query),
417 clean_query.clone(),
418 ];
419
420 for (i, symbol) in symbols_to_try.iter().enumerate() {
421 if i >= max_sources { break; }
422
423 let url = format!("https://in.tradingview.com/symbols/{}", symbol);
424 let description = format!("Yahoo Finance ({})", symbol);
425
426 let proxy_url = format!("https://in.tradingview.com/symbols/{}/", symbol);
427 let proxy_description = format!("Yahoo Finance ({})", symbol);
428
429 proxy_urls.push((proxy_url, proxy_description));
431 direct_urls.push((url, description));
432 }
433 }
434
435 }
436 }
437
438 if proxy_urls.len() < max_sources {
440 let url = format!("https://in.tradingview.com/symbols/{}1!/technicals", urlencoding::encode(query));
441 let description = "Yahoo Finance Search".to_string();
442
443 let proxy_url = format!("https://in.tradingview.com/symbols/{}1!/technicals", urlencoding::encode(query));
444 let proxy_description = "Yahoo Finance Search".to_string();
445
446 proxy_urls.push((proxy_url, proxy_description));
447 direct_urls.push((url, description));
448 }
449
450 info!("๐ฏ Generated {} proxy URLs and {} direct URLs for query '{}' (priority: {:?})",
451 proxy_urls.len(), direct_urls.len(), query, priority);
452
453 MethodUrls {
454 proxy: proxy_urls,
455 direct: direct_urls,
456 }
457 }
458
459 async fn fetch_commodity_data_with_fallbacks_and_priority(
461 &self,
462 query: &str,
463 market: &str,
464 data_type: &str,
465 timeframe: &str,
466 include_ratios: bool,
467 include_volume: bool,
468 query_priority: crate::filters::strategy::QueryPriority,
469 token_budget: usize,
470 execution_id: &str
471 ) -> Result<Value, BrightDataError> {
472 let method_urls = self.build_prioritized_urls_with_priority(query, market, data_type, query_priority);
473 let mut last_error = None;
474 let mut attempts = Vec::new();
475
476 let methods_to_try = vec![
478 ("proxy", "Proxy Fallback", &method_urls.proxy)
480 ];
481
482 for (method_sequence, (method_type, method_name, urls_for_method)) in methods_to_try.iter().enumerate() {
483 info!("๐ Trying {} method with {} URLs", method_name, urls_for_method.len());
484
485 for (url_sequence, (url, source_name)) in urls_for_method.iter().enumerate() {
486 let attempt_result = match *method_type {
487 "direct" => {
488 info!("๐ Trying Direct BrightData API for {} (method: {}, url: {}/{})",
489 source_name, method_sequence + 1, url_sequence + 1, urls_for_method.len());
490 self.try_fetch_url_direct_api(
491 url, query, market, source_name, query_priority, token_budget,
492 execution_id, url_sequence as u64, method_sequence as u64
493 ).await
494 }
495 "proxy" => {
496 info!("๐ Trying Proxy method for {} (method: {}, url: {}/{})",
497 source_name, method_sequence + 1, url_sequence + 1, urls_for_method.len());
498 self.try_fetch_url_via_proxy(
499 url, query, market, source_name, query_priority, token_budget,
500 execution_id, url_sequence as u64, method_sequence as u64
501 ).await
502 }
503 _ => continue,
504 };
505
506 match attempt_result {
507 Ok(mut result) => {
508 let content = result.get("content").and_then(|c| c.as_str()).unwrap_or("");
509
510 attempts.push(json!({
511 "source": source_name,
512 "url": url,
513 "method": method_name,
514 "status": "success",
515 "content_length": content.len(),
516 "method_sequence": method_sequence + 1,
517 "url_sequence": url_sequence + 1
518 }));
519
520 let should_try_next = if self.is_data_reduction_enabled() {
522 false
524 } else {
525 false
526 };
527
528 if should_try_next && (url_sequence < urls_for_method.len() - 1 || method_sequence < methods_to_try.len() - 1) {
529 if url_sequence < urls_for_method.len() - 1 {
530 warn!("Content insufficient from {} via {}, trying next URL in same method", source_name, method_name);
531 continue; } else {
533 warn!("Content insufficient from {} via {}, trying next method", source_name, method_name);
534 break; }
536 }
537
538 if self.is_data_reduction_enabled() {
540 }
543
544 result["source_used"] = json!(source_name);
546 result["url_used"] = json!(url);
547 result["method_used"] = json!(method_name);
548 result["execution_id"] = json!(execution_id);
549 result["priority"] = json!(format!("{:?}", query_priority));
550 result["token_budget"] = json!(token_budget);
551 result["attempts"] = json!(attempts);
552 result["successful_method_sequence"] = json!(method_sequence + 1);
553 result["successful_url_sequence"] = json!(url_sequence + 1);
554
555 info!("โ
Successfully fetched commodity data from {} via {} (method: {}, url: {})",
556 source_name, method_name, method_sequence + 1, url_sequence + 1);
557
558 return Ok(result);
559 }
560 Err(e) => {
561 attempts.push(json!({
562 "source": source_name,
563 "url": url,
564 "method": method_name,
565 "status": "failed",
566 "error": e.to_string(),
567 "method_sequence": method_sequence + 1,
568 "url_sequence": url_sequence + 1
569 }));
570
571 last_error = Some(e);
572 warn!("โ Failed to fetch from {} via {} (method: {}, url: {}): {:?}",
573 source_name, method_name, method_sequence + 1, url_sequence + 1, last_error);
574 }
575 }
576 }
577 }
578
579 warn!("โ All sources and methods failed for query '{}'. Returning empty data for Anthropic retry", query);
581
582 let empty_result = json!({
583 "query": query,
584 "market": market,
585 "status": "no_data_found",
586 "attempts": attempts,
587 "execution_id": execution_id,
588 "total_attempts": method_urls.direct.len() + method_urls.proxy.len(),
589 "reason": "all_sources_failed"
590 });
591
592 Ok(empty_result)
593 }
594
595 async fn try_fetch_url_direct_api(
597 &self,
598 url: &str,
599 query: &str,
600 market: &str,
601 source_name: &str,
602 priority: crate::filters::strategy::QueryPriority,
603 token_budget: usize,
604 execution_id: &str,
605 sequence: u64,
606 method_sequence: u64
607 ) -> Result<Value, BrightDataError> {
608 let max_retries = env::var("MAX_RETRIES")
609 .ok()
610 .and_then(|s| s.parse::<u32>().ok())
611 .unwrap_or(1);
612
613 let mut last_error = None;
614
615 for retry_attempt in 0..max_retries {
616 let start_time = Instant::now();
617 let attempt_id = format!("{}_direct_s{}_m{}_r{}", execution_id, sequence, method_sequence, retry_attempt);
618
619 info!("๐ Direct API: Fetching from {} (execution: {}, retry: {}/{})",
620 source_name, attempt_id, retry_attempt + 1, max_retries);
621
622 let api_token = env::var("BRIGHTDATA_API_TOKEN")
623 .or_else(|_| env::var("API_TOKEN"))
624 .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_API_TOKEN environment variable".into()))?;
625
626 let base_url = env::var("BRIGHTDATA_BASE_URL")
627 .unwrap_or_else(|_| "https://api.brightdata.com".to_string());
628
629 let zone = env::var("WEB_UNLOCKER_ZONE")
630 .unwrap_or_else(|_| "mcp_unlocker".to_string());
631
632 let payload = json!({
633 "url": url,
634 "zone": zone,
635 "format": "raw",
636 });
638
639 if retry_attempt == 0 {
640 info!("๐ค Direct API Request:");
641 info!(" Endpoint: {}/request", base_url);
642 info!(" Zone: {}", zone);
643 info!(" Target: {}", url);
644 }
645
646 let client = Client::builder()
647 .timeout(Duration::from_secs(90))
648 .build()
649 .map_err(|e| BrightDataError::ToolError(format!("Failed to create HTTP client: {}", e)))?;
650
651 let response = client
652 .post(&format!("{}/request", base_url))
653 .header("Authorization", format!("Bearer {}", api_token))
654 .header("Content-Type", "application/json")
655 .json(&payload)
656 .send()
657 .await
658 .map_err(|e| BrightDataError::ToolError(format!("Direct API request failed to {}: {}", source_name, e)))?;
659
660 let duration = start_time.elapsed();
661 let status = response.status().as_u16();
662 let response_headers: HashMap<String, String> = response
663 .headers()
664 .iter()
665 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
666 .collect();
667
668 info!("๐ฅ Direct API Response (retry {}):", retry_attempt + 1);
669 info!(" Status: {}", status);
670 info!(" Duration: {}ms", duration.as_millis());
671
672 let response_text = response.text().await
673 .map_err(|e| BrightDataError::ToolError(format!("Failed to read response body from {}: {}", source_name, e)))?;
674
675 if matches!(status, 502 | 503 | 504) && retry_attempt < max_retries - 1 {
677 let wait_time = Duration::from_millis(1000 + (retry_attempt as u64 * 1000));
678 warn!("โณ Direct API: Server error {}, waiting {} ms before retry...", status, wait_time.as_millis());
679 tokio::time::sleep(wait_time).await;
680 last_error = Some(BrightDataError::ToolError(format!("Direct API server error: {}", status)));
681 continue;
682 }
683
684 if !(200..300).contains(&status) {
685 let error_msg = format!("Direct API: {} returned HTTP {}: {}", source_name, status,
686 &response_text[..response_text.len().min(500)]);
687 last_error = Some(BrightDataError::ToolError(error_msg));
688 if retry_attempt == max_retries - 1 {
689 return Err(last_error.unwrap());
690 }
691 continue;
692 }
693
694 let raw_content = response_text;
696 let filtered_content = if self.is_data_reduction_enabled() {
697 raw_content.clone()
699 } else {
700 raw_content.clone()
701 };
702
703 info!("๐ Direct API: Content processed: {} bytes -> {} bytes",
704 raw_content.len(), filtered_content.len());
705
706 if let Err(e) = BRIGHTDATA_METRICS.log_call(
708 &attempt_id,
709 url,
710 &zone,
711 "raw",
712 None,
713 payload.clone(),
714 status,
715 response_headers.clone(),
716 &raw_content,
717 Some(&filtered_content),
718 duration.as_millis() as u64,
719 None,
720 None,
721 ).await {
722 warn!("Failed to log direct API metrics: {}", e);
723 }
724
725 return Ok(json!({
726 "content": filtered_content,
727 "raw_content": raw_content,
728 "query": query,
729 "market": market,
730 "source": source_name,
731 "method": "Direct BrightData API",
732 "priority": format!("{:?}", priority),
733 "token_budget": token_budget,
734 "execution_id": execution_id,
735 "sequence": sequence,
736 "method_sequence": method_sequence,
737 "success": true,
738 "url": url,
739 "zone": zone,
740 "format": "raw",
741 "status_code": status,
742 "response_size_bytes": raw_content.len(),
743 "filtered_size_bytes": filtered_content.len(),
744 "duration_ms": duration.as_millis(),
745 "timestamp": chrono::Utc::now().to_rfc3339(),
746 "retry_attempts": retry_attempt + 1,
747 "max_retries": max_retries,
748 "payload_used": payload
749 }));
750 }
751
752 Err(last_error.unwrap_or_else(|| BrightDataError::ToolError("Direct API: All retry attempts failed".into())))
753 }
754
755 async fn try_fetch_url_via_proxy(
757 &self,
758 url: &str,
759 query: &str,
760 market: &str,
761 source_name: &str,
762 priority: crate::filters::strategy::QueryPriority,
763 token_budget: usize,
764 execution_id: &str,
765 sequence: u64,
766 method_sequence: u64
767 ) -> Result<Value, BrightDataError> {
768 let max_retries = env::var("MAX_RETRIES")
769 .ok()
770 .and_then(|s| s.parse::<u32>().ok())
771 .unwrap_or(1);
772
773 let mut last_error = None;
774
775 let proxy_host = env::var("BRIGHTDATA_PROXY_HOST")
777 .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_HOST environment variable".into()))?;
778 let proxy_port = env::var("BRIGHTDATA_PROXY_PORT")
779 .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_PORT environment variable".into()))?;
780 let proxy_username = env::var("BRIGHTDATA_PROXY_USERNAME")
781 .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_USERNAME environment variable".into()))?;
782 let proxy_password = env::var("BRIGHTDATA_PROXY_PASSWORD")
783 .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_PASSWORD environment variable".into()))?;
784
785 let proxy_url = format!("http://{}:{}@{}:{}", proxy_username, proxy_password, proxy_host, proxy_port);
786
787 for retry_attempt in 0..max_retries {
788 let start_time = Instant::now();
789 let attempt_id = format!("{}_proxy_s{}_m{}_r{}", execution_id, sequence, method_sequence, retry_attempt);
790
791 info!("๐ Proxy: Fetching from {} via proxy (execution: {}, retry: {}/{})",
792 source_name, attempt_id, retry_attempt + 1, max_retries);
793
794 if retry_attempt == 0 {
795 info!("๐ค Proxy Request:");
796 info!(" Proxy: {}:{}@{}:{}", proxy_username, "***", proxy_host, proxy_port);
797 info!(" Target: {}", url);
798 }
799
800 let proxy = reqwest::Proxy::all(&proxy_url)
802 .map_err(|e| BrightDataError::ToolError(format!("Failed to create proxy: {}", e)))?;
803
804 let client = Client::builder()
805 .proxy(proxy)
806 .timeout(Duration::from_secs(90))
807 .danger_accept_invalid_certs(true) .build()
809 .map_err(|e| BrightDataError::ToolError(format!("Failed to create proxy client: {}", e)))?;
810
811 let response = client
812 .get(url)
813 .header("x-unblock-data-format", "markdown")
814 .send()
815 .await
816 .map_err(|e| BrightDataError::ToolError(format!("Proxy request failed to {}: {}", source_name, e)))?;
817
818 let duration = start_time.elapsed();
819 let status = response.status().as_u16();
820 let response_headers: HashMap<String, String> = response
821 .headers()
822 .iter()
823 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
824 .collect();
825
826 info!("๐ฅ Proxy Response (retry {}):", retry_attempt + 1);
827 info!(" Status: {}", status);
828 info!(" Duration: {}ms", duration.as_millis());
829
830 let response_text = response.text().await
831 .map_err(|e| BrightDataError::ToolError(format!("Failed to read proxy response body from {}: {}", source_name, e)))?;
832
833 if matches!(status, 502 | 503 | 504) && retry_attempt < max_retries - 1 {
835 let wait_time = Duration::from_millis(1000 + (retry_attempt as u64 * 1000));
836 warn!("โณ Proxy: Server error {}, waiting {}ms before retry...", status, wait_time.as_millis());
837 tokio::time::sleep(wait_time).await;
838 last_error = Some(BrightDataError::ToolError(format!("Proxy server error: {}", status)));
839 continue;
840 }
841
842 if !(200..300).contains(&status) {
843 println!("-----------------------------------------------------------------");
844 println!("MARKDOWN SUCCESS: {:?}", status.clone());
845 println!("-----------------------------------------------------------------");
846 let error_msg = format!("Proxy: {} returned HTTP {}: {}", source_name, status,
847 &response_text[..response_text.len().min(200)]);
848
849 warn!("Proxy HTTP error: {}", error_msg);
850 last_error = Some(BrightDataError::ToolError(error_msg));
851
852 let proxy_payload = json!({
854 "url": url,
855 "method": "proxy",
856 "proxy_host": proxy_host,
857 "proxy_port": proxy_port,
858 "error": format!("HTTP {}", status)
859 });
860
861 if let Err(e) = BRIGHTDATA_METRICS.log_call(
862 &attempt_id,
863 url,
864 "proxy",
865 "raw",
866 None,
867 proxy_payload,
868 status,
869 response_headers.clone(),
870 &response_text,
871 Some(&format!("Proxy HTTP {} Error", status)),
872 duration.as_millis() as u64,
873 None,
874 None,
875 ).await {
876 warn!("Failed to log proxy error metrics: {}", e);
877 }
878
879 if retry_attempt == max_retries - 1 {
880 return Err(last_error.unwrap());
881 }
882 continue;
883 }
884
885 let raw_content = response_text;
887 let filtered_content = if self.is_data_reduction_enabled() {
888 raw_content.clone()
890 } else {
891 raw_content.clone()
892 };
893
894 info!("๐ Proxy: Content processed: {} bytes -> {} bytes",
895 raw_content.len(), filtered_content.len());
896
897 let proxy_payload = json!({
899 "url": url,
900 "method": "proxy",
901 "proxy_host": proxy_host,
902 "proxy_port": proxy_port
903 });
904
905 if let Err(e) = BRIGHTDATA_METRICS.log_call(
906 &attempt_id,
907 url,
908 "proxy",
909 "raw",
910 None,
911 proxy_payload.clone(),
912 status,
913 response_headers.clone(),
914 &raw_content,
915 Some(&filtered_content),
916 duration.as_millis() as u64,
917 None,
918 None,
919 ).await {
920 warn!("Failed to log proxy metrics: {}", e);
921 }
922
923 return Ok(json!({
924 "content": filtered_content,
925 "raw_content": raw_content,
926 "query": query,
927 "market": market,
928 "source": source_name,
929 "method": "BrightData Proxy",
930 "priority": format!("{:?}", priority),
931 "token_budget": token_budget,
932 "execution_id": execution_id,
933 "sequence": sequence,
934 "method_sequence": method_sequence,
935 "success": true,
936 "url": url,
937 "proxy_host": proxy_host,
938 "proxy_port": proxy_port,
939 "status_code": status,
940 "response_size_bytes": raw_content.len(),
941 "filtered_size_bytes": filtered_content.len(),
942 "duration_ms": duration.as_millis(),
943 "timestamp": chrono::Utc::now().to_rfc3339(),
944 "retry_attempts": retry_attempt + 1,
945 "max_retries": max_retries,
946 "payload_used": proxy_payload
947 }));
948 }
949
950 Err(last_error.unwrap_or_else(|| BrightDataError::ToolError("Proxy: All retry attempts failed".into())))
951 }
952
953 fn is_likely_commodity_symbol(&self, query: &str) -> bool {
954 let clean = query.trim();
955
956 if clean.len() < 1 || clean.len() > 15 {
957 return false;
958 }
959
960 let valid_chars = clean.chars().all(|c| c.is_alphanumeric() || c == '.');
961 let has_letters = clean.chars().any(|c| c.is_alphabetic());
962
963 valid_chars && has_letters
964 }
965
966
967 pub async fn test_connectivity(&self) -> Result<String, BrightDataError> {
969 let test_url = "https://in.tradingview.com/symbols/MCX-CRUDEOIL1!/technicals/";
970 let mut results = Vec::new();
971
972 info!("Testing Direct BrightData API...");
974 match self.try_fetch_url_direct_api(
975 test_url, "BTC", "usd", "Yahoo Finance Test",
976 crate::filters::strategy::QueryPriority::High, 1000,
977 "connectivity_test", 0, 0
978 ).await {
979 Ok(_) => {
980 results.push("Direct API: SUCCESS".to_string());
981 }
982 Err(e) => {
983 results.push(format!("Direct API: FAILED - {}", e));
984 }
985 }
986
987 info!("Testing Proxy method...");
989 match self.try_fetch_url_via_proxy(
990 test_url, "BTC", "usd", "Yahoo Finance Test",
991 crate::filters::strategy::QueryPriority::High, 1000,
992 "connectivity_test", 0, 1
993 ).await {
994 Ok(_) => {
995 results.push("Proxy: SUCCESS".to_string());
996 }
997 Err(e) => {
998 results.push(format!("Proxy: FAILED - {}", e));
999 }
1000 }
1001
1002 Ok(format!("Connectivity Test Results:\n{}", results.join("\n")))
1003 }
1004}