1use async_trait::async_trait;
3use crate::error::BrightDataError;
4use crate::extras::logger::{JSON_LOGGER, ExecutionLog};
5use crate::metrics::{BRIGHTDATA_METRICS, EnhancedLogger};
6use serde_json::Value;
7use serde::{Deserialize, Serialize};
8use log::{info, error};
9use std::time::Instant;
10use std::collections::HashMap;
11
12use std::sync::{Arc, Mutex};
14use std::sync::atomic::{AtomicU64, Ordering};
15
16lazy_static::lazy_static! {
17 static ref MCP_SESSION_MANAGER: Arc<Mutex<McpSessionManager>> = Arc::new(Mutex::new(McpSessionManager::new()));
18}
19
20#[derive(Debug)]
21struct McpSessionManager {
22 current_session_id: Option<String>,
23 session_counter: AtomicU64,
24 session_start_time: Option<chrono::DateTime<chrono::Utc>>,
25}
26
27impl McpSessionManager {
28 fn new() -> Self {
29 Self {
30 current_session_id: None,
31 session_counter: AtomicU64::new(0),
32 session_start_time: None,
33 }
34 }
35
36 fn start_new_session(&mut self) -> String {
37 let session_count = self.session_counter.fetch_add(1, Ordering::SeqCst) + 1;
38 let session_id = format!("mcp_session_{}", session_count);
39
40 self.current_session_id = Some(session_id.clone());
41 self.session_start_time = Some(chrono::Utc::now());
42
43 info!("🎯 MCP Session {} started - resetting metrics", session_id);
44 session_id
45 }
46
47 fn get_current_session(&self) -> Option<String> {
48 self.current_session_id.clone()
49 }
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct McpContent {
55 #[serde(rename = "type")]
56 pub content_type: String,
57 pub text: String,
58 #[serde(skip_serializing_if = "Option::is_none")]
59 pub data: Option<String>, #[serde(skip_serializing_if = "Option::is_none")]
61 pub media_type: Option<String>, }
63
64impl McpContent {
65 pub fn text(text: String) -> Self {
66 Self {
67 content_type: "text".to_string(),
68 text,
69 data: None,
70 media_type: None,
71 }
72 }
73
74 pub fn image(data: String, media_type: String) -> Self {
75 Self {
76 content_type: "image".to_string(),
77 text: "Screenshot captured".to_string(),
78 data: Some(data),
79 media_type: Some(media_type),
80 }
81 }
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct ToolResult {
87 pub content: Vec<McpContent>,
88 #[serde(skip_serializing_if = "Option::is_none")]
89 pub is_error: Option<bool>,
90 #[serde(skip_serializing_if = "Option::is_none")]
92 pub raw_value: Option<Value>,
93 #[serde(skip_serializing_if = "Option::is_none")]
95 pub execution_id: Option<String>,
96 #[serde(skip_serializing_if = "Option::is_none")]
98 pub session_id: Option<String>,
99}
100
101impl ToolResult {
102 pub fn success(content: Vec<McpContent>) -> Self {
103 let session_id = MCP_SESSION_MANAGER.lock().unwrap().get_current_session();
104 Self {
105 content,
106 is_error: Some(false),
107 raw_value: None,
108 execution_id: None,
109 session_id,
110 }
111 }
112
113 pub fn success_with_text(text: String) -> Self {
114 let session_id = MCP_SESSION_MANAGER.lock().unwrap().get_current_session();
115 Self {
116 content: vec![McpContent::text(text)],
117 is_error: Some(false),
118 raw_value: None,
119 execution_id: None,
120 session_id,
121 }
122 }
123
124 pub fn success_with_raw(content: Vec<McpContent>, raw: Value) -> Self {
125 let session_id = MCP_SESSION_MANAGER.lock().unwrap().get_current_session();
126 Self {
127 content,
128 is_error: Some(false),
129 raw_value: Some(raw),
130 execution_id: None,
131 session_id,
132 }
133 }
134
135 pub fn success_with_execution_id(content: Vec<McpContent>, raw: Value, execution_id: String) -> Self {
136 let session_id = MCP_SESSION_MANAGER.lock().unwrap().get_current_session();
137 Self {
138 content,
139 is_error: Some(false),
140 raw_value: Some(raw),
141 execution_id: Some(execution_id),
142 session_id,
143 }
144 }
145
146 pub fn error(message: String) -> Self {
147 let session_id = MCP_SESSION_MANAGER.lock().unwrap().get_current_session();
148 Self {
149 content: vec![McpContent::text(format!("Error: {}", message))],
150 is_error: Some(true),
151 raw_value: None,
152 execution_id: None,
153 session_id,
154 }
155 }
156
157 pub fn from_legacy_value(value: Value) -> Self {
159 let session_id = MCP_SESSION_MANAGER.lock().unwrap().get_current_session();
160 let text = if let Some(raw_text) = value.get("raw").and_then(|v| v.as_str()) {
161 raw_text.to_string()
162 } else {
163 serde_json::to_string_pretty(&value).unwrap_or_else(|_| value.to_string())
164 };
165
166 Self {
167 content: vec![McpContent::text(text)],
168 is_error: Some(false),
169 raw_value: Some(value),
170 execution_id: None,
171 session_id,
172 }
173 }
174}
175
176#[async_trait]
177pub trait Tool {
178 fn name(&self) -> &str;
179 fn description(&self) -> &str;
180 fn input_schema(&self) -> Value;
181
182 async fn execute(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
184 let start_time = Instant::now();
185
186 let current_session = MCP_SESSION_MANAGER.lock().unwrap().get_current_session();
188
189 let execution_log = JSON_LOGGER.start_execution(self.name(), parameters.clone()).await;
191 let execution_id = execution_log.execution_id.clone(); info!("🚀 Starting execution: {} (ID: {}) [Session: {:?}]",
194 self.name(), execution_id, current_session);
195
196 let result = self.execute_internal(parameters.clone()).await;
198 let duration = start_time.elapsed();
199
200 match &result {
202 Ok(tool_result) => {
203 let response_json = serde_json::to_value(tool_result).unwrap_or(serde_json::json!({}));
204
205 if let Err(e) = JSON_LOGGER.complete_execution(
207 execution_log, response_json.clone(),
209 true,
210 None,
211 ).await {
212 error!("Failed to log successful execution: {}", e);
213 }
214
215 if let Err(e) = log_tool_metrics(
217 &execution_id,
218 self.name(),
219 ¶meters,
220 tool_result,
221 duration.as_millis() as u64,
222 true,
223 None,
224 current_session.as_deref(),
225 ).await {
226 error!("Failed to log metrics: {}", e);
227 } else {
228 info!("📊 Metrics logged successfully for {} [Session: {:?}]", self.name(), current_session);
229 }
230
231 info!("✅ Execution completed successfully: {}", self.name());
232 }
233 Err(error) => {
234 let error_json = serde_json::json!({
235 "error": error.to_string(),
236 "tool": self.name()
237 });
238
239 if let Err(e) = JSON_LOGGER.complete_execution(
241 execution_log, error_json,
243 false,
244 Some(error.to_string()),
245 ).await {
246 error!("Failed to log failed execution: {}", e);
247 }
248
249 if let Err(e) = log_tool_error_metrics(
251 &format!("error_{}", chrono::Utc::now().format("%Y%m%d_%H%M%S%.3f")),
252 self.name(),
253 ¶meters,
254 &error.to_string(),
255 duration.as_millis() as u64,
256 current_session.as_deref(),
257 ).await {
258 error!("Failed to log error metrics: {}", e);
259 }
260
261 error!("❌ Execution failed: {} - {}", self.name(), error);
262 }
263 }
264
265 result
266 }
267
268 async fn execute_internal(&self, parameters: Value) -> Result<ToolResult, BrightDataError>;
270
271 async fn execute_legacy(&self, parameters: Value) -> Result<Value, BrightDataError> {
273 let result = self.execute(parameters).await?;
274 if let Some(raw) = result.raw_value {
275 Ok(raw)
276 } else if !result.content.is_empty() {
277 Ok(serde_json::json!({
278 "content": result.content[0].text
279 }))
280 } else {
281 Ok(serde_json::json!({}))
282 }
283 }
284}
285
286pub fn handle_mcp_initialize() -> String {
288 let session_id = {
289 MCP_SESSION_MANAGER.lock().unwrap().start_new_session()
290 }; let session_id_clone = session_id.clone(); tokio::spawn(async move {
295 if let Err(e) = reset_metrics_for_new_session(&session_id_clone).await {
296 error!("Failed to reset metrics for new session: {}", e);
297 }
298 });
299
300 session_id }
302
303pub fn get_current_mcp_session() -> Option<String> {
304 MCP_SESSION_MANAGER.lock().unwrap().get_current_session()
305}
306
307async fn reset_metrics_for_new_session(session_id: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
308 info!("🔄 Resetting metrics for new MCP session: {}", session_id);
309
310 BRIGHTDATA_METRICS.log_call(
312 &format!("session_start_{}", session_id),
313 &format!("mcp://session/{}", session_id),
314 "mcp_session",
315 "json",
316 Some("session_start"),
317 serde_json::json!({
318 "event": "mcp_initialize",
319 "session_id": session_id,
320 "timestamp": chrono::Utc::now().to_rfc3339()
321 }),
322 200,
323 HashMap::new(),
324 &format!("MCP session {} initialized", session_id),
325 None,
326 0,
327 None, Some(session_id), ).await?;
330
331 Ok(())
332}
333
334async fn log_tool_metrics(
336 execution_id: &str,
337 tool_name: &str,
338 parameters: &Value,
339 tool_result: &ToolResult,
340 duration_ms: u64,
341 success: bool,
342 error_message: Option<&str>,
343 session_id: Option<&str>,
344) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
345
346 let (url, zone, format) = extract_brightdata_details(parameters, tool_result);
348
349 let content = if !tool_result.content.is_empty() {
351 &tool_result.content[0].text
352 } else {
353 "No content"
354 };
355
356 if let (Some(url), Some(zone), Some(format)) = (&url, &zone, &format) {
357 EnhancedLogger::log_brightdata_request_enhanced(
359 execution_id,
360 zone,
361 url,
362 parameters.clone(),
363 if success { 200 } else { 500 },
364 HashMap::new(),
365 format,
366 content,
367 None, std::time::Duration::from_millis(duration_ms),
369 session_id,
370 ).await?;
371
372 info!("📊 Logged BrightData tool {} to metrics [Session: {:?}]", tool_name, session_id);
373 } else {
374 BRIGHTDATA_METRICS.log_call(
377 execution_id,
378 &format!("tool://{}", tool_name),
379 "local_tool",
380 "json",
381 Some("tool_output"),
382 parameters.clone(),
383 if success { 200 } else { 500 },
384 HashMap::new(),
385 content,
386 None,
387 duration_ms,
388 None, session_id,
390 ).await?;
391
392 info!("📊 Logged generic tool {} to metrics [Session: {:?}]", tool_name, session_id);
393 }
394
395 Ok(())
396}
397
398async fn log_tool_error_metrics(
400 execution_id: &str,
401 tool_name: &str,
402 parameters: &Value,
403 error_message: &str,
404 duration_ms: u64,
405 session_id: Option<&str>,
406) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
407
408 BRIGHTDATA_METRICS.log_call(
410 execution_id,
411 &format!("tool://{}", tool_name),
412 "error",
413 "json",
414 Some("error"),
415 parameters.clone(),
416 500,
417 HashMap::new(),
418 &format!("Error: {}", error_message),
419 None,
420 duration_ms,
421 None, session_id,
423 ).await?;
424
425 info!("📊 Logged error metrics for {} [Session: {:?}]", tool_name, session_id);
426 Ok(())
427}
428
429fn extract_brightdata_details(parameters: &Value, tool_result: &ToolResult) -> (Option<String>, Option<String>, Option<String>) {
431 let mut url = None;
432 let mut zone = None;
433 let mut format = None;
434
435 if let Some(param_url) = parameters.get("url").and_then(|v| v.as_str()) {
437 url = Some(param_url.to_string());
438 }
439
440 if let Some(query) = parameters.get("query").and_then(|v| v.as_str()) {
442 if url.is_none() {
443 url = Some(format!("search:{}", query));
444 }
445 }
446
447 if let Some(raw_value) = &tool_result.raw_value {
449 if let Some(result_url) = raw_value.get("url").and_then(|v| v.as_str()) {
450 url = Some(result_url.to_string());
451 }
452 if let Some(result_zone) = raw_value.get("zone").and_then(|v| v.as_str()) {
453 zone = Some(result_zone.to_string());
454 }
455 if let Some(result_format) = raw_value.get("format").and_then(|v| v.as_str()) {
456 format = Some(result_format.to_string());
457 }
458 }
459
460 if zone.is_none() {
462 zone = Some(std::env::var("WEB_UNLOCKER_ZONE").unwrap_or_else(|_| "default".to_string()));
463 }
464
465 if format.is_none() {
466 format = Some("markdown".to_string());
467 }
468
469 (url, zone, format)
470}
471
/// Stateless lookup from tool names to tool implementations.
///
/// The type is a unit struct, so `Default` is derived rather than written by
/// hand, and `Debug`/`Clone`/`Copy` are provided for convenience.
#[derive(Debug, Clone, Copy, Default)]
pub struct ToolResolver;
480
481impl ToolResolver {
482 pub fn resolve(&self, name: &str) -> Option<Box<dyn Tool + Send + Sync>> {
483 match name {
484 "extract_data" => Some(Box::new(crate::tools::extract::Extractor)),
487 "scrape_website" => Some(Box::new(crate::tools::scrape::Scraper)),
488 "get_stock_data" => Some(Box::new(crate::tools::stock::StockDataTool)),
492 "get_crypto_data" => Some(Box::new(crate::tools::crypto::CryptoDataTool)),
493 "get_etf_data" => Some(Box::new(crate::tools::etf::ETFDataTool)),
494 "get_bond_data" => Some(Box::new(crate::tools::bond::BondDataTool)),
495 "get_mutual_fund_data" => Some(Box::new(crate::tools::mutual_fund::MutualFundDataTool)),
496 "get_commodity_data" => Some(Box::new(crate::tools::commodity::CommodityDataTool)),
497
498 "multi_zone_search" => Some(Box::new(crate::tools::multi_zone_search::MultiZoneSearch)),
500
501 _ => None,
502 }
503 }
504
505 pub fn get_extract_data_tool(&self) -> Option<Box<dyn Tool + Send + Sync>> {
506 self.resolve("extract_data")
507 }
508
509 pub fn list_tools(&self) -> Vec<Value> {
510 vec![
511 serde_json::json!({
538 "name": "scrape_website",
539 "description": "Scrap structured data from a webpage using AI analysis",
540 "inputSchema": {
541 "type": "object",
542 "properties": {
543 "url": {
544 "type": "string",
545 "description": "The URL to Scrap data from"
546 },
547 "schema": {
548 "type": "object",
549 "description": "Optional schema to guide extraction",
550 "additionalProperties": true
551 },
552 "user_id": {
553 "type": "string",
554 "description": "Session ID for caching and conversation context tracking"
555 }
556 },
557 "required": ["url", "user_id"]
558 }
559 }),
560 serde_json::json!({
561 "name": "extract_data",
562 "description": "Extract structured data from a webpage using AI analysis",
563 "inputSchema": {
564 "type": "object",
565 "properties": {
566 "url": {
567 "type": "string",
568 "description": "The URL to extract data from"
569 },
570 "schema": {
571 "type": "object",
572 "description": "Optional schema to guide extraction",
573 "additionalProperties": true
574 },
575 "user_id": {
576 "type": "string",
577 "description": "Session ID for caching and conversation context tracking"
578 }
579 },
580 "required": ["url", "user_id"]
581 }
582 }),
583 serde_json::json!({
619 "name": "get_stock_data",
620 "description": "Get comprehensive stock data including prices, performance, market cap, volumes for specific stock symbols",
621 "inputSchema": {
622 "type": "object",
623 "properties": {
624 "symbol": {
625 "type": "string",
626 "description": "Stock symbol or ticker (e.g. ASHOKLEY, TCS, RELIANCE for Indian stocks; AAPL, MSFT, GOOGL for US stocks). Use exact trading symbols only."
627 },
628 "market": {
629 "type": "string",
630 "enum": ["indian", "us", "global"],
631 "default": "indian",
632 "description": "Market region - indian for NSE/BSE stocks, us for NASDAQ/NYSE, global for international"
633 },
634 "user_id": {
635 "type": "string",
636 "description": "Session ID for caching and conversation context tracking"
637 }
638 },
639 "required": ["symbol", "user_id"]
640 }
641 }),
642 serde_json::json!({
643 "name": "get_crypto_data",
644 "description": "Get cryptocurrency data including prices, market cap, trading volumes. Use for individual cryptos, crypto comparisons (BTC vs ETH), or overall crypto market analysis",
645 "inputSchema": {
646 "type": "object",
647 "properties": {
648 "query": {
649 "type": "string",
650 "description": "Crypto symbol (BTC, ETH, ADA), crypto name (Bitcoin, Ethereum), comparison query (BTC vs ETH), or market overview (crypto market today, top cryptocurrencies)"
651 }
652 },
653 "required": ["query"]
654 }
655 }),
656 serde_json::json!({
657 "name": "get_etf_data",
658 "description": "Get ETF and index fund data including NAV, holdings, performance, expense ratios",
659 "inputSchema": {
660 "type": "object",
661 "properties": {
662 "query": {
663 "type": "string",
664 "description": "ETF symbol (SPY, NIFTYBEES), ETF name, or ETF market analysis query"
665 },
666 "market": {
667 "type": "string",
668 "enum": ["indian", "us", "global"],
669 "default": "indian"
670 }
671 },
672 "required": ["query"]
673 }
674 }),
675 serde_json::json!({
676 "name": "get_bond_data",
677 "description": "Get bond market data including yields, government bonds, corporate bonds, and bond market trends",
678 "inputSchema": {
679 "type": "object",
680 "properties": {
681 "query": {
682 "type": "string",
683 "description": "Bond type (government bonds, corporate bonds), yield query (10-year yield), or bond market analysis"
684 },
685 "market": {
686 "type": "string",
687 "enum": ["indian", "us", "global"],
688 "default": "indian"
689 }
690 },
691 "required": ["query"]
692 }
693 }),
694 serde_json::json!({
695 "name": "get_mutual_fund_data",
696 "description": "Get mutual fund data including NAV, performance, portfolio composition, and fund comparisons",
697 "inputSchema": {
698 "type": "object",
699 "properties": {
700 "query": {
701 "type": "string",
702 "description": "Fund name, fund symbol, fund category (equity funds, debt funds), or fund comparison query"
703 },
704 "market": {
705 "type": "string",
706 "enum": ["indian", "us", "global"],
707 "default": "indian"
708 }
709 },
710 "required": ["query"]
711 }
712 }),
713 serde_json::json!({
714 "name": "get_commodity_data",
715 "description": "Get commodity prices and market data including gold, silver, oil, agricultural commodities",
716 "inputSchema": {
717 "type": "object",
718 "properties": {
719 "query": {
720 "type": "string",
721 "description": "Commodity name (gold, silver, crude oil), commodity symbol, or commodity market overview"
722 }
723 },
724 "required": ["query"]
725 }
726 })
727 ]
749 }
750
751 pub fn get_available_tool_names(&self) -> Vec<&'static str> {
753 vec![
754 "extract_data",
756 "scrape_website",
757 "get_stock_data",
759 "get_crypto_data",
760 "get_etf_data",
761 "get_bond_data",
762 "get_mutual_fund_data",
763 "get_commodity_data",
764 ]
766 }
767
768 pub fn tool_exists(&self, name: &str) -> bool {
770 self.get_available_tool_names().contains(&name)
771 }
772
773 pub fn tool_count(&self) -> usize {
775 self.get_available_tool_names().len()
776 }
777}