1use actix_web::{web, HttpRequest, HttpResponse, Result};
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::env;
6use std::sync::Arc;
7use std::time::Duration;
8use tokio::sync::RwLock;
9use chrono::{DateTime, Utc};
10use reqwest::Client;
11use uuid::Uuid;
12use crate::error::BrightDataError;
13use crate::types::{McpResponse};
14use crate::tool::{handle_mcp_initialize, get_current_mcp_session}; #[derive(Debug, Clone)]
/// Runtime configuration for the BrightData client, loaded from environment variables.
pub struct Config {
    /// Bearer token sent to the BrightData API (`API_TOKEN`); empty when unset.
    pub api_token: String,
    /// Zone used for page scraping (`WEB_UNLOCKER_ZONE`, default `default_zone`).
    pub web_unlocker_zone: String,
    /// Zone used for screenshots (`BROWSER_ZONE`, default `default_browser`).
    pub browser_zone: String,
    /// Zone used for web search (`BRIGHTDATA_SERP_ZONE`, default `serp_api2`).
    pub serp_zone: String,
    /// Raw `RATE_LIMIT` value, if set.
    /// NOTE(review): not read by the per-tool limiter in this module; confirm its consumer.
    pub rate_limit: Option<String>,
    /// Per-request HTTP timeout (`REQUEST_TIMEOUT`, seconds, default 300).
    pub timeout: Duration,
    /// Retry budget (`MAX_RETRIES`, default 3).
    /// NOTE(review): no retry loop is visible in this module; confirm where it is used.
    pub max_retries: u32,
}

impl Config {
    /// Build a [`Config`] from environment variables, falling back to defaults.
    ///
    /// Every value is trimmed. An unset, non-UTF-8, empty or whitespace-only variable is
    /// treated as absent. Numeric values that fail to parse fall back to their default.
    /// `REQUEST_TIMEOUT=0` also falls back to the default, because a zero timeout would
    /// make every request fail immediately.
    ///
    /// # Errors
    /// Currently never returns `Err`; the `Result` is kept for API compatibility.
    pub fn from_env() -> Result<Self, std::io::Error> {
        Ok(Self {
            api_token: Self::env_string("API_TOKEN").unwrap_or_default(),
            web_unlocker_zone: Self::env_string("WEB_UNLOCKER_ZONE")
                .unwrap_or_else(|| "default_zone".to_string()),
            browser_zone: Self::env_string("BROWSER_ZONE")
                .unwrap_or_else(|| "default_browser".to_string()),
            serp_zone: Self::env_string("BRIGHTDATA_SERP_ZONE")
                .unwrap_or_else(|| "serp_api2".to_string()),
            rate_limit: Self::env_string("RATE_LIMIT"),
            timeout: Duration::from_secs(
                Self::env_parse("REQUEST_TIMEOUT")
                    .filter(|&secs: &u64| secs > 0)
                    .unwrap_or(300),
            ),
            max_retries: Self::env_parse("MAX_RETRIES").unwrap_or(3),
        })
    }

    /// Trimmed value of `key`, or `None` when it is unset, not UTF-8, or blank.
    fn env_string(key: &str) -> Option<String> {
        env::var(key)
            .ok()
            .map(|raw| raw.trim().to_string())
            .filter(|value| !value.is_empty())
    }

    /// Trimmed value of `key` parsed as `T`, or `None` when absent or unparsable.
    fn env_parse<T: std::str::FromStr>(key: &str) -> Option<T> {
        Self::env_string(key).and_then(|value| value.parse().ok())
    }
}
40
41#[derive(Debug)]
42pub struct AppState {
43 pub config: Config,
44 pub session_id: Uuid,
45 pub http_client: Client,
46 pub rate_limits: Arc<RwLock<HashMap<String, (u32, DateTime<Utc>)>>>,
47 pub start_time: DateTime<Utc>,
48 pub current_mcp_session: Arc<RwLock<Option<String>>>, }
50
51impl AppState {
52 pub fn new(config: Config) -> Self {
53 Self {
54 session_id: Uuid::new_v4(),
55 config: config.clone(),
56 http_client: Client::builder().timeout(config.timeout).build().unwrap(),
57 rate_limits: Arc::new(RwLock::new(HashMap::new())),
58 start_time: Utc::now(),
59 current_mcp_session: Arc::new(RwLock::new(None)),
60 }
61 }
62}
63
/// Builders for BrightData API endpoint URLs.
pub struct BrightDataUrls;

impl BrightDataUrls {
    /// API base used when `BRIGHTDATA_BASE_URL` is unset or blank.
    const DEFAULT_BASE_URL: &'static str = "https://api.brightdata.com";

    /// Absolute URL of the BrightData `/request` endpoint.
    ///
    /// The base is read from `BRIGHTDATA_BASE_URL` on every call. It is trimmed and its
    /// trailing slashes are removed, so `https://host/` yields `https://host/request`
    /// rather than `https://host//request`. An unset or blank value (including a lone
    /// `/`) falls back to `https://api.brightdata.com`.
    pub fn request_api() -> String {
        let base_url = std::env::var("BRIGHTDATA_BASE_URL")
            .ok()
            .map(|raw| raw.trim().trim_end_matches('/').to_string())
            .filter(|base| !base.is_empty())
            .unwrap_or_else(|| Self::DEFAULT_BASE_URL.to_string());
        format!("{}/request", base_url)
    }
}
73
74pub async fn handle_mcp_request(
76 _req: HttpRequest,
77 payload: web::Json<crate::types::McpRequest>,
78 state: web::Data<AppState>,
79) -> Result<HttpResponse> {
80 let req = payload.into_inner();
81 let id = req.id.clone();
82
83 let mcp_result: Result<McpResponse, String> = match req.method.as_str() {
84 "initialize" => {
86 log::info!("🎯 MCP Initialize received - starting new metrics session");
87
88 let session_id = handle_mcp_initialize();
90
91 {
93 let mut current_session = state.current_mcp_session.write().await;
94 *current_session = Some(session_id.clone());
95 }
96
97 log::info!("📊 New MCP session started: {}", session_id);
98
99 Ok(McpResponse {
100 jsonrpc: "2.0".to_string(),
101 id,
102 result: Some(serde_json::json!({
103 "protocolVersion": "2024-11-05",
104 "capabilities": {
105 "tools": {},
106 "logging": {},
107 "prompts": {}
108 },
109 "serverInfo": {
110 "name": "snm-brightdata-client",
111 "version": env!("CARGO_PKG_VERSION")
112 },
113 "instructions": "BrightData MCP Server ready with metrics tracking",
114 "session_id": session_id
115 })),
116 error: None,
117 })
118 }
119
120 "tools/list" => {
121 let current_session = get_current_mcp_session();
122 log::info!("📋 Tools list requested [Session: {:?}]", current_session);
123
124 Ok(McpResponse {
125 jsonrpc: "2.0".to_string(),
126 id,
127 result: Some(serde_json::json!({
128 "tools": [
129 { "name": "scrape_website", "description": "Scrape a web page" },
130 { "name": "search_web", "description": "Perform a web search" },
131 { "name": "extract_data", "description": "Extract structured data from a webpage" },
132 { "name": "take_screenshot", "description": "Take a screenshot of a webpage" },
133 { "name": "get_stock_data", "description": "Get stock market data" },
134 { "name": "get_crypto_data", "description": "Get cryptocurrency data" },
135 { "name": "get_etf_data", "description": "Get ETF data" },
136 { "name": "get_bond_data", "description": "Get bond market data" },
137 { "name": "get_mutual_fund_data", "description": "Get mutual fund data" },
138 { "name": "get_commodity_data", "description": "Get commodity market data" },
139 { "name": "get_market_overview", "description": "Get market overview" },
140 { "name": "multi_zone_search", "description": "Search across multiple zones" }
141 ],
142 "session_id": current_session
143 })),
144 error: None,
145 })
146 }
147
148 "tools/call" => {
149 let current_session = get_current_mcp_session();
150
151 if let Some(params) = req.params {
152 let name = params.get("name").and_then(|v| v.as_str()).unwrap_or("");
153 let args = params.get("arguments").cloned().unwrap_or_default();
154
155 log::info!("🔧 Tool call: {} [Session: {:?}]", name, current_session);
156
157 if !check_rate_limit(name, &state).await {
158 return Ok(HttpResponse::TooManyRequests().json(McpResponse {
159 jsonrpc: "2.0".to_string(),
160 id,
161 result: None,
162 error: Some(crate::types::McpError {
163 code: -32000,
164 message: "Rate limit exceeded".to_string(),
165 data: None,
166 }),
167 }));
168 }
169
170 let result = match name {
171 "scrape_website" => handle_scrape_website(&args, &state).await,
172 "search_web" => handle_search_web(&args, &state).await,
173 "extract_data" => handle_extract_placeholder(&args).await,
174 "take_screenshot" => handle_take_screenshot(&args, &state).await,
175 "get_stock_data" => handle_financial_tool("get_stock_data", &args).await,
176 "get_crypto_data" => handle_financial_tool("get_crypto_data", &args).await,
177 "get_etf_data" => handle_financial_tool("get_etf_data", &args).await,
178 "get_bond_data" => handle_financial_tool("get_bond_data", &args).await,
179 "get_mutual_fund_data" => handle_financial_tool("get_mutual_fund_data", &args).await,
180 "get_commodity_data" => handle_financial_tool("get_commodity_data", &args).await,
181 "get_market_overview" => handle_financial_tool("get_market_overview", &args).await,
182 "multi_zone_search" => handle_financial_tool("multi_zone_search", &args).await,
183 _ => Err("Unknown tool".to_string()),
184 };
185
186 Ok(match result {
187 Ok(content) => McpResponse {
188 jsonrpc: "2.0".to_string(),
189 id,
190 result: Some(serde_json::json!({
191 "content": content,
192 "session_id": current_session
193 })),
194 error: None,
195 },
196 Err(msg) => McpResponse {
197 jsonrpc: "2.0".to_string(),
198 id,
199 result: None,
200 error: Some(crate::types::McpError {
201 code: -32603,
202 message: msg,
203 data: Some(serde_json::json!({
204 "session_id": current_session
205 })),
206 }),
207 },
208 })
209 } else {
210 Ok(McpResponse {
211 jsonrpc: "2.0".to_string(),
212 id,
213 result: None,
214 error: Some(crate::types::McpError {
215 code: -32602,
216 message: "Missing parameters".into(),
217 data: None,
218 }),
219 })
220 }
221 }
222
223 _ => Ok(McpResponse {
224 jsonrpc: "2.0".to_string(),
225 id,
226 result: None,
227 error: Some(crate::types::McpError {
228 code: -32601,
229 message: "Method not found".to_string(),
230 data: None,
231 }),
232 }),
233 };
234
235 match mcp_result {
236 Ok(resp) => Ok(HttpResponse::Ok().json(resp)),
237 Err(e) => Ok(HttpResponse::InternalServerError().json(McpResponse {
238 jsonrpc: "2.0".to_string(),
239 id: req.id,
240 result: None,
241 error: Some(crate::types::McpError {
242 code: -32603,
243 message: e,
244 data: None,
245 }),
246 })),
247 }
248}
249
250pub async fn health_check(state: web::Data<AppState>) -> Result<HttpResponse> {
251 let current_session = get_current_mcp_session();
252
253 Ok(HttpResponse::Ok().json(serde_json::json!({
254 "status": "healthy",
255 "session_id": state.session_id,
256 "uptime_seconds": (Utc::now() - state.start_time).num_seconds(),
257 "zones": {
258 "web_unlocker": state.config.web_unlocker_zone,
259 "browser": state.config.browser_zone,
260 "serp": state.config.serp_zone
261 },
262 "mcp_session": current_session,
263 "metrics_tracking": current_session.is_some()
264 })))
265}
266
267pub async fn cors_handler() -> HttpResponse {
268 HttpResponse::Ok()
269 .insert_header(("Access-Control-Allow-Origin", "*"))
270 .insert_header(("Access-Control-Allow-Methods", "POST, GET, OPTIONS"))
271 .insert_header(("Access-Control-Allow-Headers", "Content-Type, Authorization"))
272 .finish()
273}
274
275async fn check_rate_limit(tool: &str, state: &web::Data<AppState>) -> bool {
276 let mut limits = state.rate_limits.write().await;
277 let now = Utc::now();
278 let entry = limits.entry(tool.to_string()).or_insert((0, now));
279
280 let limit = 10;
281 let window = chrono::Duration::seconds(60);
282
283 if now - entry.1 > window {
284 entry.0 = 0;
285 entry.1 = now;
286 }
287
288 if entry.0 >= limit {
289 false
290 } else {
291 entry.0 += 1;
292 true
293 }
294}
295
296pub async fn handle_scrape_website(args: &serde_json::Value, state: &web::Data<AppState>) -> Result<String, String> {
297 let url = args.get("url").and_then(|v| v.as_str()).ok_or("Missing 'url'")?;
298 let format = args.get("format").and_then(|v| v.as_str()).unwrap_or("markdown");
299
300 let mut payload = serde_json::json!({
301 "url": url,
302 "zone": state.config.web_unlocker_zone,
303 "format": "raw",
304 });
305
306 if format == "markdown" {
307 payload["data_format"] = serde_json::json!("markdown");
308 }
309
310 let api_url = BrightDataUrls::request_api();
311
312 let res = state.http_client
313 .post(&api_url)
314 .header("Authorization", format!("Bearer {}", state.config.api_token))
315 .json(&payload)
316 .send()
317 .await
318 .map_err(|e| e.to_string())?;
319
320 let body = res.text().await.map_err(|e| e.to_string())?;
321 Ok(body)
322}
323
324pub async fn handle_search_web(args: &serde_json::Value, state: &web::Data<AppState>) -> Result<String, String> {
325 let query = args.get("query").and_then(|v| v.as_str()).ok_or("Missing 'query'")?;
326 let engine = args.get("engine").and_then(|v| v.as_str()).unwrap_or("google");
327 let cursor = args.get("cursor").and_then(|v| v.as_str()).unwrap_or("0");
328
329 let search_url = build_search_url(engine, query, cursor);
330
331 let payload = serde_json::json!({
333 "url": search_url,
334 "zone": state.config.serp_zone, "format": "raw",
336 "data_format": "markdown"
337 });
338
339 let api_url = BrightDataUrls::request_api();
340 let res = state.http_client
341 .post(&api_url)
342 .header("Authorization", format!("Bearer {}", state.config.api_token))
343 .json(&payload)
344 .send()
345 .await
346 .map_err(|e| e.to_string())?;
347
348 let body = res.text().await.map_err(|e| e.to_string())?;
349 Ok(body)
350}
351
352pub async fn handle_take_screenshot(args: &serde_json::Value, state: &web::Data<AppState>) -> Result<String, String> {
353 let url = args.get("url").and_then(|v| v.as_str()).ok_or("Missing 'url'")?;
354 let width = args.get("width").and_then(|v| v.as_i64()).unwrap_or(1280);
355 let height = args.get("height").and_then(|v| v.as_i64()).unwrap_or(720);
356 let full_page = args.get("full_page").and_then(|v| v.as_bool()).unwrap_or(false);
357
358 let payload = serde_json::json!({
359 "url": url,
360 "zone": state.config.browser_zone,
361 "format": "raw",
362 "data_format": "screenshot",
363 "viewport": {
364 "width": width,
365 "height": height
366 },
367 "full_page": full_page
368 });
369
370 let api_url = BrightDataUrls::request_api();
371 let res = state.http_client
372 .post(&api_url)
373 .header("Authorization", format!("Bearer {}", state.config.api_token))
374 .json(&payload)
375 .send()
376 .await
377 .map_err(|e| e.to_string())?;
378
379 let _body = res.text().await.map_err(|e| e.to_string())?;
381 Ok(format!("Screenshot captured for {} ({}x{})", url, width, height))
382}
383
384async fn handle_financial_tool(tool_name: &str, args: &serde_json::Value) -> Result<String, String> {
386 use crate::tool::ToolResolver; let resolver = ToolResolver::default();
389 match resolver.resolve(tool_name) {
390 Some(tool) => {
391 match tool.execute(args.clone()).await {
392 Ok(result) => {
393 if !result.content.is_empty() {
394 Ok(result.content[0].text.clone())
395 } else {
396 Ok("No content returned".to_string())
397 }
398 },
399 Err(e) => Err(e.to_string()),
400 }
401 },
402 None => Err(format!("Tool '{}' not found", tool_name)),
403 }
404}
405
406pub async fn handle_extract_placeholder(_args: &serde_json::Value) -> Result<String, String> {
407 Ok("🧠Extract tool placeholder: AI-based structured data extraction coming soon.".to_string())
408}
409
410fn build_search_url(engine: &str, query: &str, cursor: &str) -> String {
411 let q = urlencoding::encode(query);
412 let page: usize = cursor.parse().unwrap_or(0);
413 let start = page * 10;
414
415 match engine {
416 "yandex" => format!("https://yandex.com/search/?text={q}&p={page}"),
417 "bing" => format!("https://www.bing.com/search?q={q}&first={}", start + 1),
418 "duckduckgo" => format!("https://duckduckgo.com/?q={q}&s={start}"),
419 _ => format!("https://www.google.com/search?q={q}&start={start}"),
420 }
421}