snm_brightdata_client/
server.rs

1// src/server.rs - Fixed version with unused variables removed
2use actix_web::{web, HttpRequest, HttpResponse, Result};
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::env;
6use std::sync::Arc;
7use std::time::Duration;
8use tokio::sync::RwLock;
9use chrono::{DateTime, Utc};
10use reqwest::Client;
11use uuid::Uuid;
12use crate::error::BrightDataError;
13use crate::types::{McpResponse};
14use crate::tool::{handle_mcp_initialize, get_current_mcp_session}; // Import MCP session functions
15
/// Runtime configuration for the BrightData client, populated from environment variables.
#[derive(Debug, Clone)]
pub struct Config {
    /// Bearer token sent in the `Authorization` header of BrightData requests (`API_TOKEN`).
    pub api_token: String,
    /// Zone used for page scraping (`WEB_UNLOCKER_ZONE`).
    pub web_unlocker_zone: String,
    /// Zone used for screenshots (`BROWSER_ZONE`).
    pub browser_zone: String,
    /// Zone used for search-engine queries (`BRIGHTDATA_SERP_ZONE`).
    pub serp_zone: String,
    /// Raw value of `RATE_LIMIT`, if set; stored as-is without parsing.
    pub rate_limit: Option<String>,
    /// Timeout applied to the shared HTTP client (`REQUEST_TIMEOUT`, in seconds).
    pub timeout: Duration,
    /// Value of `MAX_RETRIES`.
    pub max_retries: u32,
}

impl Config {
    /// Builds a [`Config`] from the process environment.
    ///
    /// Every variable is optional: missing (or non-Unicode) values fall back to a default, and
    /// numeric values that fail to parse fall back to the same default as a missing value.
    ///
    /// # Errors
    ///
    /// The current implementation never returns `Err`; the `Result` is kept for the callers'
    /// sake.
    pub fn from_env() -> Result<Self, std::io::Error> {
        // Reads a variable, substituting `fallback` when it cannot be read.
        let var_or = |key: &str, fallback: &str| -> String {
            env::var(key).unwrap_or_else(|_| fallback.to_string())
        };

        let timeout_secs: u64 = var_or("REQUEST_TIMEOUT", "300").parse().unwrap_or(300);
        let max_retries: u32 = var_or("MAX_RETRIES", "3").parse().unwrap_or(3);

        let config = Self {
            api_token: env::var("API_TOKEN").unwrap_or_default(),
            web_unlocker_zone: var_or("WEB_UNLOCKER_ZONE", "default_zone"),
            browser_zone: var_or("BROWSER_ZONE", "default_browser"),
            serp_zone: var_or("BRIGHTDATA_SERP_ZONE", "serp_api2"),
            rate_limit: env::var("RATE_LIMIT").ok(),
            timeout: Duration::from_secs(timeout_secs),
            max_retries,
        };

        Ok(config)
    }
}
40
41#[derive(Debug)]
42pub struct AppState {
43    pub config: Config,
44    pub session_id: Uuid,
45    pub http_client: Client,
46    pub rate_limits: Arc<RwLock<HashMap<String, (u32, DateTime<Utc>)>>>,
47    pub start_time: DateTime<Utc>,
48    pub current_mcp_session: Arc<RwLock<Option<String>>>, // Track current MCP session
49}
50
51impl AppState {
52    pub fn new(config: Config) -> Self {
53        Self {
54            session_id: Uuid::new_v4(),
55            config: config.clone(),
56            http_client: Client::builder().timeout(config.timeout).build().unwrap(),
57            rate_limits: Arc::new(RwLock::new(HashMap::new())),
58            start_time: Utc::now(),
59            current_mcp_session: Arc::new(RwLock::new(None)),
60        }
61    }
62}
63
/// Namespace for the BrightData endpoint URLs used by this server.
pub struct BrightDataUrls;

impl BrightDataUrls {
    /// Returns the URL of the BrightData `/request` endpoint.
    ///
    /// The base is taken from `BRIGHTDATA_BASE_URL` and defaults to
    /// `https://api.brightdata.com`. Surrounding whitespace and trailing slashes are stripped so
    /// that a base such as `https://host/` does not produce `https://host//request`; a value that
    /// is empty after stripping is treated as unset.
    pub fn request_api() -> String {
        const DEFAULT_BASE_URL: &str = "https://api.brightdata.com";

        let configured = std::env::var("BRIGHTDATA_BASE_URL").unwrap_or_default();
        let trimmed = configured.trim().trim_end_matches('/');
        let base_url = if trimmed.is_empty() {
            DEFAULT_BASE_URL
        } else {
            trimmed
        };

        format!("{}/request", base_url)
    }
}
73
74// Enhanced MCP handler with initialize support for metrics
75pub async fn handle_mcp_request(
76    _req: HttpRequest,
77    payload: web::Json<crate::types::McpRequest>,
78    state: web::Data<AppState>,
79) -> Result<HttpResponse> {
80    let req = payload.into_inner();
81    let id = req.id.clone();
82
83    let mcp_result: Result<McpResponse, String> = match req.method.as_str() {
84        // Handle MCP initialize - this resets metrics for new session
85        "initialize" => {
86            log::info!("🎯 MCP Initialize received - starting new metrics session");
87            
88            // Start new MCP session and reset metrics
89            let session_id = handle_mcp_initialize();
90            
91            // Update app state with new session
92            {
93                let mut current_session = state.current_mcp_session.write().await;
94                *current_session = Some(session_id.clone());
95            }
96            
97            log::info!("📊 New MCP session started: {}", session_id);
98            
99            Ok(McpResponse {
100                jsonrpc: "2.0".to_string(),
101                id,
102                result: Some(serde_json::json!({
103                    "protocolVersion": "2024-11-05",
104                    "capabilities": {
105                        "tools": {},
106                        "logging": {},
107                        "prompts": {}
108                    },
109                    "serverInfo": {
110                        "name": "snm-brightdata-client",
111                        "version": env!("CARGO_PKG_VERSION")
112                    },
113                    "instructions": "BrightData MCP Server ready with metrics tracking",
114                    "session_id": session_id
115                })),
116                error: None,
117            })
118        }
119
120        "tools/list" => {
121            let current_session = get_current_mcp_session();
122            log::info!("📋 Tools list requested [Session: {:?}]", current_session);
123            
124            Ok(McpResponse {
125                jsonrpc: "2.0".to_string(),
126                id,
127                result: Some(serde_json::json!({
128                    "tools": [
129                        { "name": "scrape_website", "description": "Scrape a web page" },
130                        { "name": "search_web", "description": "Perform a web search" },
131                        { "name": "extract_data", "description": "Extract structured data from a webpage" },
132                        { "name": "take_screenshot", "description": "Take a screenshot of a webpage" },
133                        { "name": "get_stock_data", "description": "Get stock market data" },
134                        { "name": "get_crypto_data", "description": "Get cryptocurrency data" },
135                        { "name": "get_etf_data", "description": "Get ETF data" },
136                        { "name": "get_bond_data", "description": "Get bond market data" },
137                        { "name": "get_mutual_fund_data", "description": "Get mutual fund data" },
138                        { "name": "get_commodity_data", "description": "Get commodity market data" },
139                        { "name": "get_market_overview", "description": "Get market overview" },
140                        { "name": "multi_zone_search", "description": "Search across multiple zones" }
141                    ],
142                    "session_id": current_session
143                })),
144                error: None,
145            })
146        }
147
148        "tools/call" => {
149            let current_session = get_current_mcp_session();
150            
151            if let Some(params) = req.params {
152                let name = params.get("name").and_then(|v| v.as_str()).unwrap_or("");
153                let args = params.get("arguments").cloned().unwrap_or_default();
154
155                log::info!("🔧 Tool call: {} [Session: {:?}]", name, current_session);
156
157                if !check_rate_limit(name, &state).await {
158                    return Ok(HttpResponse::TooManyRequests().json(McpResponse {
159                        jsonrpc: "2.0".to_string(),
160                        id,
161                        result: None,
162                        error: Some(crate::types::McpError {
163                            code: -32000,
164                            message: "Rate limit exceeded".to_string(),
165                            data: None,
166                        }),
167                    }));
168                }
169
170                let result = match name {
171                    "scrape_website" => handle_scrape_website(&args, &state).await,
172                    "search_web" => handle_search_web(&args, &state).await,
173                    "extract_data" => handle_extract_placeholder(&args).await,
174                    "take_screenshot" => handle_take_screenshot(&args, &state).await,
175                    "get_stock_data" => handle_financial_tool("get_stock_data", &args).await,
176                    "get_crypto_data" => handle_financial_tool("get_crypto_data", &args).await,
177                    "get_etf_data" => handle_financial_tool("get_etf_data", &args).await,
178                    "get_bond_data" => handle_financial_tool("get_bond_data", &args).await,
179                    "get_mutual_fund_data" => handle_financial_tool("get_mutual_fund_data", &args).await,
180                    "get_commodity_data" => handle_financial_tool("get_commodity_data", &args).await,
181                    "get_market_overview" => handle_financial_tool("get_market_overview", &args).await,
182                    "multi_zone_search" => handle_financial_tool("multi_zone_search", &args).await,
183                    _ => Err("Unknown tool".to_string()),
184                };
185
186                Ok(match result {
187                    Ok(content) => McpResponse {
188                        jsonrpc: "2.0".to_string(),
189                        id,
190                        result: Some(serde_json::json!({ 
191                            "content": content,
192                            "session_id": current_session
193                        })),
194                        error: None,
195                    },
196                    Err(msg) => McpResponse {
197                        jsonrpc: "2.0".to_string(),
198                        id,
199                        result: None,
200                        error: Some(crate::types::McpError {
201                            code: -32603,
202                            message: msg,
203                            data: Some(serde_json::json!({
204                                "session_id": current_session
205                            })),
206                        }),
207                    },
208                })
209            } else {
210                Ok(McpResponse {
211                    jsonrpc: "2.0".to_string(),
212                    id,
213                    result: None,
214                    error: Some(crate::types::McpError {
215                        code: -32602,
216                        message: "Missing parameters".into(),
217                        data: None,
218                    }),
219                })
220            }
221        }
222
223        _ => Ok(McpResponse {
224            jsonrpc: "2.0".to_string(),
225            id,
226            result: None,
227            error: Some(crate::types::McpError {
228                code: -32601,
229                message: "Method not found".to_string(),
230                data: None,
231            }),
232        }),
233    };
234
235    match mcp_result {
236        Ok(resp) => Ok(HttpResponse::Ok().json(resp)),
237        Err(e) => Ok(HttpResponse::InternalServerError().json(McpResponse {
238            jsonrpc: "2.0".to_string(),
239            id: req.id,
240            result: None,
241            error: Some(crate::types::McpError {
242                code: -32603,
243                message: e,
244                data: None,
245            }),
246        })),
247    }
248}
249
250pub async fn health_check(state: web::Data<AppState>) -> Result<HttpResponse> {
251    let current_session = get_current_mcp_session();
252    
253    Ok(HttpResponse::Ok().json(serde_json::json!({
254        "status": "healthy",
255        "session_id": state.session_id,
256        "uptime_seconds": (Utc::now() - state.start_time).num_seconds(),
257        "zones": {
258            "web_unlocker": state.config.web_unlocker_zone,
259            "browser": state.config.browser_zone,
260            "serp": state.config.serp_zone
261        },
262        "mcp_session": current_session,
263        "metrics_tracking": current_session.is_some()
264    })))
265}
266
267pub async fn cors_handler() -> HttpResponse {
268    HttpResponse::Ok()
269        .insert_header(("Access-Control-Allow-Origin", "*"))
270        .insert_header(("Access-Control-Allow-Methods", "POST, GET, OPTIONS"))
271        .insert_header(("Access-Control-Allow-Headers", "Content-Type, Authorization"))
272        .finish()
273}
274
275async fn check_rate_limit(tool: &str, state: &web::Data<AppState>) -> bool {
276    let mut limits = state.rate_limits.write().await;
277    let now = Utc::now();
278    let entry = limits.entry(tool.to_string()).or_insert((0, now));
279
280    let limit = 10;
281    let window = chrono::Duration::seconds(60);
282
283    if now - entry.1 > window {
284        entry.0 = 0;
285        entry.1 = now;
286    }
287
288    if entry.0 >= limit {
289        false
290    } else {
291        entry.0 += 1;
292        true
293    }
294}
295
296pub async fn handle_scrape_website(args: &serde_json::Value, state: &web::Data<AppState>) -> Result<String, String> {
297    let url = args.get("url").and_then(|v| v.as_str()).ok_or("Missing 'url'")?;
298    let format = args.get("format").and_then(|v| v.as_str()).unwrap_or("markdown");
299
300    let mut payload = serde_json::json!({
301        "url": url,
302        "zone": state.config.web_unlocker_zone,
303        "format": "raw",
304    });
305
306    if format == "markdown" {
307        payload["data_format"] = serde_json::json!("markdown");
308    }
309
310    let api_url = BrightDataUrls::request_api();
311
312    let res = state.http_client
313        .post(&api_url)
314        .header("Authorization", format!("Bearer {}", state.config.api_token))
315        .json(&payload)
316        .send()
317        .await
318        .map_err(|e| e.to_string())?;
319
320    let body = res.text().await.map_err(|e| e.to_string())?;
321    Ok(body)
322}
323
324pub async fn handle_search_web(args: &serde_json::Value, state: &web::Data<AppState>) -> Result<String, String> {
325    let query = args.get("query").and_then(|v| v.as_str()).ok_or("Missing 'query'")?;
326    let engine = args.get("engine").and_then(|v| v.as_str()).unwrap_or("google");
327    let cursor = args.get("cursor").and_then(|v| v.as_str()).unwrap_or("0");
328
329    let search_url = build_search_url(engine, query, cursor);
330
331    // Use SERP zone for search operations
332    let payload = serde_json::json!({
333        "url": search_url,
334        "zone": state.config.serp_zone,  // Use SERP zone instead of web_unlocker_zone
335        "format": "raw",
336        "data_format": "markdown"
337    });
338
339    let api_url = BrightDataUrls::request_api();
340    let res = state.http_client
341        .post(&api_url)
342        .header("Authorization", format!("Bearer {}", state.config.api_token))
343        .json(&payload)
344        .send()
345        .await
346        .map_err(|e| e.to_string())?;
347
348    let body = res.text().await.map_err(|e| e.to_string())?;
349    Ok(body)
350}
351
352pub async fn handle_take_screenshot(args: &serde_json::Value, state: &web::Data<AppState>) -> Result<String, String> {
353    let url = args.get("url").and_then(|v| v.as_str()).ok_or("Missing 'url'")?;
354    let width = args.get("width").and_then(|v| v.as_i64()).unwrap_or(1280);
355    let height = args.get("height").and_then(|v| v.as_i64()).unwrap_or(720);
356    let full_page = args.get("full_page").and_then(|v| v.as_bool()).unwrap_or(false);
357
358    let payload = serde_json::json!({
359        "url": url,
360        "zone": state.config.browser_zone,
361        "format": "raw",
362        "data_format": "screenshot",
363        "viewport": {
364            "width": width,
365            "height": height
366        },
367        "full_page": full_page
368    });
369
370    let api_url = BrightDataUrls::request_api();
371    let res = state.http_client
372        .post(&api_url)
373        .header("Authorization", format!("Bearer {}", state.config.api_token))
374        .json(&payload)
375        .send()
376        .await
377        .map_err(|e| e.to_string())?;
378
379    // FIXED: Remove unused variable warning
380    let _body = res.text().await.map_err(|e| e.to_string())?;
381    Ok(format!("Screenshot captured for {} ({}x{})", url, width, height))
382}
383
384// Handler for financial tools using the tool resolver
385async fn handle_financial_tool(tool_name: &str, args: &serde_json::Value) -> Result<String, String> {
386    use crate::tool::ToolResolver; // FIXED: Remove unused Tool import
387    
388    let resolver = ToolResolver::default();
389    match resolver.resolve(tool_name) {
390        Some(tool) => {
391            match tool.execute(args.clone()).await {
392                Ok(result) => {
393                    if !result.content.is_empty() {
394                        Ok(result.content[0].text.clone())
395                    } else {
396                        Ok("No content returned".to_string())
397                    }
398                },
399                Err(e) => Err(e.to_string()),
400            }
401        },
402        None => Err(format!("Tool '{}' not found", tool_name)),
403    }
404}
405
406pub async fn handle_extract_placeholder(_args: &serde_json::Value) -> Result<String, String> {
407    Ok("🧠 Extract tool placeholder: AI-based structured data extraction coming soon.".to_string())
408}
409
410fn build_search_url(engine: &str, query: &str, cursor: &str) -> String {
411    let q = urlencoding::encode(query);
412    let page: usize = cursor.parse().unwrap_or(0);
413    let start = page * 10;
414
415    match engine {
416        "yandex" => format!("https://yandex.com/search/?text={q}&p={page}"),
417        "bing" => format!("https://www.bing.com/search?q={q}&first={}", start + 1),
418        "duckduckgo" => format!("https://duckduckgo.com/?q={q}&s={start}"),
419        _ => format!("https://www.google.com/search?q={q}&start={start}"),
420    }
421}