1use async_trait::async_trait;
3use crate::error::BrightDataError;
4use crate::extras::logger::{JSON_LOGGER, ExecutionLog};
5use crate::metrics::{BRIGHTDATA_METRICS, EnhancedLogger};
6use serde_json::Value;
7use serde::{Deserialize, Serialize};
8use log::{info, error};
9use std::time::Instant;
10use std::collections::HashMap;
11
12use std::sync::{Arc, Mutex};
14use std::sync::atomic::{AtomicU64, Ordering};
15
16lazy_static::lazy_static! {
17 static ref MCP_SESSION_MANAGER: Arc<Mutex<McpSessionManager>> = Arc::new(Mutex::new(McpSessionManager::new()));
18}
19
20#[derive(Debug)]
21struct McpSessionManager {
22 current_session_id: Option<String>,
23 session_counter: AtomicU64,
24 session_start_time: Option<chrono::DateTime<chrono::Utc>>,
25}
26
27impl McpSessionManager {
28 fn new() -> Self {
29 Self {
30 current_session_id: None,
31 session_counter: AtomicU64::new(0),
32 session_start_time: None,
33 }
34 }
35
36 fn start_new_session(&mut self) -> String {
37 let session_count = self.session_counter.fetch_add(1, Ordering::SeqCst) + 1;
38 let session_id = format!("mcp_session_{}", session_count);
39
40 self.current_session_id = Some(session_id.clone());
41 self.session_start_time = Some(chrono::Utc::now());
42
43 info!("🎯 MCP Session {} started - resetting metrics", session_id);
44 session_id
45 }
46
47 fn get_current_session(&self) -> Option<String> {
48 self.current_session_id.clone()
49 }
50}
51
/// One item of tool output content returned to the MCP client.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpContent {
    /// Content kind, serialized under the JSON key `"type"`. The constructors in
    /// this module use `"text"` or `"image"`.
    #[serde(rename = "type")]
    pub content_type: String,
    /// Text payload; for image items this is a fixed caption.
    pub text: String,
    /// Encoded payload for non-text content; omitted from JSON when `None`.
    // NOTE(review): presumably base64 image bytes — confirm with producers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<String>,
    /// Type of `data` (presumably a MIME type); omitted from JSON when `None`.
    // NOTE(review): serialized key is `media_type`; MCP image content uses
    // `mimeType` — confirm what the consumer expects.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub media_type: Option<String>,
}
63
64impl McpContent {
65 pub fn text(text: String) -> Self {
66 Self {
67 content_type: "text".to_string(),
68 text,
69 data: None,
70 media_type: None,
71 }
72 }
73
74 pub fn image(data: String, media_type: String) -> Self {
75 Self {
76 content_type: "image".to_string(),
77 text: "Screenshot captured".to_string(),
78 data: Some(data),
79 media_type: Some(media_type),
80 }
81 }
82}
83
/// Result of a tool invocation, as returned to the MCP client.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolResult {
    /// Content items shown to the client.
    pub content: Vec<McpContent>,
    /// `Some(true)` marks an error result (see `ToolResult::error`); omitted
    /// from JSON when `None`.
    // NOTE(review): serialized key is `is_error` (no rename); MCP uses
    // `isError` — confirm the consumer.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub is_error: Option<bool>,
    /// Structured payload the tool produced, if any. `execute_legacy` returns
    /// it directly and metrics read `url`/`zone`/`format` from it.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub raw_value: Option<Value>,
    /// Execution id, set by `ToolResult::success_with_execution_id`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub execution_id: Option<String>,
    /// MCP session that was active when this result was built.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
}
100
101impl ToolResult {
102 pub fn success(content: Vec<McpContent>) -> Self {
103 let session_id = MCP_SESSION_MANAGER.lock().unwrap().get_current_session();
104 Self {
105 content,
106 is_error: Some(false),
107 raw_value: None,
108 execution_id: None,
109 session_id,
110 }
111 }
112
113 pub fn success_with_text(text: String) -> Self {
114 let session_id = MCP_SESSION_MANAGER.lock().unwrap().get_current_session();
115 Self {
116 content: vec![McpContent::text(text)],
117 is_error: Some(false),
118 raw_value: None,
119 execution_id: None,
120 session_id,
121 }
122 }
123
124 pub fn success_with_raw(content: Vec<McpContent>, raw: Value) -> Self {
125 let session_id = MCP_SESSION_MANAGER.lock().unwrap().get_current_session();
126 Self {
127 content,
128 is_error: Some(false),
129 raw_value: Some(raw),
130 execution_id: None,
131 session_id,
132 }
133 }
134
135 pub fn success_with_execution_id(content: Vec<McpContent>, raw: Value, execution_id: String) -> Self {
136 let session_id = MCP_SESSION_MANAGER.lock().unwrap().get_current_session();
137 Self {
138 content,
139 is_error: Some(false),
140 raw_value: Some(raw),
141 execution_id: Some(execution_id),
142 session_id,
143 }
144 }
145
146 pub fn error(message: String) -> Self {
147 let session_id = MCP_SESSION_MANAGER.lock().unwrap().get_current_session();
148 Self {
149 content: vec![McpContent::text(format!("Error: {}", message))],
150 is_error: Some(true),
151 raw_value: None,
152 execution_id: None,
153 session_id,
154 }
155 }
156
157 pub fn from_legacy_value(value: Value) -> Self {
159 let session_id = MCP_SESSION_MANAGER.lock().unwrap().get_current_session();
160 let text = if let Some(raw_text) = value.get("raw").and_then(|v| v.as_str()) {
161 raw_text.to_string()
162 } else {
163 serde_json::to_string_pretty(&value).unwrap_or_else(|_| value.to_string())
164 };
165
166 Self {
167 content: vec![McpContent::text(text)],
168 is_error: Some(false),
169 raw_value: Some(value),
170 execution_id: None,
171 session_id,
172 }
173 }
174}
175
176#[async_trait]
177pub trait Tool {
178 fn name(&self) -> &str;
179 fn description(&self) -> &str;
180 fn input_schema(&self) -> Value;
181
182 async fn execute(&self, parameters: Value) -> Result<ToolResult, BrightDataError> {
184 let start_time = Instant::now();
185
186 let current_session = MCP_SESSION_MANAGER.lock().unwrap().get_current_session();
188
189 let execution_log = JSON_LOGGER.start_execution(self.name(), parameters.clone()).await;
191 let execution_id = execution_log.execution_id.clone(); info!("🚀 Starting execution: {} (ID: {}) [Session: {:?}]",
194 self.name(), execution_id, current_session);
195
196 let result = self.execute_internal(parameters.clone()).await;
198 let duration = start_time.elapsed();
199
200 match &result {
202 Ok(tool_result) => {
203 let response_json = serde_json::to_value(tool_result).unwrap_or(serde_json::json!({}));
204
205 if let Err(e) = JSON_LOGGER.complete_execution(
207 execution_log, response_json.clone(),
209 true,
210 None,
211 ).await {
212 error!("Failed to log successful execution: {}", e);
213 }
214
215 if let Err(e) = log_tool_metrics(
217 &execution_id,
218 self.name(),
219 ¶meters,
220 tool_result,
221 duration.as_millis() as u64,
222 true,
223 None,
224 current_session.as_deref(),
225 ).await {
226 error!("Failed to log metrics: {}", e);
227 } else {
228 info!("📊 Metrics logged successfully for {} [Session: {:?}]", self.name(), current_session);
229 }
230
231 info!("✅ Execution completed successfully: {}", self.name());
232 }
233 Err(error) => {
234 let error_json = serde_json::json!({
235 "error": error.to_string(),
236 "tool": self.name()
237 });
238
239 if let Err(e) = JSON_LOGGER.complete_execution(
241 execution_log, error_json,
243 false,
244 Some(error.to_string()),
245 ).await {
246 error!("Failed to log failed execution: {}", e);
247 }
248
249 if let Err(e) = log_tool_error_metrics(
251 &format!("error_{}", chrono::Utc::now().format("%Y%m%d_%H%M%S%.3f")),
252 self.name(),
253 ¶meters,
254 &error.to_string(),
255 duration.as_millis() as u64,
256 current_session.as_deref(),
257 ).await {
258 error!("Failed to log error metrics: {}", e);
259 }
260
261 error!("❌ Execution failed: {} - {}", self.name(), error);
262 }
263 }
264
265 result
266 }
267
268 async fn execute_internal(&self, parameters: Value) -> Result<ToolResult, BrightDataError>;
270
271 async fn execute_legacy(&self, parameters: Value) -> Result<Value, BrightDataError> {
273 let result = self.execute(parameters).await?;
274 if let Some(raw) = result.raw_value {
275 Ok(raw)
276 } else if !result.content.is_empty() {
277 Ok(serde_json::json!({
278 "content": result.content[0].text
279 }))
280 } else {
281 Ok(serde_json::json!({}))
282 }
283 }
284}
285
286pub fn handle_mcp_initialize() -> String {
288 let session_id = {
289 MCP_SESSION_MANAGER.lock().unwrap().start_new_session()
290 }; let session_id_clone = session_id.clone(); tokio::spawn(async move {
295 if let Err(e) = reset_metrics_for_new_session(&session_id_clone).await {
296 error!("Failed to reset metrics for new session: {}", e);
297 }
298 });
299
300 session_id }
302
303pub fn get_current_mcp_session() -> Option<String> {
304 MCP_SESSION_MANAGER.lock().unwrap().get_current_session()
305}
306
307async fn reset_metrics_for_new_session(session_id: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
308 info!("🔄 Resetting metrics for new MCP session: {}", session_id);
309
310 BRIGHTDATA_METRICS.log_call(
312 &format!("session_start_{}", session_id),
313 &format!("mcp://session/{}", session_id),
314 "mcp_session",
315 "json",
316 Some("session_start"),
317 serde_json::json!({
318 "event": "mcp_initialize",
319 "session_id": session_id,
320 "timestamp": chrono::Utc::now().to_rfc3339()
321 }),
322 200,
323 HashMap::new(),
324 &format!("MCP session {} initialized", session_id),
325 None,
326 0,
327 None, Some(session_id), ).await?;
330
331 Ok(())
332}
333
334async fn log_tool_metrics(
336 execution_id: &str,
337 tool_name: &str,
338 parameters: &Value,
339 tool_result: &ToolResult,
340 duration_ms: u64,
341 success: bool,
342 error_message: Option<&str>,
343 session_id: Option<&str>,
344) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
345
346 let (url, zone, format) = extract_brightdata_details(parameters, tool_result);
348
349 let content = if !tool_result.content.is_empty() {
351 &tool_result.content[0].text
352 } else {
353 "No content"
354 };
355
356 if let (Some(url), Some(zone), Some(format)) = (&url, &zone, &format) {
357 EnhancedLogger::log_brightdata_request_enhanced(
359 execution_id,
360 zone,
361 url,
362 parameters.clone(),
363 if success { 200 } else { 500 },
364 HashMap::new(),
365 format,
366 content,
367 None, std::time::Duration::from_millis(duration_ms),
369 session_id,
370 ).await?;
371
372 info!("📊 Logged BrightData tool {} to metrics [Session: {:?}]", tool_name, session_id);
373 } else {
374 BRIGHTDATA_METRICS.log_call(
377 execution_id,
378 &format!("tool://{}", tool_name),
379 "local_tool",
380 "json",
381 Some("tool_output"),
382 parameters.clone(),
383 if success { 200 } else { 500 },
384 HashMap::new(),
385 content,
386 None,
387 duration_ms,
388 None, session_id,
390 ).await?;
391
392 info!("📊 Logged generic tool {} to metrics [Session: {:?}]", tool_name, session_id);
393 }
394
395 Ok(())
396}
397
398async fn log_tool_error_metrics(
400 execution_id: &str,
401 tool_name: &str,
402 parameters: &Value,
403 error_message: &str,
404 duration_ms: u64,
405 session_id: Option<&str>,
406) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
407
408 BRIGHTDATA_METRICS.log_call(
410 execution_id,
411 &format!("tool://{}", tool_name),
412 "error",
413 "json",
414 Some("error"),
415 parameters.clone(),
416 500,
417 HashMap::new(),
418 &format!("Error: {}", error_message),
419 None,
420 duration_ms,
421 None, session_id,
423 ).await?;
424
425 info!("📊 Logged error metrics for {} [Session: {:?}]", tool_name, session_id);
426 Ok(())
427}
428
429fn extract_brightdata_details(parameters: &Value, tool_result: &ToolResult) -> (Option<String>, Option<String>, Option<String>) {
431 let mut url = None;
432 let mut zone = None;
433 let mut format = None;
434
435 if let Some(param_url) = parameters.get("url").and_then(|v| v.as_str()) {
437 url = Some(param_url.to_string());
438 }
439
440 if let Some(query) = parameters.get("query").and_then(|v| v.as_str()) {
442 if url.is_none() {
443 url = Some(format!("search:{}", query));
444 }
445 }
446
447 if let Some(raw_value) = &tool_result.raw_value {
449 if let Some(result_url) = raw_value.get("url").and_then(|v| v.as_str()) {
450 url = Some(result_url.to_string());
451 }
452 if let Some(result_zone) = raw_value.get("zone").and_then(|v| v.as_str()) {
453 zone = Some(result_zone.to_string());
454 }
455 if let Some(result_format) = raw_value.get("format").and_then(|v| v.as_str()) {
456 format = Some(result_format.to_string());
457 }
458 }
459
460 if zone.is_none() {
462 zone = Some(std::env::var("WEB_UNLOCKER_ZONE").unwrap_or_else(|_| "default".to_string()));
463 }
464
465 if format.is_none() {
466 format = Some("markdown".to_string());
467 }
468
469 (url, zone, format)
470}
471
/// Maps MCP tool names to their implementations and tool-listing schemas.
///
/// Stateless unit struct; `Default` is derived rather than hand-written.
#[derive(Debug, Clone, Copy, Default)]
pub struct ToolResolver;
480
481impl ToolResolver {
482 pub fn resolve(&self, name: &str) -> Option<Box<dyn Tool + Send + Sync>> {
483 match name {
484 "scrape_website" => Some(Box::new(crate::tools::scrape::Scraper)),
487 "get_forex_data" => Some(Box::new(crate::tools::forex::ForexDataTool)),
488 "get_stock_data" => Some(Box::new(crate::tools::stock::StockDataTool)),
489 "get_crypto_data" => Some(Box::new(crate::tools::crypto::CryptoDataTool)),
490 "get_etf_data" => Some(Box::new(crate::tools::etf::ETFDataTool)),
491 "get_bond_data" => Some(Box::new(crate::tools::bond::BondDataTool)),
492 "get_indices_data" => Some(Box::new(crate::tools::index::IndexDataTool)),
493 "get_commodity_data" => Some(Box::new(crate::tools::commodity::CommodityDataTool)),
494 "get_mutual_fund_data" => Some(Box::new(crate::tools::mutual_fund::MutualFundDataTool)),
495 _ => None,
496 }
497 }
498
499 pub fn get_extract_data_tool(&self) -> Option<Box<dyn Tool + Send + Sync>> {
500 self.resolve("extract_data")
501 }
502
503 pub fn list_tools(&self) -> Vec<Value> {
504 vec![
505 serde_json::json!({
506 "name": "scrape_website",
507 "description": "Scrap structured data from a webpage using AI analysis",
508 "inputSchema": {
509 "type": "object",
510 "properties": {
511 "url": {
512 "type": "string",
513 "description": "The URL to Scrap data from"
514 },
515 "schema": {
516 "type": "object",
517 "description": "Optional schema to guide extraction",
518 "additionalProperties": true
519 },
520 "user_id": {
521 "type": "string",
522 "description": "Session ID for caching and conversation context tracking"
523 }
524 },
525 "required": ["url", "user_id"]
526 }
527 }),
528
529 serde_json::json!({
531 "name": "get_stock_data",
532 "description": "Get comprehensive stock data including prices, performance, market cap, volumes for specific stock symbols",
533 "inputSchema": {
534 "type": "object",
535 "properties": {
536 "symbol": {
537 "type": "string",
538 "description": "Stock symbol or ticker (e.g. ASHOKLEY, TCS, RELIANCE for Indian stocks; AAPL, MSFT, GOOGL for US stocks). Use exact trading symbols only."
539 },
540 "market": {
541 "type": "string",
542 "enum": ["indian", "us", "global"],
543 "default": "indian",
544 "description": "Market region - indian for NSE/BSE stocks, us for NASDAQ/NYSE, global for international"
545 },
546 "user_id": {
547 "type": "string",
548 "description": "Session ID for caching and conversation context tracking"
549 }
550 },
551 "required": ["symbol", "user_id"]
552 }
553 }),
554
555 serde_json::json!({
557 "name": "get_crypto_data",
558 "description": "Get cryptocurrency data including prices, market cap, trading volumes. Use for individual cryptos, crypto comparisons (BTC vs ETH), or overall crypto market analysis. Source: Yahoo Finance https://finance.yahoo.com/quote/{}-USD/ (e.g., BTC-USD, ETH-USD, SQL-USD).",
559 "inputSchema": {
560 "type": "object",
561 "properties": {
562 "symbol": {
563 "symbol": "string",
564 "description": "Crypto symbol (BTC, ETH, ADA), crypto name (Bitcoin, Ethereum), comparison query (BTC vs ETH), or market overview (crypto market today, top cryptocurrencies)"
565 }
566 },
567 "user_id": {
568 "type": "string",
569 "description": "Session ID for caching and conversation context tracking"
570 },
571 "required": ["symbol", "user_id"]
572 }
573 }),
574
575 serde_json::json!({
577 "name": "get_etf_data",
578 "description": "Get comprehensive ETF snapshot (price, summary, metrics) with cache, BrightData direct API and proxy fallback. Source: Yahoo Finance https://finance.yahoo.com/quote/{}.NS/ (e.g., NIFTYBEES).",
579 "inputSchema": {
580 "type": "object",
581 "properties": {
582 "symbol": {
583 "symbol": "string",
584 "description": "ETF ticker or name (e.g., NIFTYBEES, JUNIORBEES). If provided, used when 'symbol' missing."
585 }
586 },
587 "user_id": {
588 "type": "string",
589 "description": "Session ID for caching and conversation context tracking"
590 },
591 "required": ["symbol", "user_id"]
592 }
593 }),
594
595 serde_json::json!({
597 "name": "get_forex_data",
598 "description": "Get comprehensive Forex snapshot (spot rate, change, ranges) with cache, BrightData direct API and proxy fallback. Source: Yahoo Finance https://finance.yahoo.com/quote/{}=X/ (e.g., USDINR=X).",
599 "inputSchema": {
600 "type": "object",
601 "properties": {
602 "symbol": {
603 "symbol": "string",
604 "description": "Forex pair (e.g., USDINR, EURUSD, USD/JPY). Used if 'symbol' missing."
605 }
606 },
607 "user_id": {
608 "type": "string",
609 "description": "Session ID for caching and conversation context tracking"
610 },
611 "required": ["symbol", "user_id"]
612 }
613 }),
614
615 serde_json::json!({
617 "name": "get_commodity_data",
618 "description": "Get commodity (futures) snapshot (price, change, ranges) with cache, BrightData direct API and proxy fallback. Source: Tradingview https://in.tradingview.com/symbols/MCX-{}!/ (e.g., MCX.NATURALGAS1, MCX.CRUDEOIL1).",
619 "inputSchema": {
620 "type": "object",
621 "properties": {
622 "symbol": {
623 "symbol": "string",
624 "description": "Commodity/futures symbol (e.g., CRUDEOIL, CRUDEOIL, NATURALGAS). Used if 'symbol' missing."
625 }
626 },
627 "user_id": {
628 "type": "string",
629 "description": "Session ID for caching and conversation context tracking"
630 },
631 "required": ["symbol", "user_id"]
632 }
633 }),
634
635 serde_json::json!({
637 "name": "get_bond_data",
638 "description": "Get bond/fund snapshot (price, change, ranges) with cache, BrightData direct API and proxy fallback. Source: Yahoo Finance https://finance.yahoo.com/quote/^{SYMBOL}/ (e.g., ^TNX, ^IRX).",
639 "inputSchema": {
640 "type": "object",
641 "properties": {
642 "symbol": {
643 "symbol": "string",
644 "description": "Bond symbol (e.g., ^TNX, ^IRX, ^TYX, ^FVX). Used if 'symbol' missing.",
645 }
646 },
647 "user_id": {
648 "type": "string",
649 "description": "Session ID for caching and conversation context tracking"
650 },
651 "required": ["symbol", "user_id"]
652 }
653 }),
654
655 serde_json::json!({
657 "name": "get_indices_data",
658 "description": "Get stock index snapshot (price, change, ranges) with cache, BrightData direct API and proxy fallback. Source: Yahoo Finance https://finance.yahoo.com/quote/^{INDEX_CODE}/ (e.g., ^NSEI).",
659 "inputSchema": {
660 "type": "object",
661 "properties": {
662 "symbol": {
663 "symbol": "string",
664 "description":"Index code (e.g., ^NSEI, ^NSEBANK). Used if 'symbol' missing.",
665 }
666 },
667 "user_id": {
668 "type": "string",
669 "description": "Session ID for caching and conversation context tracking"
670 },
671 "required": ["symbol", "user_id"]
672 }
673 }),
674
675 serde_json::json!({
677 "name": "get_mutual_fund_data",
678 "description": "Get mutual fund snapshot (price/NAV, summary) with cache, BrightData direct API and proxy fallback. Source: Yahoo Finance https://finance.yahoo.com/quote/{ISIN}.BO/ (e.g., INF846K01122.BO).",
679 "inputSchema": {
680 "type": "object",
681 "properties": {
682 "symbol": {
683 "symbol": "string",
684 "description": "Indian mutual fund ISIN or display code (e.g., INF846K01122.BO). Used if 'symbol' missing.",
685 }
686 },
687 "user_id": {
688 "type": "string",
689 "description": "Session ID for caching and conversation context tracking"
690 },
691 "required": ["symbol", "user_id"]
692 }
693 })
694 ]
695 }
696
697 pub fn get_available_tool_names(&self) -> Vec<&'static str> {
699 vec![
700 "scrape_website",
702 "get_forex_data",
703 "get_stock_data",
704 "get_crypto_data",
705 "get_etf_data",
706 "get_commodity_data",
707 "get_indices_data",
708 "get_bond_data",
709 "get_mutual_fund_data",
710 ]
711 }
712
713 pub fn tool_exists(&self, name: &str) -> bool {
715 self.get_available_tool_names().contains(&name)
716 }
717
718 pub fn tool_count(&self) -> usize {
720 self.get_available_tool_names().len()
721 }
722}