snm_brightdata_client/
server.rs

1// src/server.rs - Fixed version with unused variables removed
2use actix_web::{web, HttpRequest, HttpResponse, Result};
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::env;
6use std::sync::Arc;
7use std::time::Duration;
8use tokio::sync::RwLock;
9use chrono::{DateTime, Utc};
10use reqwest::Client;
11use uuid::Uuid;
12use crate::error::BrightDataError;
13use crate::types::{McpResponse};
14use crate::tool::{handle_mcp_initialize, get_current_mcp_session}; // Import MCP session functions
15
/// Runtime settings for the BrightData client, populated from environment
/// variables by [`Config::from_env`].
#[derive(Debug, Clone)]
pub struct Config {
    /// Value of `API_TOKEN`; empty when the variable is unset.
    pub api_token: String,
    /// Value of `WEB_UNLOCKER_ZONE` (default `"default_zone"`).
    pub web_unlocker_zone: String,
    /// Value of `BROWSER_ZONE` (default `"default_browser"`).
    pub browser_zone: String,
    /// Value of `BRIGHTDATA_SERP_ZONE` (default `"serp_api2"`).
    pub serp_zone: String,
    /// Raw value of `RATE_LIMIT`, if set; stored as-is without parsing.
    pub rate_limit: Option<String>,
    /// `REQUEST_TIMEOUT` interpreted as whole seconds (default 300).
    pub timeout: Duration,
    /// `MAX_RETRIES` parsed as an unsigned integer (default 3).
    pub max_retries: u32,
}

impl Config {
    /// Builds a configuration from the process environment.
    ///
    /// Every setting has a fallback, so this currently always returns `Ok`.
    /// Numeric settings that are missing or fail to parse silently fall back
    /// to their defaults.
    pub fn from_env() -> Result<Self, std::io::Error> {
        // Reads a textual variable, substituting `fallback` when it is unset
        // or not valid Unicode.
        let text_or = |key: &str, fallback: &str| -> String {
            env::var(key).unwrap_or_else(|_| fallback.to_owned())
        };

        let timeout_secs: u64 = env::var("REQUEST_TIMEOUT")
            .ok()
            .and_then(|raw| raw.parse().ok())
            .unwrap_or(300);
        let max_retries: u32 = env::var("MAX_RETRIES")
            .ok()
            .and_then(|raw| raw.parse().ok())
            .unwrap_or(3);

        let settings = Self {
            api_token: env::var("API_TOKEN").unwrap_or_default(),
            web_unlocker_zone: text_or("WEB_UNLOCKER_ZONE", "default_zone"),
            browser_zone: text_or("BROWSER_ZONE", "default_browser"),
            serp_zone: text_or("BRIGHTDATA_SERP_ZONE", "serp_api2"),
            rate_limit: env::var("RATE_LIMIT").ok(),
            timeout: Duration::from_secs(timeout_secs),
            max_retries,
        };
        Ok(settings)
    }
}
40
41#[derive(Debug)]
42pub struct AppState {
43    pub config: Config,
44    pub session_id: Uuid,
45    pub http_client: Client,
46    pub rate_limits: Arc<RwLock<HashMap<String, (u32, DateTime<Utc>)>>>,
47    pub start_time: DateTime<Utc>,
48    pub current_mcp_session: Arc<RwLock<Option<String>>>, // Track current MCP session
49}
50
51impl AppState {
52    pub fn new(config: Config) -> Self {
53        Self {
54            session_id: Uuid::new_v4(),
55            config: config.clone(),
56            http_client: Client::builder().timeout(config.timeout).build().unwrap(),
57            rate_limits: Arc::new(RwLock::new(HashMap::new())),
58            start_time: Utc::now(),
59            current_mcp_session: Arc::new(RwLock::new(None)),
60        }
61    }
62}
63
/// Namespace for BrightData endpoint URLs.
pub struct BrightDataUrls;

impl BrightDataUrls {
    /// Base URL used when `BRIGHTDATA_BASE_URL` is unset or blank.
    const DEFAULT_BASE_URL: &'static str = "https://api.brightdata.com";

    /// Returns the URL of the `/request` endpoint.
    ///
    /// The base comes from the `BRIGHTDATA_BASE_URL` environment variable.
    /// Surrounding whitespace and trailing slashes are stripped so that a
    /// value such as `https://host/` does not yield `https://host//request`,
    /// and a blank value falls back to the default instead of producing the
    /// relative path `/request`.
    pub fn request_api() -> String {
        let configured = std::env::var("BRIGHTDATA_BASE_URL").unwrap_or_default();
        let trimmed = configured.trim().trim_end_matches('/');
        let base_url = if trimmed.is_empty() {
            Self::DEFAULT_BASE_URL
        } else {
            trimmed
        };
        format!("{}/request", base_url)
    }
}
73
74// Enhanced MCP handler with initialize support for metrics
75pub async fn handle_mcp_request(
76    _req: HttpRequest,
77    payload: web::Json<crate::types::McpRequest>,
78    state: web::Data<AppState>,
79) -> Result<HttpResponse> {
80    let req = payload.into_inner();
81    let id = req.id.clone();
82
83    let mcp_result: Result<McpResponse, String> = match req.method.as_str() {
84        // Handle MCP initialize - this resets metrics for new session
85        "initialize" => {
86            log::info!("🎯 MCP Initialize received - starting new metrics session");
87            
88            // Start new MCP session and reset metrics
89            let session_id = handle_mcp_initialize();
90            
91            // Update app state with new session
92            {
93                let mut current_session = state.current_mcp_session.write().await;
94                *current_session = Some(session_id.clone());
95            }
96            
97            log::info!("📊 New MCP session started: {}", session_id);
98            
99            Ok(McpResponse {
100                jsonrpc: "2.0".to_string(),
101                id,
102                result: Some(serde_json::json!({
103                    "protocolVersion": "2024-11-05",
104                    "capabilities": {
105                        "tools": {},
106                        "logging": {},
107                        "prompts": {}
108                    },
109                    "serverInfo": {
110                        "name": "snm-brightdata-client",
111                        "version": env!("CARGO_PKG_VERSION")
112                    },
113                    "instructions": "BrightData MCP Server ready with metrics tracking",
114                    "session_id": session_id
115                })),
116                error: None,
117            })
118        }
119
120        "tools/list" => {
121            let current_session = get_current_mcp_session();
122            log::info!("📋 Tools list requested [Session: {:?}]", current_session);
123            
124            Ok(McpResponse {
125                jsonrpc: "2.0".to_string(),
126                id,
127                result: Some(serde_json::json!({
128                    "tools": [
129                        { "name": "scrape_website", "description": "Scrape a web page" },
130                        { "name": "search_web", "description": "Perform a web search" },
131                        { "name": "extract_data", "description": "Extract structured data from a webpage" },
132                        { "name": "take_screenshot", "description": "Take a screenshot of a webpage" },
133                        { "name": "get_stock_data", "description": "Get stock market data" },
134                        { "name": "get_crypto_data", "description": "Get cryptocurrency data" },
135                        { "name": "get_etf_data", "description": "Get ETF data" },
136                        { "name": "get_bond_data", "description": "Get bond market data" },
137                        { "name": "get_mutual_fund_data", "description": "Get mutual fund data" },
138                        { "name": "get_commodity_data", "description": "Get commodity market data" },
139                        { "name": "multi_zone_search", "description": "Search across multiple zones" }
140                    ],
141                    "session_id": current_session
142                })),
143                error: None,
144            })
145        }
146
147        "tools/call" => {
148            let current_session = get_current_mcp_session();
149            
150            if let Some(params) = req.params {
151                let name = params.get("name").and_then(|v| v.as_str()).unwrap_or("");
152                let args = params.get("arguments").cloned().unwrap_or_default();
153
154                log::info!("🔧 Tool call: {} [Session: {:?}]", name, current_session);
155
156                if !check_rate_limit(name, &state).await {
157                    return Ok(HttpResponse::TooManyRequests().json(McpResponse {
158                        jsonrpc: "2.0".to_string(),
159                        id,
160                        result: None,
161                        error: Some(crate::types::McpError {
162                            code: -32000,
163                            message: "Rate limit exceeded".to_string(),
164                            data: None,
165                        }),
166                    }));
167                }
168
169                let result = match name {
170                    "scrape_website" => handle_scrape_website(&args, &state).await,
171                    "search_web" => handle_search_web(&args, &state).await,
172                    "extract_data" => handle_extract_placeholder(&args).await,
173                    "take_screenshot" => handle_take_screenshot(&args, &state).await,
174                    "get_stock_data" => handle_financial_tool("get_stock_data", &args).await,
175                    "get_crypto_data" => handle_financial_tool("get_crypto_data", &args).await,
176                    "get_etf_data" => handle_financial_tool("get_etf_data", &args).await,
177                    "get_bond_data" => handle_financial_tool("get_bond_data", &args).await,
178                    "get_mutual_fund_data" => handle_financial_tool("get_mutual_fund_data", &args).await,
179                    "get_commodity_data" => handle_financial_tool("get_commodity_data", &args).await,
180                    "multi_zone_search" => handle_financial_tool("multi_zone_search", &args).await,
181                    _ => Err("Unknown tool".to_string()),
182                };
183
184                Ok(match result {
185                    Ok(content) => McpResponse {
186                        jsonrpc: "2.0".to_string(),
187                        id,
188                        result: Some(serde_json::json!({ 
189                            "content": content,
190                            "session_id": current_session
191                        })),
192                        error: None,
193                    },
194                    Err(msg) => McpResponse {
195                        jsonrpc: "2.0".to_string(),
196                        id,
197                        result: None,
198                        error: Some(crate::types::McpError {
199                            code: -32603,
200                            message: msg,
201                            data: Some(serde_json::json!({
202                                "session_id": current_session
203                            })),
204                        }),
205                    },
206                })
207            } else {
208                Ok(McpResponse {
209                    jsonrpc: "2.0".to_string(),
210                    id,
211                    result: None,
212                    error: Some(crate::types::McpError {
213                        code: -32602,
214                        message: "Missing parameters".into(),
215                        data: None,
216                    }),
217                })
218            }
219        }
220
221        _ => Ok(McpResponse {
222            jsonrpc: "2.0".to_string(),
223            id,
224            result: None,
225            error: Some(crate::types::McpError {
226                code: -32601,
227                message: "Method not found".to_string(),
228                data: None,
229            }),
230        }),
231    };
232
233    match mcp_result {
234        Ok(resp) => Ok(HttpResponse::Ok().json(resp)),
235        Err(e) => Ok(HttpResponse::InternalServerError().json(McpResponse {
236            jsonrpc: "2.0".to_string(),
237            id: req.id,
238            result: None,
239            error: Some(crate::types::McpError {
240                code: -32603,
241                message: e,
242                data: None,
243            }),
244        })),
245    }
246}
247
248pub async fn health_check(state: web::Data<AppState>) -> Result<HttpResponse> {
249    let current_session = get_current_mcp_session();
250    
251    Ok(HttpResponse::Ok().json(serde_json::json!({
252        "status": "healthy",
253        "session_id": state.session_id,
254        "uptime_seconds": (Utc::now() - state.start_time).num_seconds(),
255        "zones": {
256            "web_unlocker": state.config.web_unlocker_zone,
257            "browser": state.config.browser_zone,
258            "serp": state.config.serp_zone
259        },
260        "mcp_session": current_session,
261        "metrics_tracking": current_session.is_some()
262    })))
263}
264
265pub async fn cors_handler() -> HttpResponse {
266    HttpResponse::Ok()
267        .insert_header(("Access-Control-Allow-Origin", "*"))
268        .insert_header(("Access-Control-Allow-Methods", "POST, GET, OPTIONS"))
269        .insert_header(("Access-Control-Allow-Headers", "Content-Type, Authorization"))
270        .finish()
271}
272
273async fn check_rate_limit(tool: &str, state: &web::Data<AppState>) -> bool {
274    let mut limits = state.rate_limits.write().await;
275    let now = Utc::now();
276    let entry = limits.entry(tool.to_string()).or_insert((0, now));
277
278    let limit = 10;
279    let window = chrono::Duration::seconds(60);
280
281    if now - entry.1 > window {
282        entry.0 = 0;
283        entry.1 = now;
284    }
285
286    if entry.0 >= limit {
287        false
288    } else {
289        entry.0 += 1;
290        true
291    }
292}
293
294pub async fn handle_scrape_website(args: &serde_json::Value, state: &web::Data<AppState>) -> Result<String, String> {
295    let url = args.get("url").and_then(|v| v.as_str()).ok_or("Missing 'url'")?;
296    let format = args.get("format").and_then(|v| v.as_str()).unwrap_or("markdown");
297
298    let mut payload = serde_json::json!({
299        "url": url,
300        "zone": state.config.web_unlocker_zone,
301        "format": "raw",
302    });
303
304    if format == "markdown" {
305        payload["data_format"] = serde_json::json!("markdown");
306    }
307
308    let api_url = BrightDataUrls::request_api();
309
310    let res = state.http_client
311        .post(&api_url)
312        .header("Authorization", format!("Bearer {}", state.config.api_token))
313        .json(&payload)
314        .send()
315        .await
316        .map_err(|e| e.to_string())?;
317
318    let body = res.text().await.map_err(|e| e.to_string())?;
319    Ok(body)
320}
321
322pub async fn handle_search_web(args: &serde_json::Value, state: &web::Data<AppState>) -> Result<String, String> {
323    let query = args.get("query").and_then(|v| v.as_str()).ok_or("Missing 'query'")?;
324    let engine = args.get("engine").and_then(|v| v.as_str()).unwrap_or("google");
325    let cursor = args.get("cursor").and_then(|v| v.as_str()).unwrap_or("0");
326
327    let search_url = build_search_url(engine, query, cursor);
328
329    // Use SERP zone for search operations
330    let payload = serde_json::json!({
331        "url": search_url,
332        "zone": state.config.serp_zone,  // Use SERP zone instead of web_unlocker_zone
333        "format": "raw",
334        "data_format": "markdown"
335    });
336
337    let api_url = BrightDataUrls::request_api();
338    let res = state.http_client
339        .post(&api_url)
340        .header("Authorization", format!("Bearer {}", state.config.api_token))
341        .json(&payload)
342        .send()
343        .await
344        .map_err(|e| e.to_string())?;
345
346    let body = res.text().await.map_err(|e| e.to_string())?;
347    Ok(body)
348}
349
350pub async fn handle_take_screenshot(args: &serde_json::Value, state: &web::Data<AppState>) -> Result<String, String> {
351    let url = args.get("url").and_then(|v| v.as_str()).ok_or("Missing 'url'")?;
352    let width = args.get("width").and_then(|v| v.as_i64()).unwrap_or(1280);
353    let height = args.get("height").and_then(|v| v.as_i64()).unwrap_or(720);
354    let full_page = args.get("full_page").and_then(|v| v.as_bool()).unwrap_or(false);
355
356    let payload = serde_json::json!({
357        "url": url,
358        "zone": state.config.browser_zone,
359        "format": "raw",
360        "data_format": "screenshot",
361        "viewport": {
362            "width": width,
363            "height": height
364        },
365        "full_page": full_page
366    });
367
368    let api_url = BrightDataUrls::request_api();
369    let res = state.http_client
370        .post(&api_url)
371        .header("Authorization", format!("Bearer {}", state.config.api_token))
372        .json(&payload)
373        .send()
374        .await
375        .map_err(|e| e.to_string())?;
376
377    // FIXED: Remove unused variable warning
378    let _body = res.text().await.map_err(|e| e.to_string())?;
379    Ok(format!("Screenshot captured for {} ({}x{})", url, width, height))
380}
381
382// Handler for financial tools using the tool resolver
383async fn handle_financial_tool(tool_name: &str, args: &serde_json::Value) -> Result<String, String> {
384    use crate::tool::ToolResolver; // FIXED: Remove unused Tool import
385    
386    let resolver = ToolResolver::default();
387    match resolver.resolve(tool_name) {
388        Some(tool) => {
389            match tool.execute(args.clone()).await {
390                Ok(result) => {
391                    if !result.content.is_empty() {
392                        Ok(result.content[0].text.clone())
393                    } else {
394                        Ok("No content returned".to_string())
395                    }
396                },
397                Err(e) => Err(e.to_string()),
398            }
399        },
400        None => Err(format!("Tool '{}' not found", tool_name)),
401    }
402}
403
404pub async fn handle_extract_placeholder(_args: &serde_json::Value) -> Result<String, String> {
405    Ok("🧠 Extract tool placeholder: AI-based structured data extraction coming soon.".to_string())
406}
407
408fn build_search_url(engine: &str, query: &str, cursor: &str) -> String {
409    let q = urlencoding::encode(query);
410    let page: usize = cursor.parse().unwrap_or(0);
411    let start = page * 10;
412
413    match engine {
414        "yandex" => format!("https://yandex.com/search/?text={q}&p={page}"),
415        "bing" => format!("https://www.bing.com/search?q={q}&first={}", start + 1),
416        "duckduckgo" => format!("https://duckduckgo.com/?q={q}&s={start}"),
417        _ => format!("https://www.google.com/search?q={q}&start={start}"),
418    }
419}