1use actix_web::{web, HttpRequest, HttpResponse, Result};
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::env;
6use std::sync::Arc;
7use std::time::Duration;
8use tokio::sync::RwLock;
9use chrono::{DateTime, Utc};
10use reqwest::Client;
11use uuid::Uuid;
12use crate::error::BrightDataError;
13use crate::types::{McpResponse};
14use crate::tool::{handle_mcp_initialize, get_current_mcp_session}; #[derive(Debug, Clone)]
17pub struct Config {
18 pub api_token: String,
19 pub web_unlocker_zone: String,
20 pub browser_zone: String,
21 pub serp_zone: String, pub rate_limit: Option<String>,
23 pub timeout: Duration,
24 pub max_retries: u32,
25}
26
27impl Config {
28 pub fn from_env() -> Result<Self, std::io::Error> {
29 Ok(Self {
30 api_token: env::var("API_TOKEN").unwrap_or_default(),
31 web_unlocker_zone: env::var("WEB_UNLOCKER_ZONE").unwrap_or_else(|_| "default_zone".to_string()),
32 browser_zone: env::var("BROWSER_ZONE").unwrap_or_else(|_| "default_browser".to_string()),
33 serp_zone: env::var("BRIGHTDATA_SERP_ZONE").unwrap_or_else(|_| "serp_api2".to_string()),
34 rate_limit: env::var("RATE_LIMIT").ok(),
35 timeout: Duration::from_secs(env::var("REQUEST_TIMEOUT").unwrap_or_else(|_| "300".to_string()).parse().unwrap_or(300)),
36 max_retries: env::var("MAX_RETRIES").unwrap_or_else(|_| "3".to_string()).parse().unwrap_or(3),
37 })
38 }
39}
40
/// Shared application state; the handlers in this file receive it as
/// `web::Data<AppState>`.
///
/// NOTE(review): `Debug` output includes `config`, so whatever `Config`'s
/// `Debug` prints (including the API token, unless it is redacted there)
/// appears here as well.
#[derive(Debug)]
pub struct AppState {
    /// Loaded configuration (API token, zones, timeout, ...).
    pub config: Config,
    /// Random id generated once when the state is created; reported by `health_check`.
    pub session_id: Uuid,
    /// HTTP client used for all Bright Data requests; built with `config.timeout`.
    pub http_client: Client,
    /// Fixed-window rate-limit table: tool name -> (calls in current window, window start).
    pub rate_limits: Arc<RwLock<HashMap<String, (u32, DateTime<Utc>)>>>,
    /// Creation time of the state; `health_check` derives uptime from it.
    pub start_time: DateTime<Utc>,
    /// Id of the most recent MCP session, written by the `initialize` handler.
    pub current_mcp_session: Arc<RwLock<Option<String>>>,
}
50
51impl AppState {
52 pub fn new(config: Config) -> Self {
53 Self {
54 session_id: Uuid::new_v4(),
55 config: config.clone(),
56 http_client: Client::builder().timeout(config.timeout).build().unwrap(),
57 rate_limits: Arc::new(RwLock::new(HashMap::new())),
58 start_time: Utc::now(),
59 current_mcp_session: Arc::new(RwLock::new(None)),
60 }
61 }
62}
63
/// Builders for Bright Data API endpoint URLs.
pub struct BrightDataUrls;

impl BrightDataUrls {
    /// Returns the URL of the Bright Data `/request` endpoint.
    ///
    /// The base URL is read from `BRIGHTDATA_BASE_URL` on every call.
    /// Surrounding whitespace and trailing slashes are stripped, so
    /// `https://api.brightdata.com/` does not produce a `//request` path. When
    /// the variable is unset, not valid Unicode, or empty after trimming, the
    /// default `https://api.brightdata.com` is used.
    pub fn request_api() -> String {
        const DEFAULT_BASE_URL: &str = "https://api.brightdata.com";

        let configured = std::env::var("BRIGHTDATA_BASE_URL").unwrap_or_default();
        let base_url = match configured.trim().trim_end_matches('/') {
            "" => DEFAULT_BASE_URL,
            trimmed => trimmed,
        };
        format!("{}/request", base_url)
    }
}
73
74pub async fn handle_mcp_request(
76 _req: HttpRequest,
77 payload: web::Json<crate::types::McpRequest>,
78 state: web::Data<AppState>,
79) -> Result<HttpResponse> {
80 let req = payload.into_inner();
81 let id = req.id.clone();
82
83 let mcp_result: Result<McpResponse, String> = match req.method.as_str() {
84 "initialize" => {
86 log::info!("🎯 MCP Initialize received - starting new metrics session");
87
88 let session_id = handle_mcp_initialize();
90
91 {
93 let mut current_session = state.current_mcp_session.write().await;
94 *current_session = Some(session_id.clone());
95 }
96
97 log::info!("📊 New MCP session started: {}", session_id);
98
99 Ok(McpResponse {
100 jsonrpc: "2.0".to_string(),
101 id,
102 result: Some(serde_json::json!({
103 "protocolVersion": "2024-11-05",
104 "capabilities": {
105 "tools": {},
106 "logging": {},
107 "prompts": {}
108 },
109 "serverInfo": {
110 "name": "snm-brightdata-client",
111 "version": env!("CARGO_PKG_VERSION")
112 },
113 "instructions": "BrightData MCP Server ready with metrics tracking",
114 "session_id": session_id
115 })),
116 error: None,
117 })
118 }
119
120 "tools/list" => {
121 let current_session = get_current_mcp_session();
122 log::info!("📋 Tools list requested [Session: {:?}]", current_session);
123
124 Ok(McpResponse {
125 jsonrpc: "2.0".to_string(),
126 id,
127 result: Some(serde_json::json!({
128 "tools": [
129 { "name": "scrape_website", "description": "Scrape a web page" },
130 { "name": "search_web", "description": "Perform a web search" },
131 { "name": "extract_data", "description": "Extract structured data from a webpage" },
132 { "name": "take_screenshot", "description": "Take a screenshot of a webpage" },
133 { "name": "get_stock_data", "description": "Get stock market data" },
134 { "name": "get_crypto_data", "description": "Get cryptocurrency data" },
135 { "name": "get_etf_data", "description": "Get ETF data" },
136 { "name": "get_bond_data", "description": "Get bond market data" },
137 { "name": "get_mutual_fund_data", "description": "Get mutual fund data" },
138 { "name": "get_commodity_data", "description": "Get commodity market data" },
139 { "name": "multi_zone_search", "description": "Search across multiple zones" }
140 ],
141 "session_id": current_session
142 })),
143 error: None,
144 })
145 }
146
147 "tools/call" => {
148 let current_session = get_current_mcp_session();
149
150 if let Some(params) = req.params {
151 let name = params.get("name").and_then(|v| v.as_str()).unwrap_or("");
152 let args = params.get("arguments").cloned().unwrap_or_default();
153
154 log::info!("🔧 Tool call: {} [Session: {:?}]", name, current_session);
155
156 if !check_rate_limit(name, &state).await {
157 return Ok(HttpResponse::TooManyRequests().json(McpResponse {
158 jsonrpc: "2.0".to_string(),
159 id,
160 result: None,
161 error: Some(crate::types::McpError {
162 code: -32000,
163 message: "Rate limit exceeded".to_string(),
164 data: None,
165 }),
166 }));
167 }
168
169 let result = match name {
170 "scrape_website" => handle_scrape_website(&args, &state).await,
171 "search_web" => handle_search_web(&args, &state).await,
172 "extract_data" => handle_extract_placeholder(&args).await,
173 "take_screenshot" => handle_take_screenshot(&args, &state).await,
174 "get_stock_data" => handle_financial_tool("get_stock_data", &args).await,
175 "get_crypto_data" => handle_financial_tool("get_crypto_data", &args).await,
176 "get_etf_data" => handle_financial_tool("get_etf_data", &args).await,
177 "get_bond_data" => handle_financial_tool("get_bond_data", &args).await,
178 "get_mutual_fund_data" => handle_financial_tool("get_mutual_fund_data", &args).await,
179 "get_commodity_data" => handle_financial_tool("get_commodity_data", &args).await,
180 "multi_zone_search" => handle_financial_tool("multi_zone_search", &args).await,
181 _ => Err("Unknown tool".to_string()),
182 };
183
184 Ok(match result {
185 Ok(content) => McpResponse {
186 jsonrpc: "2.0".to_string(),
187 id,
188 result: Some(serde_json::json!({
189 "content": content,
190 "session_id": current_session
191 })),
192 error: None,
193 },
194 Err(msg) => McpResponse {
195 jsonrpc: "2.0".to_string(),
196 id,
197 result: None,
198 error: Some(crate::types::McpError {
199 code: -32603,
200 message: msg,
201 data: Some(serde_json::json!({
202 "session_id": current_session
203 })),
204 }),
205 },
206 })
207 } else {
208 Ok(McpResponse {
209 jsonrpc: "2.0".to_string(),
210 id,
211 result: None,
212 error: Some(crate::types::McpError {
213 code: -32602,
214 message: "Missing parameters".into(),
215 data: None,
216 }),
217 })
218 }
219 }
220
221 _ => Ok(McpResponse {
222 jsonrpc: "2.0".to_string(),
223 id,
224 result: None,
225 error: Some(crate::types::McpError {
226 code: -32601,
227 message: "Method not found".to_string(),
228 data: None,
229 }),
230 }),
231 };
232
233 match mcp_result {
234 Ok(resp) => Ok(HttpResponse::Ok().json(resp)),
235 Err(e) => Ok(HttpResponse::InternalServerError().json(McpResponse {
236 jsonrpc: "2.0".to_string(),
237 id: req.id,
238 result: None,
239 error: Some(crate::types::McpError {
240 code: -32603,
241 message: e,
242 data: None,
243 }),
244 })),
245 }
246}
247
248pub async fn health_check(state: web::Data<AppState>) -> Result<HttpResponse> {
249 let current_session = get_current_mcp_session();
250
251 Ok(HttpResponse::Ok().json(serde_json::json!({
252 "status": "healthy",
253 "session_id": state.session_id,
254 "uptime_seconds": (Utc::now() - state.start_time).num_seconds(),
255 "zones": {
256 "web_unlocker": state.config.web_unlocker_zone,
257 "browser": state.config.browser_zone,
258 "serp": state.config.serp_zone
259 },
260 "mcp_session": current_session,
261 "metrics_tracking": current_session.is_some()
262 })))
263}
264
265pub async fn cors_handler() -> HttpResponse {
266 HttpResponse::Ok()
267 .insert_header(("Access-Control-Allow-Origin", "*"))
268 .insert_header(("Access-Control-Allow-Methods", "POST, GET, OPTIONS"))
269 .insert_header(("Access-Control-Allow-Headers", "Content-Type, Authorization"))
270 .finish()
271}
272
273async fn check_rate_limit(tool: &str, state: &web::Data<AppState>) -> bool {
274 let mut limits = state.rate_limits.write().await;
275 let now = Utc::now();
276 let entry = limits.entry(tool.to_string()).or_insert((0, now));
277
278 let limit = 10;
279 let window = chrono::Duration::seconds(60);
280
281 if now - entry.1 > window {
282 entry.0 = 0;
283 entry.1 = now;
284 }
285
286 if entry.0 >= limit {
287 false
288 } else {
289 entry.0 += 1;
290 true
291 }
292}
293
294pub async fn handle_scrape_website(args: &serde_json::Value, state: &web::Data<AppState>) -> Result<String, String> {
295 let url = args.get("url").and_then(|v| v.as_str()).ok_or("Missing 'url'")?;
296 let format = args.get("format").and_then(|v| v.as_str()).unwrap_or("markdown");
297
298 let mut payload = serde_json::json!({
299 "url": url,
300 "zone": state.config.web_unlocker_zone,
301 "format": "raw",
302 });
303
304 if format == "markdown" {
305 payload["data_format"] = serde_json::json!("markdown");
306 }
307
308 let api_url = BrightDataUrls::request_api();
309
310 let res = state.http_client
311 .post(&api_url)
312 .header("Authorization", format!("Bearer {}", state.config.api_token))
313 .json(&payload)
314 .send()
315 .await
316 .map_err(|e| e.to_string())?;
317
318 let body = res.text().await.map_err(|e| e.to_string())?;
319 Ok(body)
320}
321
322pub async fn handle_search_web(args: &serde_json::Value, state: &web::Data<AppState>) -> Result<String, String> {
323 let query = args.get("query").and_then(|v| v.as_str()).ok_or("Missing 'query'")?;
324 let engine = args.get("engine").and_then(|v| v.as_str()).unwrap_or("google");
325 let cursor = args.get("cursor").and_then(|v| v.as_str()).unwrap_or("0");
326
327 let search_url = build_search_url(engine, query, cursor);
328
329 let payload = serde_json::json!({
331 "url": search_url,
332 "zone": state.config.serp_zone, "format": "raw",
334 "data_format": "markdown"
335 });
336
337 let api_url = BrightDataUrls::request_api();
338 let res = state.http_client
339 .post(&api_url)
340 .header("Authorization", format!("Bearer {}", state.config.api_token))
341 .json(&payload)
342 .send()
343 .await
344 .map_err(|e| e.to_string())?;
345
346 let body = res.text().await.map_err(|e| e.to_string())?;
347 Ok(body)
348}
349
350pub async fn handle_take_screenshot(args: &serde_json::Value, state: &web::Data<AppState>) -> Result<String, String> {
351 let url = args.get("url").and_then(|v| v.as_str()).ok_or("Missing 'url'")?;
352 let width = args.get("width").and_then(|v| v.as_i64()).unwrap_or(1280);
353 let height = args.get("height").and_then(|v| v.as_i64()).unwrap_or(720);
354 let full_page = args.get("full_page").and_then(|v| v.as_bool()).unwrap_or(false);
355
356 let payload = serde_json::json!({
357 "url": url,
358 "zone": state.config.browser_zone,
359 "format": "raw",
360 "data_format": "screenshot",
361 "viewport": {
362 "width": width,
363 "height": height
364 },
365 "full_page": full_page
366 });
367
368 let api_url = BrightDataUrls::request_api();
369 let res = state.http_client
370 .post(&api_url)
371 .header("Authorization", format!("Bearer {}", state.config.api_token))
372 .json(&payload)
373 .send()
374 .await
375 .map_err(|e| e.to_string())?;
376
377 let _body = res.text().await.map_err(|e| e.to_string())?;
379 Ok(format!("Screenshot captured for {} ({}x{})", url, width, height))
380}
381
382async fn handle_financial_tool(tool_name: &str, args: &serde_json::Value) -> Result<String, String> {
384 use crate::tool::ToolResolver; let resolver = ToolResolver::default();
387 match resolver.resolve(tool_name) {
388 Some(tool) => {
389 match tool.execute(args.clone()).await {
390 Ok(result) => {
391 if !result.content.is_empty() {
392 Ok(result.content[0].text.clone())
393 } else {
394 Ok("No content returned".to_string())
395 }
396 },
397 Err(e) => Err(e.to_string()),
398 }
399 },
400 None => Err(format!("Tool '{}' not found", tool_name)),
401 }
402}
403
404pub async fn handle_extract_placeholder(_args: &serde_json::Value) -> Result<String, String> {
405 Ok("🧠Extract tool placeholder: AI-based structured data extraction coming soon.".to_string())
406}
407
408fn build_search_url(engine: &str, query: &str, cursor: &str) -> String {
409 let q = urlencoding::encode(query);
410 let page: usize = cursor.parse().unwrap_or(0);
411 let start = page * 10;
412
413 match engine {
414 "yandex" => format!("https://yandex.com/search/?text={q}&p={page}"),
415 "bing" => format!("https://www.bing.com/search?q={q}&first={}", start + 1),
416 "duckduckgo" => format!("https://duckduckgo.com/?q={q}&s={start}"),
417 _ => format!("https://www.google.com/search?q={q}&start={start}"),
418 }
419}