1use crate::tool::{Tool, ToolResult, McpContent};
3use crate::error::BrightDataError;
4use crate::filters::{ResponseFilter, ResponseStrategy, ResponseType};
5use crate::extras::logger::JSON_LOGGER;
6use crate::metrics::brightdata_logger::BRIGHTDATA_METRICS;
7use crate::services::cache::stock_cache::get_stock_cache;
8use async_trait::async_trait;
9use reqwest::Client;
10use serde_json::{json, Value};
11use std::env;
12use std::time::{Duration, Instant};
13use std::collections::HashMap;
14use log::{info, warn, error};
15use crate::symbols::stock_symbol::match_symbol_from_query;
16
/// Candidate URLs for one stock lookup, grouped by fetch method.
///
/// Every entry is a `(url, description)` pair.
#[derive(Clone, Debug)]
pub struct MethodUrls {
    /// Candidates intended for the proxy fetch path.
    pub proxy: Vec<(String, String)>,
    /// Candidates intended for the direct API fetch path.
    pub direct: Vec<(String, String)>,
}
23
/// Stateless tool type; its `Tool` implementation reports the name `get_stock_data`.
pub struct StockDataTool;
25
26#[async_trait]
27impl Tool for StockDataTool {
28 fn name(&self) -> &str {
29 "get_stock_data"
30 }
31
32 fn description(&self) -> &str {
33 "Get comprehensive stock data including prices, performance, market cap, volumes with intelligent filtering and priority-based processing. Supports both direct BrightData API and proxy fallback."
34 }
35
36 fn input_schema(&self) -> Value {
37 json!({
38 "type": "object",
39 "properties": {
40 "query": {
41 "type": "string",
42 "description": "Stock symbol (e.g. TATAMOTORS, TCS, AAPL), company name, comparison query, or market overview request"
43 },
44 "symbol": {
45 "type": "string",
46 "description": "Stock symbol (e.g. TATAMOTORS, TCS, AAPL), company name, comparison query, or market overview request"
47 },
48 "market": {
49 "type": "string",
50 "enum": ["indian", "us", "global"],
51 "default": "indian",
52 "description": "Market region - indian for NSE/BSE stocks, us for NASDAQ/NYSE, global for international"
53 },
54 "data_type": {
55 "type": "string",
56 "enum": ["price", "fundamentals", "technical", "news", "all"],
57 "default": "all",
58 "description": "Type of stock data to focus on"
59 },
60 "timeframe": {
61 "type": "string",
62 "enum": ["realtime", "day", "week", "month", "quarter", "year"],
63 "default": "realtime",
64 "description": "Time period for stock data analysis"
65 },
66 "include_ratios": {
67 "type": "boolean",
68 "default": true,
69 "description": "Include financial ratios like P/E, P/B, ROE"
70 },
71 "include_volume": {
72 "type": "boolean",
73 "default": true,
74 "description": "Include trading volume and liquidity data"
75 },
76 "session_id": {
77 "type": "string",
78 "description": "Session identifier for caching (optional, will use default if not provided)"
79 },
80 },
81 "required": ["query"]
82 })
83 }
84
85 async fn execute(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
86 self.execute_internal(parameters).await
87 }
88
89 async fn execute_internal(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
90 let raw_query = parameters
91 .get("symbol")
92 .and_then(|v| v.as_str())
93 .ok_or_else(|| BrightDataError::ToolError("Missing 'symbol' parameter".into()))?;
94
95 let session_id = parameters
96 .get("user_id")
97 .and_then(|v| v.as_str())
98 .ok_or_else(|| BrightDataError::ToolError("Missing 'user_id' parameter".into()))?;
99
100 let matched_symbol = match_symbol_from_query(raw_query);
102
103 let query = matched_symbol.split('.').next().unwrap_or(&matched_symbol);
105
106 let market = parameters
107 .get("market")
108 .and_then(|v| v.as_str())
109 .unwrap_or("indian");
110
111 let data_type = parameters
112 .get("data_type")
113 .and_then(|v| v.as_str())
114 .unwrap_or("all");
115
116 let timeframe = parameters
117 .get("timeframe")
118 .and_then(|v| v.as_str())
119 .unwrap_or("realtime");
120
121 let include_ratios = parameters
122 .get("include_ratios")
123 .and_then(|v| v.as_bool())
124 .unwrap_or(true);
125
126 let include_volume = parameters
127 .get("include_volume")
128 .and_then(|v| v.as_bool())
129 .unwrap_or(true);
130
131 let query_priority = ResponseStrategy::classify_query_priority(query);
132 let recommended_tokens = ResponseStrategy::get_recommended_token_allocation(query);
133
134 let execution_id = format!("stock_{}", chrono::Utc::now().format("%Y%m%d_%H%M%S%.3f"));
135
136 info!("๐ Stock query: '{}' (market: {}, priority: {:?}, tokens: {}, session: {})",
137 query, market, query_priority, recommended_tokens, session_id);
138
139 match self.check_cache_first(query, session_id).await {
141 Ok(Some(cached_result)) => {
142 info!("๐ Cache HIT: Returning cached data for {} in session {}", query, session_id);
143
144 let content = cached_result.get("content").and_then(|c| c.as_str()).unwrap_or("");
146 let source_used = cached_result.get("source_used").and_then(|s| s.as_str()).unwrap_or("Cache");
147 let method_used = "Redis Cache";
148
149 let formatted_response = self.create_formatted_stock_response(
150 query, market, content, source_used, method_used,
151 data_type, timeframe, include_ratios, include_volume, &execution_id
152 );
153
154 let tool_result = ToolResult::success_with_raw(
155 vec![McpContent::text(formatted_response)],
156 cached_result
157 );
158
159 if self.is_data_reduction_enabled() {
161 return Ok(ResponseStrategy::apply_size_limits(tool_result));
162 } else {
163 return Ok(tool_result);
164 }
165 }
166 Ok(None) => {
167 info!("๐พ Cache MISS: Fetching fresh data for {} in session {}", query, session_id);
168 }
169 Err(e) => {
170 warn!("๐จ Cache error (continuing with fresh fetch): {}", e);
171 }
172 }
173
174 match self.fetch_stock_data_with_fallbacks_and_priority(
176 query, market, data_type, timeframe, include_ratios, include_volume,
177 query_priority, recommended_tokens, &execution_id
178 ).await {
179 Ok(result) => {
180 if let Err(e) = self.store_in_cache(query, session_id, &result).await {
182 warn!("Failed to store result in cache: {}", e);
183 }
184
185 let content = result.get("content").and_then(|c| c.as_str()).unwrap_or("");
186 let source_used = result.get("source_used").and_then(|s| s.as_str()).unwrap_or("Unknown");
187 let method_used = result.get("method_used").and_then(|m| m.as_str()).unwrap_or("Unknown");
188
189 let formatted_response = self.create_formatted_stock_response(
191 query, market, content, source_used, method_used,
192 data_type, timeframe, include_ratios, include_volume, &execution_id
193 );
194
195 let tool_result = ToolResult::success_with_raw(
196 vec![McpContent::text(formatted_response)],
197 result
198 );
199
200 if self.is_data_reduction_enabled() {
202 Ok(ResponseStrategy::apply_size_limits(tool_result))
203 } else {
204 Ok(tool_result)
205 }
206 }
207 Err(_e) => {
208 warn!("BrightData error for query '{}', returning empty data for retry", query);
210 let empty_response = json!({
211 "query": query,
212 "market": market,
213 "status": "no_data",
214 "reason": "brightdata_error",
215 "execution_id": execution_id,
216 "session_id": session_id
217 });
218
219 Ok(ToolResult::success_with_raw(
220 vec![McpContent::text("๐ **No Data Available**\n\nPlease try again with a more specific stock symbol.".to_string())],
221 empty_response
222 ))
223 }
224 }
225 }
226}
227
228impl StockDataTool {
229 fn is_data_reduction_enabled(&self) -> bool {
231 std::env::var("DEDUCT_DATA")
232 .unwrap_or_else(|_| "false".to_string())
233 .to_lowercase() == "true"
234 }
235
236 fn create_formatted_stock_response(
238 &self,
239 query: &str,
240 market: &str,
241 content: &str,
242 source: &str,
243 method: &str,
244 data_type: &str,
245 timeframe: &str,
246 include_ratios: bool,
247 include_volume: bool,
248 execution_id: &str
249 ) -> String {
250 if !self.is_data_reduction_enabled() {
252 return format!(
253 "๐ **{}** | {} Market\n\n## Full Content\n{}\n\n*Source: {} via {} โข Type: {} โข Period: {}*",
254 query.to_uppercase(),
255 market.to_uppercase(),
256 content,
257 source,
258 method,
259 data_type,
260 timeframe
261 );
262 }
263
264 format!(
267 "๐ **{}** | {} Market\n\n## Content (TODO: Add Filtering)\n{}\n\n*Source: {} via {} โข Type: {} โข Period: {}*",
268 query.to_uppercase(),
269 market.to_uppercase(),
270 content,
271 source,
272 method,
273 data_type,
274 timeframe
275 )
276 }
277
278 fn extract_essential_stock_data(&self, content: &str, query: &str) -> String {
280 content.to_string()
283 }
284
285 fn extract_financial_lines(&self, content: &str) -> String {
287 content.to_string()
290 }
291
292 fn format_financial_metrics(&self, data: &str) -> String {
294 data.to_string()
297 }
298
299 async fn check_cache_first(
301 &self,
302 query: &str,
303 session_id: &str,
304 ) -> Result<Option<Value>, BrightDataError> {
305 let cache_service = get_stock_cache().await?;
306 cache_service.get_cached_stock_data(session_id, query).await
307 }
308
309 async fn store_in_cache(
311 &self,
312 query: &str,
313 session_id: &str,
314 data: &Value,
315 ) -> Result<(), BrightDataError> {
316 let cache_service = get_stock_cache().await?;
317 cache_service.cache_stock_data(session_id, query, data.clone()).await
318 }
319
320 pub async fn get_session_cached_symbols(&self, session_id: &str) -> Result<Vec<String>, BrightDataError> {
322 let cache_service = get_stock_cache().await?;
323 cache_service.get_session_stock_symbols(session_id).await
324 }
325
326 pub async fn clear_symbol_cache(
328 &self,
329 symbol: &str,
330 session_id: &str,
331 ) -> Result<(), BrightDataError> {
332 let cache_service = get_stock_cache().await?;
333 cache_service.clear_stock_symbol_cache(session_id, symbol).await
334 }
335
336 pub async fn clear_session_cache(&self, session_id: &str) -> Result<u32, BrightDataError> {
338 let cache_service = get_stock_cache().await?;
339 cache_service.clear_session_stock_cache(session_id).await
340 }
341
342 pub async fn get_cache_stats(&self) -> Result<Value, BrightDataError> {
344 let cache_service = get_stock_cache().await?;
345 cache_service.get_stock_cache_stats().await
346 }
347
348 pub async fn test_connectivity_with_cache(&self) -> Result<String, BrightDataError> {
350 let mut results = Vec::new();
351
352 info!("๐งช Testing Redis Cache...");
354 match get_stock_cache().await {
355 Ok(cache_service) => {
356 match cache_service.health_check().await {
357 Ok(_) => results.push("โ
Redis Cache: SUCCESS".to_string()),
358 Err(e) => results.push(format!("โ Redis Cache: FAILED - {}", e)),
359 }
360 }
361 Err(e) => results.push(format!("โ Redis Cache: FAILED - {}", e)),
362 }
363
364 let api_test = self.test_connectivity().await?;
366 results.push(api_test);
367
368 Ok(format!("๐ Enhanced Connectivity Test Results:\n{}", results.join("\n")))
369 }
370
371 fn build_prioritized_urls_with_priority(
373 &self,
374 query: &str,
375 market: &str,
376 data_type: &str,
377 priority: crate::filters::strategy::QueryPriority
378 ) -> MethodUrls {
379 let mut proxy_urls = Vec::new();
380 let mut direct_urls = Vec::new();
381 let clean_query = query.trim().to_uppercase();
382
383 let max_sources = 3;
385
386 if self.is_likely_stock_symbol(&clean_query) {
387 match market {
388 "indian" => {
389 let symbols_to_try = vec![
391 format!("{}.NS", clean_query),
392 format!("{}.BO", clean_query),
393 clean_query.clone(),
394 ];
395
396 for (i, symbol) in symbols_to_try.iter().enumerate() {
397 if i >= max_sources { break; }
398
399 let url = format!("https://finance.yahoo.com/quote/{}", symbol);
400 let description = format!("Yahoo Finance ({})", symbol);
401
402 let proxy_url = format!("https://finance.yahoo.com/quote/{}/", symbol);
403 let proxy_description = format!("Yahoo Finance ({})", symbol);
404
405 proxy_urls.push((proxy_url, proxy_description));
407 direct_urls.push((url, description));
408 }
409 }
410 "us" => {
411 let url = format!("https://finance.yahoo.com/quote/{}.NS/", clean_query);
412 let description = format!("Yahoo Finance ({})", clean_query);
413
414 let proxy_url = format!("https://finance.yahoo.com/quote/{}.NS/", clean_query);
415 let proxy_description = format!("Yahoo Finance ({})", clean_query);
416
417 proxy_urls.push((proxy_url, proxy_description));
418 direct_urls.push((url, description));
419 }
420 "global" => {
421 let url = format!("https://finance.yahoo.com/quote/{}.NS/", clean_query);
422 let description = format!("Yahoo Finance Global ({})", clean_query);
423
424 let proxy_url = format!("https://finance.yahoo.com/quote/{}.NS/", clean_query);
425 let proxy_description = format!("Yahoo Finance Global ({})", clean_query);
426
427 proxy_urls.push((proxy_url, proxy_description));
428 direct_urls.push((url, description));
429 }
430 _ => {}
431 }
432 }
433
434 if proxy_urls.len() < max_sources {
436 let url = format!("https://finance.yahoo.com/quote/{}", urlencoding::encode(query));
437 let description = "Yahoo Finance Search".to_string();
438
439 let proxy_url = format!("https://finance.yahoo.com/quote/{}", urlencoding::encode(query));
440 let proxy_description = "Yahoo Finance Search".to_string();
441
442 proxy_urls.push((proxy_url, proxy_description));
443 direct_urls.push((url, description));
444 }
445
446 info!("๐ฏ Generated {} proxy URLs and {} direct URLs for query '{}' (priority: {:?})",
447 proxy_urls.len(), direct_urls.len(), query, priority);
448
449 MethodUrls {
450 proxy: proxy_urls,
451 direct: direct_urls,
452 }
453 }
454
455 async fn fetch_stock_data_with_fallbacks_and_priority(
457 &self,
458 query: &str,
459 market: &str,
460 data_type: &str,
461 timeframe: &str,
462 include_ratios: bool,
463 include_volume: bool,
464 query_priority: crate::filters::strategy::QueryPriority,
465 token_budget: usize,
466 execution_id: &str
467 ) -> Result<Value, BrightDataError> {
468 let method_urls = self.build_prioritized_urls_with_priority(query, market, data_type, query_priority);
469 let mut last_error = None;
470 let mut attempts = Vec::new();
471
472 let methods_to_try = vec![
474 ("proxy", "Proxy Fallback", &method_urls.proxy)
476 ];
477
478 for (method_sequence, (method_type, method_name, urls_for_method)) in methods_to_try.iter().enumerate() {
479 info!("๐ Trying {} method with {} URLs", method_name, urls_for_method.len());
480
481 for (url_sequence, (url, source_name)) in urls_for_method.iter().enumerate() {
482 let attempt_result = match *method_type {
483 "direct" => {
484 info!("๐ Trying Direct BrightData API for {} (method: {}, url: {}/{})",
485 source_name, method_sequence + 1, url_sequence + 1, urls_for_method.len());
486 self.try_fetch_url_direct_api(
487 url, query, market, source_name, query_priority, token_budget,
488 execution_id, url_sequence as u64, method_sequence as u64
489 ).await
490 }
491 "proxy" => {
492 info!("๐ Trying Proxy method for {} (method: {}, url: {}/{})",
493 source_name, method_sequence + 1, url_sequence + 1, urls_for_method.len());
494 self.try_fetch_url_via_proxy(
495 url, query, market, source_name, query_priority, token_budget,
496 execution_id, url_sequence as u64, method_sequence as u64
497 ).await
498 }
499 _ => continue,
500 };
501
502 match attempt_result {
503 Ok(mut result) => {
504 let content = result.get("content").and_then(|c| c.as_str()).unwrap_or("");
505
506 attempts.push(json!({
507 "source": source_name,
508 "url": url,
509 "method": method_name,
510 "status": "success",
511 "content_length": content.len(),
512 "method_sequence": method_sequence + 1,
513 "url_sequence": url_sequence + 1
514 }));
515
516 let should_try_next = if self.is_data_reduction_enabled() {
518 false
520 } else {
521 false
522 };
523
524 if should_try_next && (url_sequence < urls_for_method.len() - 1 || method_sequence < methods_to_try.len() - 1) {
525 if url_sequence < urls_for_method.len() - 1 {
526 warn!("Content insufficient from {} via {}, trying next URL in same method", source_name, method_name);
527 continue; } else {
529 warn!("Content insufficient from {} via {}, trying next method", source_name, method_name);
530 break; }
532 }
533
534 if self.is_data_reduction_enabled() {
536 }
539
540 result["source_used"] = json!(source_name);
542 result["url_used"] = json!(url);
543 result["method_used"] = json!(method_name);
544 result["execution_id"] = json!(execution_id);
545 result["priority"] = json!(format!("{:?}", query_priority));
546 result["token_budget"] = json!(token_budget);
547 result["attempts"] = json!(attempts);
548 result["successful_method_sequence"] = json!(method_sequence + 1);
549 result["successful_url_sequence"] = json!(url_sequence + 1);
550
551 info!("โ
Successfully fetched stock data from {} via {} (method: {}, url: {})",
552 source_name, method_name, method_sequence + 1, url_sequence + 1);
553
554 return Ok(result);
555 }
556 Err(e) => {
557 attempts.push(json!({
558 "source": source_name,
559 "url": url,
560 "method": method_name,
561 "status": "failed",
562 "error": e.to_string(),
563 "method_sequence": method_sequence + 1,
564 "url_sequence": url_sequence + 1
565 }));
566
567 last_error = Some(e);
568 warn!("โ Failed to fetch from {} via {} (method: {}, url: {}): {:?}",
569 source_name, method_name, method_sequence + 1, url_sequence + 1, last_error);
570 }
571 }
572 }
573 }
574
575 warn!("โ All sources and methods failed for query '{}'. Returning empty data for Anthropic retry", query);
577
578 let empty_result = json!({
579 "query": query,
580 "market": market,
581 "status": "no_data_found",
582 "attempts": attempts,
583 "execution_id": execution_id,
584 "total_attempts": method_urls.direct.len() + method_urls.proxy.len(),
585 "reason": "all_sources_failed"
586 });
587
588 Ok(empty_result)
589 }
590
591 async fn try_fetch_url_direct_api(
593 &self,
594 url: &str,
595 query: &str,
596 market: &str,
597 source_name: &str,
598 priority: crate::filters::strategy::QueryPriority,
599 token_budget: usize,
600 execution_id: &str,
601 sequence: u64,
602 method_sequence: u64
603 ) -> Result<Value, BrightDataError> {
604 let max_retries = env::var("MAX_RETRIES")
605 .ok()
606 .and_then(|s| s.parse::<u32>().ok())
607 .unwrap_or(1);
608
609 let mut last_error = None;
610
611 for retry_attempt in 0..max_retries {
612 let start_time = Instant::now();
613 let attempt_id = format!("{}_direct_s{}_m{}_r{}", execution_id, sequence, method_sequence, retry_attempt);
614
615 info!("๐ Direct API: Fetching from {} (execution: {}, retry: {}/{})",
616 source_name, attempt_id, retry_attempt + 1, max_retries);
617
618 let api_token = env::var("BRIGHTDATA_API_TOKEN")
619 .or_else(|_| env::var("API_TOKEN"))
620 .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_API_TOKEN environment variable".into()))?;
621
622 let base_url = env::var("BRIGHTDATA_BASE_URL")
623 .unwrap_or_else(|_| "https://api.brightdata.com".to_string());
624
625 let zone = env::var("WEB_UNLOCKER_ZONE")
626 .unwrap_or_else(|_| "mcp_unlocker".to_string());
627
628 let payload = json!({
629 "url": url,
630 "zone": zone,
631 "format": "raw",
632 });
634
635 if retry_attempt == 0 {
636 info!("๐ค Direct API Request:");
637 info!(" Endpoint: {}/request", base_url);
638 info!(" Zone: {}", zone);
639 info!(" Target: {}", url);
640 }
641
642 let client = Client::builder()
643 .timeout(Duration::from_secs(90))
644 .build()
645 .map_err(|e| BrightDataError::ToolError(format!("Failed to create HTTP client: {}", e)))?;
646
647 let response = client
648 .post(&format!("{}/request", base_url))
649 .header("Authorization", format!("Bearer {}", api_token))
650 .header("Content-Type", "application/json")
651 .json(&payload)
652 .send()
653 .await
654 .map_err(|e| BrightDataError::ToolError(format!("Direct API request failed to {}: {}", source_name, e)))?;
655
656 let duration = start_time.elapsed();
657 let status = response.status().as_u16();
658 let response_headers: HashMap<String, String> = response
659 .headers()
660 .iter()
661 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
662 .collect();
663
664 info!("๐ฅ Direct API Response (retry {}):", retry_attempt + 1);
665 info!(" Status: {}", status);
666 info!(" Duration: {}ms", duration.as_millis());
667
668 let response_text = response.text().await
669 .map_err(|e| BrightDataError::ToolError(format!("Failed to read response body from {}: {}", source_name, e)))?;
670
671 if matches!(status, 502 | 503 | 504) && retry_attempt < max_retries - 1 {
673 let wait_time = Duration::from_millis(1000 + (retry_attempt as u64 * 1000));
674 warn!("โณ Direct API: Server error {}, waiting {}ms before retry...", status, wait_time.as_millis());
675 tokio::time::sleep(wait_time).await;
676 last_error = Some(BrightDataError::ToolError(format!("Direct API server error: {}", status)));
677 continue;
678 }
679
680 if !(200..300).contains(&status) {
681 let error_msg = format!("Direct API: {} returned HTTP {}: {}", source_name, status,
682 &response_text[..response_text.len().min(500)]);
683 last_error = Some(BrightDataError::ToolError(error_msg));
684 if retry_attempt == max_retries - 1 {
685 return Err(last_error.unwrap());
686 }
687 continue;
688 }
689
690 let raw_content = response_text;
692 let filtered_content = if self.is_data_reduction_enabled() {
693 raw_content.clone()
695 } else {
696 raw_content.clone()
697 };
698
699 info!("๐ Direct API: Content processed: {} bytes -> {} bytes",
700 raw_content.len(), filtered_content.len());
701
702 if let Err(e) = BRIGHTDATA_METRICS.log_call(
704 &attempt_id,
705 url,
706 &zone,
707 "raw",
708 None,
709 payload.clone(),
710 status,
711 response_headers.clone(),
712 &raw_content,
713 Some(&filtered_content),
714 duration.as_millis() as u64,
715 None,
716 None,
717 ).await {
718 warn!("Failed to log direct API metrics: {}", e);
719 }
720
721 return Ok(json!({
722 "content": filtered_content,
723 "raw_content": raw_content,
724 "query": query,
725 "market": market,
726 "source": source_name,
727 "method": "Direct BrightData API",
728 "priority": format!("{:?}", priority),
729 "token_budget": token_budget,
730 "execution_id": execution_id,
731 "sequence": sequence,
732 "method_sequence": method_sequence,
733 "success": true,
734 "url": url,
735 "zone": zone,
736 "format": "raw",
737 "status_code": status,
738 "response_size_bytes": raw_content.len(),
739 "filtered_size_bytes": filtered_content.len(),
740 "duration_ms": duration.as_millis(),
741 "timestamp": chrono::Utc::now().to_rfc3339(),
742 "retry_attempts": retry_attempt + 1,
743 "max_retries": max_retries,
744 "payload_used": payload
745 }));
746 }
747
748 Err(last_error.unwrap_or_else(|| BrightDataError::ToolError("Direct API: All retry attempts failed".into())))
749 }
750
751 async fn try_fetch_url_via_proxy(
753 &self,
754 url: &str,
755 query: &str,
756 market: &str,
757 source_name: &str,
758 priority: crate::filters::strategy::QueryPriority,
759 token_budget: usize,
760 execution_id: &str,
761 sequence: u64,
762 method_sequence: u64
763 ) -> Result<Value, BrightDataError> {
764 let max_retries = env::var("MAX_RETRIES")
765 .ok()
766 .and_then(|s| s.parse::<u32>().ok())
767 .unwrap_or(1);
768
769 let mut last_error = None;
770
771 let proxy_host = env::var("BRIGHTDATA_PROXY_HOST")
773 .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_HOST environment variable".into()))?;
774 let proxy_port = env::var("BRIGHTDATA_PROXY_PORT")
775 .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_PORT environment variable".into()))?;
776 let proxy_username = env::var("BRIGHTDATA_PROXY_USERNAME")
777 .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_USERNAME environment variable".into()))?;
778 let proxy_password = env::var("BRIGHTDATA_PROXY_PASSWORD")
779 .map_err(|_| BrightDataError::ToolError("Missing BRIGHTDATA_PROXY_PASSWORD environment variable".into()))?;
780
781 let proxy_url = format!("http://{}:{}@{}:{}", proxy_username, proxy_password, proxy_host, proxy_port);
782
783 for retry_attempt in 0..max_retries {
784 let start_time = Instant::now();
785 let attempt_id = format!("{}_proxy_s{}_m{}_r{}", execution_id, sequence, method_sequence, retry_attempt);
786
787 info!("๐ Proxy: Fetching from {} via proxy (execution: {}, retry: {}/{})",
788 source_name, attempt_id, retry_attempt + 1, max_retries);
789
790 if retry_attempt == 0 {
791 info!("๐ค Proxy Request:");
792 info!(" Proxy: {}:{}@{}:{}", proxy_username, "***", proxy_host, proxy_port);
793 info!(" Target: {}", url);
794 }
795
796 let proxy = reqwest::Proxy::all(&proxy_url)
798 .map_err(|e| BrightDataError::ToolError(format!("Failed to create proxy: {}", e)))?;
799
800 let client = Client::builder()
801 .proxy(proxy)
802 .timeout(Duration::from_secs(90))
803 .danger_accept_invalid_certs(true) .build()
805 .map_err(|e| BrightDataError::ToolError(format!("Failed to create proxy client: {}", e)))?;
806
807 let response = client
808 .get(url)
809 .header("x-unblock-data-format", "markdown")
810 .send()
811 .await
812 .map_err(|e| BrightDataError::ToolError(format!("Proxy request failed to {}: {}", source_name, e)))?;
813
814 let duration = start_time.elapsed();
815 let status = response.status().as_u16();
816 let response_headers: HashMap<String, String> = response
817 .headers()
818 .iter()
819 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
820 .collect();
821
822 info!("๐ฅ Proxy Response (retry {}):", retry_attempt + 1);
823 info!(" Status: {}", status);
824 info!(" Duration: {}ms", duration.as_millis());
825
826 let response_text = response.text().await
827 .map_err(|e| BrightDataError::ToolError(format!("Failed to read proxy response body from {}: {}", source_name, e)))?;
828
829 if matches!(status, 502 | 503 | 504) && retry_attempt < max_retries - 1 {
831 let wait_time = Duration::from_millis(1000 + (retry_attempt as u64 * 1000));
832 warn!("โณ Proxy: Server error {}, waiting {}ms before retry...", status, wait_time.as_millis());
833 tokio::time::sleep(wait_time).await;
834 last_error = Some(BrightDataError::ToolError(format!("Proxy server error: {}", status)));
835 continue;
836 }
837
838 if !(200..300).contains(&status) {
839 println!("-----------------------------------------------------------------");
840 println!("MARKDOWN SUCCESS: {:?}", status.clone());
841 println!("-----------------------------------------------------------------");
842 let error_msg = format!("Proxy: {} returned HTTP {}: {}", source_name, status,
843 &response_text[..response_text.len().min(200)]);
844
845 warn!("Proxy HTTP error: {}", error_msg);
846 last_error = Some(BrightDataError::ToolError(error_msg));
847
848 let proxy_payload = json!({
850 "url": url,
851 "method": "proxy",
852 "proxy_host": proxy_host,
853 "proxy_port": proxy_port,
854 "error": format!("HTTP {}", status)
855 });
856
857 if let Err(e) = BRIGHTDATA_METRICS.log_call(
858 &attempt_id,
859 url,
860 "proxy",
861 "raw",
862 None,
863 proxy_payload,
864 status,
865 response_headers.clone(),
866 &response_text,
867 Some(&format!("Proxy HTTP {} Error", status)),
868 duration.as_millis() as u64,
869 None,
870 None,
871 ).await {
872 warn!("Failed to log proxy error metrics: {}", e);
873 }
874
875 if retry_attempt == max_retries - 1 {
876 return Err(last_error.unwrap());
877 }
878 continue;
879 }
880
881 let raw_content = response_text;
883 let filtered_content = if self.is_data_reduction_enabled() {
884 raw_content.clone()
886 } else {
887 raw_content.clone()
888 };
889
890 info!("๐ Proxy: Content processed: {} bytes -> {} bytes",
891 raw_content.len(), filtered_content.len());
892
893 let proxy_payload = json!({
895 "url": url,
896 "method": "proxy",
897 "proxy_host": proxy_host,
898 "proxy_port": proxy_port
899 });
900
901 if let Err(e) = BRIGHTDATA_METRICS.log_call(
902 &attempt_id,
903 url,
904 "proxy",
905 "raw",
906 None,
907 proxy_payload.clone(),
908 status,
909 response_headers.clone(),
910 &raw_content,
911 Some(&filtered_content),
912 duration.as_millis() as u64,
913 None,
914 None,
915 ).await {
916 warn!("Failed to log proxy metrics: {}", e);
917 }
918
919 return Ok(json!({
920 "content": filtered_content,
921 "raw_content": raw_content,
922 "query": query,
923 "market": market,
924 "source": source_name,
925 "method": "BrightData Proxy",
926 "priority": format!("{:?}", priority),
927 "token_budget": token_budget,
928 "execution_id": execution_id,
929 "sequence": sequence,
930 "method_sequence": method_sequence,
931 "success": true,
932 "url": url,
933 "proxy_host": proxy_host,
934 "proxy_port": proxy_port,
935 "status_code": status,
936 "response_size_bytes": raw_content.len(),
937 "filtered_size_bytes": filtered_content.len(),
938 "duration_ms": duration.as_millis(),
939 "timestamp": chrono::Utc::now().to_rfc3339(),
940 "retry_attempts": retry_attempt + 1,
941 "max_retries": max_retries,
942 "payload_used": proxy_payload
943 }));
944 }
945
946 Err(last_error.unwrap_or_else(|| BrightDataError::ToolError("Proxy: All retry attempts failed".into())))
947 }
948
949 fn is_likely_stock_symbol(&self, query: &str) -> bool {
950 let clean = query.trim();
951
952 if clean.len() < 1 || clean.len() > 15 {
953 return false;
954 }
955
956 let valid_chars = clean.chars().all(|c| c.is_alphanumeric() || c == '.');
957 let has_letters = clean.chars().any(|c| c.is_alphabetic());
958
959 valid_chars && has_letters
960 }
961
962 pub async fn test_connectivity(&self) -> Result<String, BrightDataError> {
964 let test_url = "https://finance.yahoo.com/quote/AAPL/";
965 let mut results = Vec::new();
966
967 info!("๐งช Testing Direct BrightData API...");
969 match self.try_fetch_url_direct_api(
970 test_url, "AAPL", "us", "Yahoo Finance Test",
971 crate::filters::strategy::QueryPriority::High, 1000,
972 "connectivity_test", 0, 0
973 ).await {
974 Ok(_) => {
975 results.push("โ
Direct API: SUCCESS".to_string());
976 }
977 Err(e) => {
978 results.push(format!("โ Direct API: FAILED - {}", e));
979 }
980 }
981
982 info!("๐งช Testing Proxy method...");
984 match self.try_fetch_url_via_proxy(
985 test_url, "AAPL", "us", "Yahoo Finance Test",
986 crate::filters::strategy::QueryPriority::High, 1000,
987 "connectivity_test", 0, 1
988 ).await {
989 Ok(_) => {
990 results.push("โ
Proxy: SUCCESS".to_string());
991 }
992 Err(e) => {
993 results.push(format!("โ Proxy: FAILED - {}", e));
994 }
995 }
996
997 Ok(format!("๐ Connectivity Test Results:\n{}", results.join("\n")))
998 }
999}