1use async_trait::async_trait;
3use crate::error::BrightDataError;
4use crate::extras::logger::{JSON_LOGGER, ExecutionLog};
5use crate::metrics::{BRIGHTDATA_METRICS, EnhancedLogger};
6use serde_json::Value;
7use serde::{Deserialize, Serialize};
8use log::{info, error};
9use std::time::Instant;
10use std::collections::HashMap;
11
12use std::sync::{Arc, Mutex};
14use std::sync::atomic::{AtomicU64, Ordering};
15
16lazy_static::lazy_static! {
17 static ref MCP_SESSION_MANAGER: Arc<Mutex<McpSessionManager>> = Arc::new(Mutex::new(McpSessionManager::new()));
18}
19
/// Record of the current MCP session, kept behind the `MCP_SESSION_MANAGER` mutex.
///
/// `handle_mcp_initialize` starts a new session via `start_new_session`; tool results
/// and executions read the active id via `get_current_session`.
#[derive(Debug)]
struct McpSessionManager {
    /// Id of the active session (`mcp_session_<n>`), or `None` before the first start.
    current_session_id: Option<String>,
    /// Number of sessions started so far; used to build the next session id.
    /// NOTE(review): every access already goes through the outer Mutex (`&mut self`),
    /// so the atomic is not needed for synchronization here.
    session_counter: AtomicU64,
    /// UTC time at which the current session was started.
    /// NOTE(review): written by `start_new_session` but not read anywhere visible here.
    session_start_time: Option<chrono::DateTime<chrono::Utc>>,
}
26
27impl McpSessionManager {
28 fn new() -> Self {
29 Self {
30 current_session_id: None,
31 session_counter: AtomicU64::new(0),
32 session_start_time: None,
33 }
34 }
35
36 fn start_new_session(&mut self) -> String {
37 let session_count = self.session_counter.fetch_add(1, Ordering::SeqCst) + 1;
38 let session_id = format!("mcp_session_{}", session_count);
39
40 self.current_session_id = Some(session_id.clone());
41 self.session_start_time = Some(chrono::Utc::now());
42
43 info!("🎯 MCP Session {} started - resetting metrics", session_id);
44 session_id
45 }
46
47 fn get_current_session(&self) -> Option<String> {
48 self.current_session_id.clone()
49 }
50}
51
/// A single content item inside a [`ToolResult`].
///
/// The constructors in this file produce `"text"` and `"image"` items; the optional
/// fields are omitted from the serialized JSON when unset.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpContent {
    /// Content kind, serialized under the JSON key `type`.
    #[serde(rename = "type")]
    pub content_type: String,
    /// Text payload; image items carry a fixed caption here.
    pub text: String,
    /// Non-text payload (presumably base64 image bytes — TODO confirm with producers).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<String>,
    /// Media type of `data`.
    /// NOTE(review): serialized as `media_type`; MCP image content uses `mimeType` — confirm
    /// whether a serde rename is needed for clients.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub media_type: Option<String>,
}
63
64impl McpContent {
65 pub fn text(text: String) -> Self {
66 Self {
67 content_type: "text".to_string(),
68 text,
69 data: None,
70 media_type: None,
71 }
72 }
73
74 pub fn image(data: String, media_type: String) -> Self {
75 Self {
76 content_type: "image".to_string(),
77 text: "Screenshot captured".to_string(),
78 data: Some(data),
79 media_type: Some(media_type),
80 }
81 }
82}
83
/// Outcome of a tool execution, as returned by `Tool::execute`.
///
/// Every constructor stamps the result with the MCP session that was active when it
/// was built. Optional fields are omitted from the serialized JSON when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolResult {
    /// Content items; the first item's text is what `execute_legacy` and the metrics
    /// logger read.
    pub content: Vec<McpContent>,
    /// `Some(true)` for `ToolResult::error`, `Some(false)` for the success constructors.
    /// NOTE(review): serialized as `is_error`; MCP's tool-call result uses `isError` —
    /// confirm whether a serde rename is needed on the wire.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub is_error: Option<bool>,
    /// Structured payload, if any; `execute_legacy` returns it verbatim.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub raw_value: Option<Value>,
    /// Execution-log id; only `success_with_execution_id` sets it.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub execution_id: Option<String>,
    /// MCP session id captured at construction time.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
}
100
101impl ToolResult {
102 pub fn success(content: Vec<McpContent>) -> Self {
103 let session_id = MCP_SESSION_MANAGER.lock().unwrap().get_current_session();
104 Self {
105 content,
106 is_error: Some(false),
107 raw_value: None,
108 execution_id: None,
109 session_id,
110 }
111 }
112
113 pub fn success_with_text(text: String) -> Self {
114 let session_id = MCP_SESSION_MANAGER.lock().unwrap().get_current_session();
115 Self {
116 content: vec![McpContent::text(text)],
117 is_error: Some(false),
118 raw_value: None,
119 execution_id: None,
120 session_id,
121 }
122 }
123
124 pub fn success_with_raw(content: Vec<McpContent>, raw: Value) -> Self {
125 let session_id = MCP_SESSION_MANAGER.lock().unwrap().get_current_session();
126 Self {
127 content,
128 is_error: Some(false),
129 raw_value: Some(raw),
130 execution_id: None,
131 session_id,
132 }
133 }
134
135 pub fn success_with_execution_id(content: Vec<McpContent>, raw: Value, execution_id: String) -> Self {
136 let session_id = MCP_SESSION_MANAGER.lock().unwrap().get_current_session();
137 Self {
138 content,
139 is_error: Some(false),
140 raw_value: Some(raw),
141 execution_id: Some(execution_id),
142 session_id,
143 }
144 }
145
146 pub fn error(message: String) -> Self {
147 let session_id = MCP_SESSION_MANAGER.lock().unwrap().get_current_session();
148 Self {
149 content: vec![McpContent::text(format!("Error: {}", message))],
150 is_error: Some(true),
151 raw_value: None,
152 execution_id: None,
153 session_id,
154 }
155 }
156
157 pub fn from_legacy_value(value: Value) -> Self {
159 let session_id = MCP_SESSION_MANAGER.lock().unwrap().get_current_session();
160 let text = if let Some(raw_text) = value.get("raw").and_then(|v| v.as_str()) {
161 raw_text.to_string()
162 } else {
163 serde_json::to_string_pretty(&value).unwrap_or_else(|_| value.to_string())
164 };
165
166 Self {
167 content: vec![McpContent::text(text)],
168 is_error: Some(false),
169 raw_value: Some(value),
170 execution_id: None,
171 session_id,
172 }
173 }
174}
175
176#[async_trait]
177pub trait Tool {
178 fn name(&self) -> &str;
179 fn description(&self) -> &str;
180 fn input_schema(&self) -> Value;
181
182 async fn execute(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
184 let start_time = Instant::now();
185
186 let current_session = MCP_SESSION_MANAGER.lock().unwrap().get_current_session();
188
189 let execution_log = JSON_LOGGER.start_execution(self.name(), parameters.clone()).await;
191 let execution_id = execution_log.execution_id.clone(); info!("🚀 Starting execution: {} (ID: {}) [Session: {:?}]",
194 self.name(), execution_id, current_session);
195
196 let result = self.execute_internal(parameters.clone()).await;
198 let duration = start_time.elapsed();
199
200 match &result {
202 Ok(tool_result) => {
203 let response_json = serde_json::to_value(tool_result).unwrap_or(serde_json::json!({}));
204
205 if let Err(e) = JSON_LOGGER.complete_execution(
207 execution_log, response_json.clone(),
209 true,
210 None,
211 ).await {
212 error!("Failed to log successful execution: {}", e);
213 }
214
215 if let Err(e) = log_tool_metrics(
217 &execution_id,
218 self.name(),
219 ¶meters,
220 tool_result,
221 duration.as_millis() as u64,
222 true,
223 None,
224 current_session.as_deref(),
225 ).await {
226 error!("Failed to log metrics: {}", e);
227 } else {
228 info!("📊 Metrics logged successfully for {} [Session: {:?}]", self.name(), current_session);
229 }
230
231 info!("✅ Execution completed successfully: {}", self.name());
232 }
233 Err(error) => {
234 let error_json = serde_json::json!({
235 "error": error.to_string(),
236 "tool": self.name()
237 });
238
239 if let Err(e) = JSON_LOGGER.complete_execution(
241 execution_log, error_json,
243 false,
244 Some(error.to_string()),
245 ).await {
246 error!("Failed to log failed execution: {}", e);
247 }
248
249 if let Err(e) = log_tool_error_metrics(
251 &format!("error_{}", chrono::Utc::now().format("%Y%m%d_%H%M%S%.3f")),
252 self.name(),
253 ¶meters,
254 &error.to_string(),
255 duration.as_millis() as u64,
256 current_session.as_deref(),
257 ).await {
258 error!("Failed to log error metrics: {}", e);
259 }
260
261 error!("❌ Execution failed: {} - {}", self.name(), error);
262 }
263 }
264
265 result
266 }
267
268 async fn execute_internal(&self, parameters: Value) -> Result<ToolResult, BrightDataError>;
270
271 async fn execute_legacy(&self, parameters: Value) -> Result<Value, BrightDataError> {
273 let result = self.execute(parameters).await?;
274 if let Some(raw) = result.raw_value {
275 Ok(raw)
276 } else if !result.content.is_empty() {
277 Ok(serde_json::json!({
278 "content": result.content[0].text
279 }))
280 } else {
281 Ok(serde_json::json!({}))
282 }
283 }
284}
285
286pub fn handle_mcp_initialize() -> String {
288 let session_id = {
289 MCP_SESSION_MANAGER.lock().unwrap().start_new_session()
290 }; let session_id_clone = session_id.clone(); tokio::spawn(async move {
295 if let Err(e) = reset_metrics_for_new_session(&session_id_clone).await {
296 error!("Failed to reset metrics for new session: {}", e);
297 }
298 });
299
300 session_id }
302
303pub fn get_current_mcp_session() -> Option<String> {
304 MCP_SESSION_MANAGER.lock().unwrap().get_current_session()
305}
306
307async fn reset_metrics_for_new_session(session_id: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
308 info!("🔄 Resetting metrics for new MCP session: {}", session_id);
309
310 BRIGHTDATA_METRICS.log_call(
312 &format!("session_start_{}", session_id),
313 &format!("mcp://session/{}", session_id),
314 "mcp_session",
315 "json",
316 Some("session_start"),
317 serde_json::json!({
318 "event": "mcp_initialize",
319 "session_id": session_id,
320 "timestamp": chrono::Utc::now().to_rfc3339()
321 }),
322 200,
323 HashMap::new(),
324 &format!("MCP session {} initialized", session_id),
325 None,
326 0,
327 None, Some(session_id), ).await?;
330
331 Ok(())
332}
333
334async fn log_tool_metrics(
336 execution_id: &str,
337 tool_name: &str,
338 parameters: &Value,
339 tool_result: &ToolResult,
340 duration_ms: u64,
341 success: bool,
342 error_message: Option<&str>,
343 session_id: Option<&str>,
344) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
345
346 let (url, zone, format) = extract_brightdata_details(parameters, tool_result);
348
349 let content = if !tool_result.content.is_empty() {
351 &tool_result.content[0].text
352 } else {
353 "No content"
354 };
355
356 if let (Some(url), Some(zone), Some(format)) = (&url, &zone, &format) {
357 EnhancedLogger::log_brightdata_request_enhanced(
359 execution_id,
360 zone,
361 url,
362 parameters.clone(),
363 if success { 200 } else { 500 },
364 HashMap::new(),
365 format,
366 content,
367 None, std::time::Duration::from_millis(duration_ms),
369 session_id,
370 ).await?;
371
372 info!("📊 Logged BrightData tool {} to metrics [Session: {:?}]", tool_name, session_id);
373 } else {
374 BRIGHTDATA_METRICS.log_call(
377 execution_id,
378 &format!("tool://{}", tool_name),
379 "local_tool",
380 "json",
381 Some("tool_output"),
382 parameters.clone(),
383 if success { 200 } else { 500 },
384 HashMap::new(),
385 content,
386 None,
387 duration_ms,
388 None, session_id,
390 ).await?;
391
392 info!("📊 Logged generic tool {} to metrics [Session: {:?}]", tool_name, session_id);
393 }
394
395 Ok(())
396}
397
398async fn log_tool_error_metrics(
400 execution_id: &str,
401 tool_name: &str,
402 parameters: &Value,
403 error_message: &str,
404 duration_ms: u64,
405 session_id: Option<&str>,
406) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
407
408 BRIGHTDATA_METRICS.log_call(
410 execution_id,
411 &format!("tool://{}", tool_name),
412 "error",
413 "json",
414 Some("error"),
415 parameters.clone(),
416 500,
417 HashMap::new(),
418 &format!("Error: {}", error_message),
419 None,
420 duration_ms,
421 None, session_id,
423 ).await?;
424
425 info!("📊 Logged error metrics for {} [Session: {:?}]", tool_name, session_id);
426 Ok(())
427}
428
429fn extract_brightdata_details(parameters: &Value, tool_result: &ToolResult) -> (Option<String>, Option<String>, Option<String>) {
431 let mut url = None;
432 let mut zone = None;
433 let mut format = None;
434
435 if let Some(param_url) = parameters.get("url").and_then(|v| v.as_str()) {
437 url = Some(param_url.to_string());
438 }
439
440 if let Some(query) = parameters.get("query").and_then(|v| v.as_str()) {
442 if url.is_none() {
443 url = Some(format!("search:{}", query));
444 }
445 }
446
447 if let Some(raw_value) = &tool_result.raw_value {
449 if let Some(result_url) = raw_value.get("url").and_then(|v| v.as_str()) {
450 url = Some(result_url.to_string());
451 }
452 if let Some(result_zone) = raw_value.get("zone").and_then(|v| v.as_str()) {
453 zone = Some(result_zone.to_string());
454 }
455 if let Some(result_format) = raw_value.get("format").and_then(|v| v.as_str()) {
456 format = Some(result_format.to_string());
457 }
458 }
459
460 if zone.is_none() {
462 zone = Some(std::env::var("WEB_UNLOCKER_ZONE").unwrap_or_else(|_| "default".to_string()));
463 }
464
465 if format.is_none() {
466 format = Some("markdown".to_string());
467 }
468
469 (url, zone, format)
470}
471
/// Stateless registry mapping tool names to [`Tool`] implementations and describing
/// their input schemas (see `list_tools`).
///
/// `Default` is derived rather than hand-written. `Debug`/`Clone`/`Copy` are derived so
/// this public, field-less type can be printed and passed around freely.
#[derive(Debug, Default, Clone, Copy)]
pub struct ToolResolver;
480
481impl ToolResolver {
482 pub fn resolve(&self, name: &str) -> Option<Box<dyn Tool + Send + Sync>> {
483 match name {
484 "scrape_website" => Some(Box::new(crate::tools::scrape::ScrapeMarkdown)),
486 "extract_data" => Some(Box::new(crate::tools::extract::Extractor)),
488 "get_stock_data" => Some(Box::new(crate::tools::stock::StockDataTool)),
492 "get_crypto_data" => Some(Box::new(crate::tools::crypto::CryptoDataTool)),
493 "get_etf_data" => Some(Box::new(crate::tools::etf::ETFDataTool)),
494 "get_bond_data" => Some(Box::new(crate::tools::bond::BondDataTool)),
495 "get_mutual_fund_data" => Some(Box::new(crate::tools::mutual_fund::MutualFundDataTool)),
496 "get_commodity_data" => Some(Box::new(crate::tools::commodity::CommodityDataTool)),
497
498 "multi_zone_search" => Some(Box::new(crate::tools::multi_zone_search::MultiZoneSearch)),
500
501 _ => None,
502 }
503 }
504
505 pub fn get_extract_data_tool(&self) -> Option<Box<dyn Tool + Send + Sync>> {
506 self.resolve("extract_data")
507 }
508
509 pub fn list_tools(&self) -> Vec<Value> {
510 vec![
511 serde_json::json!({
513 "name": "scrape_website",
514 "description": "Scrape a webpage and return markdown content using BrightData Web Unlocker",
515 "inputSchema": {
516 "type": "object",
517 "properties": {
518 "url": {
519 "type": "string",
520 "description": "The URL to scrape"
521 },
522 "format": {
523 "type": "string",
524 "enum": ["markdown", "raw"],
525 "description": "Output format",
526 "default": "markdown"
527 }
528 },
529 "required": ["url"]
530 }
531 }),
532 serde_json::json!({
558 "name": "extract_data",
559 "description": "Extract structured data from a webpage using AI analysis",
560 "inputSchema": {
561 "type": "object",
562 "properties": {
563 "url": {
564 "type": "string",
565 "description": "The URL to extract data from"
566 },
567 "schema": {
568 "type": "object",
569 "description": "Optional schema to guide extraction",
570 "additionalProperties": true
571 }
572 },
573 "required": ["url"]
574 }
575 }),
576 serde_json::json!({
612 "name": "get_stock_data",
613 "description": "Get comprehensive stock data including prices, performance, market cap, volumes. Use for individual stocks, stock comparisons, or stock market overviews",
614 "inputSchema": {
615 "type": "object",
616 "properties": {
617 "query": {
618 "type": "string",
619 "description": "Stock symbol (e.g. ASHOKLEY, TCS, AAPL), company name, comparison query (AAPL vs MSFT), or market overview request (today's stock performance, Nifty 50 performance)"
620 },
621 "market": {
622 "type": "string",
623 "enum": ["indian", "us", "global"],
624 "default": "indian",
625 "description": "Market region - indian for NSE/BSE stocks, us for NASDAQ/NYSE, global for international"
626 }
627 },
628 "required": ["query"]
629 }
630 }),
631 serde_json::json!({
632 "name": "get_crypto_data",
633 "description": "Get cryptocurrency data including prices, market cap, trading volumes. Use for individual cryptos, crypto comparisons (BTC vs ETH), or overall crypto market analysis",
634 "inputSchema": {
635 "type": "object",
636 "properties": {
637 "query": {
638 "type": "string",
639 "description": "Crypto symbol (BTC, ETH, ADA), crypto name (Bitcoin, Ethereum), comparison query (BTC vs ETH), or market overview (crypto market today, top cryptocurrencies)"
640 }
641 },
642 "required": ["query"]
643 }
644 }),
645 serde_json::json!({
646 "name": "get_etf_data",
647 "description": "Get ETF and index fund data including NAV, holdings, performance, expense ratios",
648 "inputSchema": {
649 "type": "object",
650 "properties": {
651 "query": {
652 "type": "string",
653 "description": "ETF symbol (SPY, NIFTYBEES), ETF name, or ETF market analysis query"
654 },
655 "market": {
656 "type": "string",
657 "enum": ["indian", "us", "global"],
658 "default": "indian"
659 }
660 },
661 "required": ["query"]
662 }
663 }),
664 serde_json::json!({
665 "name": "get_bond_data",
666 "description": "Get bond market data including yields, government bonds, corporate bonds, and bond market trends",
667 "inputSchema": {
668 "type": "object",
669 "properties": {
670 "query": {
671 "type": "string",
672 "description": "Bond type (government bonds, corporate bonds), yield query (10-year yield), or bond market analysis"
673 },
674 "market": {
675 "type": "string",
676 "enum": ["indian", "us", "global"],
677 "default": "indian"
678 }
679 },
680 "required": ["query"]
681 }
682 }),
683 serde_json::json!({
684 "name": "get_mutual_fund_data",
685 "description": "Get mutual fund data including NAV, performance, portfolio composition, and fund comparisons",
686 "inputSchema": {
687 "type": "object",
688 "properties": {
689 "query": {
690 "type": "string",
691 "description": "Fund name, fund symbol, fund category (equity funds, debt funds), or fund comparison query"
692 },
693 "market": {
694 "type": "string",
695 "enum": ["indian", "us", "global"],
696 "default": "indian"
697 }
698 },
699 "required": ["query"]
700 }
701 }),
702 serde_json::json!({
703 "name": "get_commodity_data",
704 "description": "Get commodity prices and market data including gold, silver, oil, agricultural commodities",
705 "inputSchema": {
706 "type": "object",
707 "properties": {
708 "query": {
709 "type": "string",
710 "description": "Commodity name (gold, silver, crude oil), commodity symbol, or commodity market overview"
711 }
712 },
713 "required": ["query"]
714 }
715 })
716 ]
738 }
739
740 pub fn get_available_tool_names(&self) -> Vec<&'static str> {
742 vec![
743 "scrape_website",
744 "extract_data",
746 "get_stock_data",
748 "get_crypto_data",
749 "get_etf_data",
750 "get_bond_data",
751 "get_mutual_fund_data",
752 "get_commodity_data",
753 ]
755 }
756
757 pub fn tool_exists(&self, name: &str) -> bool {
759 self.get_available_tool_names().contains(&name)
760 }
761
762 pub fn tool_count(&self) -> usize {
764 self.get_available_tool_names().len()
765 }
766}