1use serde::{Deserialize, Serialize};
3use serde_json::Value;
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::{Arc, Mutex};
7use tokio::fs::OpenOptions;
8use tokio::io::AsyncWriteExt;
9use chrono::{DateTime, Utc};
10
/// BrightData product a call was routed to, derived from the zone name.
///
/// The `From<&str>` impl matches the lower-cased zone name by substring, in the
/// priority order `serp`, `crawl`, `browser`, `mcp_session`; the first match wins.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum BrightDataService {
    /// Zones matched by `crawl`.
    Crawl,
    /// Zones matched by `browser`.
    Browse,
    /// Zones matched by `serp` (checked before every other name).
    SERP,
    /// Zones matched by `mcp_session`; `mark_new_session` logs its markers here.
    McpSession,
    /// Fallback for every zone not matched by the rules above.
    WebUnlocker,
}
20
21impl From<&str> for BrightDataService {
22 fn from(zone: &str) -> Self {
23 match zone.to_lowercase().as_str() {
24 z if z.contains("serp") => BrightDataService::SERP,
25 z if z.contains("crawl") => BrightDataService::Crawl,
26 z if z.contains("browser") => BrightDataService::Browse,
27 z if z.contains("mcp_session") => BrightDataService::McpSession,
28 _ => BrightDataService::WebUnlocker,
29 }
30 }
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
35pub enum DataFormat {
36 Raw,
37 Markdown,
38 JSON,
39 HTML,
40 Screenshot,
41 SessionEvent,
42 Unknown(String),
43}
44
45impl From<&str> for DataFormat {
46 fn from(format: &str) -> Self {
47 match format.to_lowercase().as_str() {
48 "raw" => DataFormat::Raw,
49 "markdown" => DataFormat::Markdown,
50 "json" => DataFormat::JSON,
51 "html" => DataFormat::HTML,
52 "screenshot" => DataFormat::Screenshot,
53 "session_event" => DataFormat::SessionEvent,
54 "session_start" => DataFormat::SessionEvent,
55 _ => DataFormat::Unknown(format.to_string()),
56 }
57 }
58}
59
/// Request configuration captured alongside each [`BrightDataCall`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BrightDataConfig {
    /// Zone name passed to `log_call`.
    pub zone: String,
    /// Format string passed to `log_call`.
    pub format: String,
    /// Optional data-format override; when present it, rather than `format`,
    /// determines the call's [`DataFormat`].
    pub data_format: Option<String>,
    /// Recorded timeout; `log_call` currently always stores 90.
    pub timeout_seconds: u64,
    /// The `viewport` value copied from the request payload, if present.
    pub viewport: Option<Value>,
    /// Extra request headers; `log_call` currently never populates this.
    pub custom_headers: Option<HashMap<String, String>>,
    /// The payload's `full_page` flag, if present and a boolean.
    pub full_page: Option<bool>,
}
71
/// One recorded BrightData call, kept in memory and written as one JSONL line.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BrightDataCall {
    /// Caller-supplied execution id (`session_marker_<id>` for session markers).
    pub call_id: String,
    /// UTC time at which `log_call` built the record.
    pub timestamp: DateTime<Utc>,
    /// Service derived from the zone name.
    pub service: BrightDataService,
    /// 1-based order in which `log_call` was entered; unique per logger.
    pub sequence_number: u64,
    /// Anthropic request id associated with the call, if the caller supplied one.
    pub anthropic_request_id: Option<String>,
    /// MCP session the call belongs to, if any.
    pub mcp_session_id: Option<String>,

    /// Requested URL.
    pub url: String,
    /// The payload's `query` field, or else a path segment taken from
    /// search/quote URLs.
    pub query: Option<String>,
    /// Snapshot of the request configuration.
    pub config: BrightDataConfig,

    /// HTTP status code supplied by the caller.
    pub status_code: u16,
    /// Response headers as supplied by the caller.
    pub response_headers: HashMap<String, String>,
    /// Data format (from the `data_format` override, else from `format`).
    pub data_format: DataFormat,
    /// Raw body size in KiB (bytes / 1024).
    pub raw_data_size_kb: f64,
    /// Filtered body size in KiB; equals the raw size when no filtered content
    /// was supplied.
    pub filtered_data_size_kb: f64,
    /// Whether the filtered body is shorter than the raw body.
    pub truncated: bool,
    /// Heuristic explanation of the shrinkage when `truncated` is set.
    pub truncation_reason: Option<String>,

    /// Full raw response body.
    pub raw_response_data: Option<String>,
    /// Filtered response body, if one was supplied.
    pub filtered_response_data: Option<String>,

    /// Duration in milliseconds, as reported by the caller.
    pub duration_ms: u64,
    /// `true` for HTTP statuses 200-399.
    pub success: bool,
    /// Error description derived from the status code (e.g. `HTTP 404`), if any.
    pub error_message: Option<String>,

    /// Quality score computed by `ResponseFilter`; 0 for an empty body.
    pub content_quality_score: u8,
    /// Whether `ResponseFilter` detected financial data in the raw body.
    pub contains_financial_data: bool,
    /// Whether `ResponseFilter` judged the raw body to be mostly navigation.
    pub is_navigation_heavy: bool,
    /// Whether `ResponseFilter` flagged an error page; for an empty body this is
    /// `status_code >= 400`.
    pub is_error_page: bool,
}
111
/// Per-service aggregate produced by `BrightDataMetricsLogger::get_service_metrics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceMetrics {
    /// Service these figures describe.
    pub service: BrightDataService,
    /// Number of recorded calls.
    pub total_calls: u64,
    /// Calls whose `success` flag is set.
    pub successful_calls: u64,
    /// Calls whose `success` flag is not set.
    pub failed_calls: u64,
    /// Sum of the calls' filtered data sizes, in KiB.
    pub total_data_kb: f64,
    /// `total_data_kb / total_calls`.
    pub average_data_size_kb: f64,
    /// Sum of the calls' durations, in milliseconds.
    pub total_duration_ms: u64,
    /// `total_duration_ms / total_calls`.
    pub average_duration_ms: f64,
    /// Most frequent data format among the calls.
    pub most_used_format: DataFormat,
    /// Most frequent zone among the calls.
    pub most_used_zone: String,
    /// Percentage (0-100) of calls marked as truncated.
    pub truncation_rate: f64,
    /// Number of distinct MCP session ids among the calls.
    pub unique_sessions: u64,
}
128
/// Records BrightData calls in memory and appends each one to a JSONL file.
pub struct BrightDataMetricsLogger {
    /// Number of `log_call` invocations so far; also the source of sequence numbers.
    call_counter: AtomicU64,
    /// Every recorded call, in the order it was pushed.
    calls: Arc<Mutex<Vec<BrightDataCall>>>,
    /// Per-service call counts.
    // NOTE(review): these counters are only touched while the Mutex is held, so
    // plain `u64` values would do; the atomics (and the `Arc`s, which are never
    // cloned here) look redundant.
    service_counters: Arc<Mutex<HashMap<BrightDataService, AtomicU64>>>,
    /// Per-MCP-session call counts, including session-start marker calls.
    session_counters: Arc<Mutex<HashMap<String, AtomicU64>>>,
    /// Normalised path of the JSONL log file.
    log_file_path: String,
}
137
138impl BrightDataMetricsLogger {
139 pub fn new(log_file_path: &str) -> Self {
140 let path = std::path::Path::new(log_file_path);
142
143 if let Some(parent) = path.parent() {
145 if let Err(e) = std::fs::create_dir_all(parent) {
146 eprintln!("Warning: Failed to create log directory {:?}: {}", parent, e);
147 }
148 }
149
150 let final_path = if path.is_dir() || log_file_path.ends_with('/') || log_file_path.ends_with('\\') {
152 let dir_path = if log_file_path.ends_with('/') || log_file_path.ends_with('\\') {
154 log_file_path.trim_end_matches('/').trim_end_matches('\\')
155 } else {
156 log_file_path
157 };
158 format!("{}/brightdata_metrics.jsonl", dir_path)
159 } else if !log_file_path.contains('.') {
160 format!("{}.jsonl", log_file_path)
162 } else {
163 log_file_path.to_string()
165 };
166
167 println!("📊 BrightData metrics logger initialized with file: {}", final_path);
168
169 Self {
170 call_counter: AtomicU64::new(0),
171 calls: Arc::new(Mutex::new(Vec::new())),
172 service_counters: Arc::new(Mutex::new(HashMap::new())),
173 session_counters: Arc::new(Mutex::new(HashMap::new())),
174 log_file_path: final_path,
175 }
176 }
177
178 pub async fn log_call(
180 &self,
181 execution_id: &str,
182 url: &str,
183 zone: &str,
184 format: &str,
185 data_format: Option<&str>,
186 payload: Value,
187 status_code: u16,
188 response_headers: HashMap<String, String>,
189 raw_content: &str,
190 filtered_content: Option<&str>,
191 duration_ms: u64,
192 anthropic_request_id: Option<&str>,
193 mcp_session_id: Option<&str>,
194 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
195 let sequence = self.call_counter.fetch_add(1, Ordering::SeqCst) + 1;
196 let service = BrightDataService::from(zone);
197
198 if let Some(session_id) = mcp_session_id {
200 let mut session_counters = self.session_counters.lock().unwrap();
201 session_counters.entry(session_id.to_string())
202 .or_insert_with(|| AtomicU64::new(0))
203 .fetch_add(1, Ordering::SeqCst);
204 }
205
206 let content_quality_score = if raw_content.len() > 0 {
208 crate::filters::ResponseFilter::get_content_quality_score(raw_content)
209 } else {
210 0
211 };
212
213 let contains_financial_data = if raw_content.len() > 0 {
214 crate::filters::ResponseFilter::contains_financial_data(raw_content)
215 } else {
216 false
217 };
218
219 let is_navigation_heavy = if raw_content.len() > 0 {
220 crate::filters::ResponseFilter::is_mostly_navigation(raw_content)
221 } else {
222 false
223 };
224
225 let is_error_page = if raw_content.len() > 0 {
226 crate::filters::ResponseFilter::is_error_page(raw_content)
227 } else {
228 status_code >= 400
229 };
230
231 let raw_data_size_kb = raw_content.len() as f64 / 1024.0;
233 let filtered_data_size_kb = filtered_content
234 .map(|c| c.len() as f64 / 1024.0)
235 .unwrap_or(raw_data_size_kb);
236
237 let (truncated, truncation_reason) = if let Some(filtered) = filtered_content {
239 if filtered.len() < raw_content.len() {
240 let reason = if filtered.contains("...") {
241 Some("Size limit exceeded".to_string())
242 } else if filtered.len() < raw_content.len() / 2 {
243 Some("Navigation/boilerplate removed".to_string())
244 } else {
245 Some("Content filtered".to_string())
246 };
247 (true, reason)
248 } else {
249 (false, None)
250 }
251 } else {
252 (false, None)
253 };
254
255 let config = BrightDataConfig {
257 zone: zone.to_string(),
258 format: format.to_string(),
259 data_format: data_format.map(|s| s.to_string()),
260 timeout_seconds: 90,
261 viewport: payload.get("viewport").cloned(),
262 custom_headers: None,
263 full_page: payload.get("full_page").and_then(|v| v.as_bool()),
264 };
265
266 let call = BrightDataCall {
267 call_id: execution_id.to_string(),
268 timestamp: Utc::now(),
269 service: service.clone(),
270 sequence_number: sequence,
271 anthropic_request_id: anthropic_request_id.map(|s| s.to_string()),
272 mcp_session_id: mcp_session_id.map(|s| s.to_string()),
273
274 url: url.to_string(),
275 query: payload.get("query").and_then(|v| v.as_str()).map(|s| s.to_string())
276 .or_else(|| {
277 if url.contains("search?") || url.contains("quote/") {
278 url.split('/').last().map(|s| s.to_string())
279 } else {
280 None
281 }
282 }),
283 config,
284
285 status_code,
286 response_headers,
287 data_format: DataFormat::from(data_format.unwrap_or(format)),
288 raw_data_size_kb,
289 filtered_data_size_kb,
290 truncated,
291 truncation_reason,
292
293 raw_response_data: Some(raw_content.to_string()),
294 filtered_response_data: filtered_content.map(|s| s.to_string()),
295
296 duration_ms,
297 success: status_code >= 200 && status_code < 400,
298 error_message: if status_code >= 400 {
299 Some(format!("HTTP {}", status_code))
300 } else {
301 None
302 },
303
304 content_quality_score,
305 contains_financial_data,
306 is_navigation_heavy,
307 is_error_page,
308 };
309
310 {
312 let mut calls = self.calls.lock().unwrap();
313 calls.push(call.clone());
314 }
315
316 {
318 let mut counters = self.service_counters.lock().unwrap();
319 counters.entry(service).or_insert_with(|| AtomicU64::new(0))
320 .fetch_add(1, Ordering::SeqCst);
321 }
322
323 if let Err(e) = self.write_call_to_log(&call).await {
325 eprintln!("Warning: Failed to write to metrics log file '{}': {}", self.log_file_path, e);
326 }
328
329 Ok(())
330 }
331
332 pub fn get_service_call_count(&self, service: &BrightDataService) -> u64 {
334 let counters = self.service_counters.lock().unwrap();
335 counters.get(service)
336 .map(|counter| counter.load(Ordering::SeqCst))
337 .unwrap_or(0)
338 }
339
340 pub fn get_total_call_count(&self) -> u64 {
342 self.call_counter.load(Ordering::SeqCst)
343 }
344
345 pub fn get_session_call_count(&self, session_id: &str) -> u64 {
347 let session_counters = self.session_counters.lock().unwrap();
348 session_counters.get(session_id)
349 .map(|counter| counter.load(Ordering::SeqCst))
350 .unwrap_or(0)
351 }
352
353 pub fn get_all_sessions(&self) -> HashMap<String, u64> {
355 let session_counters = self.session_counters.lock().unwrap();
356 session_counters.iter()
357 .map(|(session_id, counter)| (session_id.clone(), counter.load(Ordering::SeqCst)))
358 .collect()
359 }
360
361 pub fn get_calls_for_session(&self, session_id: &str) -> Vec<BrightDataCall> {
363 let calls = self.calls.lock().unwrap();
364 calls.iter()
365 .filter(|call| call.mcp_session_id.as_deref() == Some(session_id))
366 .cloned()
367 .collect()
368 }
369
370 pub fn get_service_metrics(&self) -> HashMap<BrightDataService, ServiceMetrics> {
372 let calls = self.calls.lock().unwrap();
373 let mut metrics: HashMap<BrightDataService, ServiceMetrics> = HashMap::new();
374
375 for call in calls.iter() {
376 let entry = metrics.entry(call.service.clone()).or_insert_with(|| ServiceMetrics {
377 service: call.service.clone(),
378 total_calls: 0,
379 successful_calls: 0,
380 failed_calls: 0,
381 total_data_kb: 0.0,
382 average_data_size_kb: 0.0,
383 total_duration_ms: 0,
384 average_duration_ms: 0.0,
385 most_used_format: DataFormat::Raw,
386 most_used_zone: String::new(),
387 truncation_rate: 0.0,
388 unique_sessions: 0,
389 });
390
391 entry.total_calls += 1;
392 if call.success {
393 entry.successful_calls += 1;
394 } else {
395 entry.failed_calls += 1;
396 }
397 entry.total_data_kb += call.filtered_data_size_kb;
398 entry.total_duration_ms += call.duration_ms;
399 }
400
401 for (service, metric) in metrics.iter_mut() {
403 if metric.total_calls > 0 {
404 metric.average_data_size_kb = metric.total_data_kb / metric.total_calls as f64;
405 metric.average_duration_ms = metric.total_duration_ms as f64 / metric.total_calls as f64;
406
407 let service_calls: Vec<_> = calls.iter()
408 .filter(|c| &c.service == service)
409 .collect();
410
411 let mut format_counts: HashMap<String, u32> = HashMap::new();
412 let mut zone_counts: HashMap<String, u32> = HashMap::new();
413 let mut truncated_count = 0;
414 let mut unique_sessions = std::collections::HashSet::new();
415
416 for call in service_calls.iter() {
417 *format_counts.entry(format!("{:?}", call.data_format)).or_insert(0) += 1;
418 *zone_counts.entry(call.config.zone.clone()).or_insert(0) += 1;
419 if call.truncated {
420 truncated_count += 1;
421 }
422 if let Some(session_id) = &call.mcp_session_id {
423 unique_sessions.insert(session_id.clone());
424 }
425 }
426
427 if let Some((most_format, _)) = format_counts.iter().max_by_key(|(_, &count)| count) {
428 metric.most_used_format = DataFormat::Unknown(most_format.clone());
429 }
430
431 if let Some((most_zone, _)) = zone_counts.iter().max_by_key(|(_, &count)| count) {
432 metric.most_used_zone = most_zone.clone();
433 }
434
435 metric.truncation_rate = (truncated_count as f64 / service_calls.len() as f64) * 100.0;
436 metric.unique_sessions = unique_sessions.len() as u64;
437 }
438 }
439
440 metrics
441 }
442
443 pub fn get_calls_by_sequence(&self) -> Vec<BrightDataCall> {
445 let mut calls = self.calls.lock().unwrap().clone();
446 calls.sort_by_key(|c| c.sequence_number);
447 calls
448 }
449
450 pub async fn mark_new_session(&self, session_id: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
452 log::info!("📊 Marking new MCP session: {}", session_id);
453
454 {
455 let mut session_counters = self.session_counters.lock().unwrap();
456 session_counters.insert(session_id.to_string(), AtomicU64::new(0));
457 }
458
459 self.log_call(
460 &format!("session_marker_{}", session_id),
461 &format!("mcp://session/{}", session_id),
462 "mcp_session",
463 "json",
464 Some("session_start"),
465 serde_json::json!({
466 "event": "session_start",
467 "session_id": session_id,
468 "timestamp": chrono::Utc::now().to_rfc3339()
469 }),
470 200,
471 HashMap::new(),
472 &format!("MCP session {} started", session_id),
473 None,
474 0,
475 None,
476 Some(session_id),
477 ).await?;
478
479 Ok(())
480 }
481
482 async fn write_call_to_log(&self, call: &BrightDataCall) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
484 if let Some(parent) = std::path::Path::new(&self.log_file_path).parent() {
486 tokio::fs::create_dir_all(parent).await?;
487 }
488
489 let mut file = OpenOptions::new()
490 .create(true)
491 .append(true)
492 .open(&self.log_file_path)
493 .await?;
494
495 let log_entry = serde_json::to_string(call)?;
496 file.write_all(format!("{}\n", log_entry).as_bytes()).await?;
497 file.flush().await?;
498
499 Ok(())
500 }
501
502 pub async fn generate_metrics_report(&self) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
504 let service_metrics = self.get_service_metrics();
505 let total_calls = self.get_total_call_count();
506 let calls_by_sequence = self.get_calls_by_sequence();
507 let all_sessions = self.get_all_sessions();
508
509 let mut report = String::new();
510 report.push_str("# BrightData Metrics Report\n\n");
511 report.push_str(&format!("Generated: {}\n", Utc::now().format("%Y-%m-%d %H:%M:%S UTC")));
512 report.push_str(&format!("Total Calls: {}\n", total_calls));
513 report.push_str(&format!("Total MCP Sessions: {}\n\n", all_sessions.len()));
514
515 if !all_sessions.is_empty() {
516 report.push_str("## MCP Session Breakdown\n\n");
517 for (session_id, call_count) in all_sessions.iter() {
518 report.push_str(&format!("- Session {}: {} calls\n", session_id, call_count));
519 }
520 report.push_str("\n");
521 }
522
523 report.push_str("## Service Breakdown\n\n");
524 for (service, metrics) in service_metrics.iter() {
525 report.push_str(&format!("### {:?}\n", service));
526 report.push_str(&format!("- Total Calls: {}\n", metrics.total_calls));
527 report.push_str(&format!("- Success Rate: {:.1}%\n",
528 (metrics.successful_calls as f64 / metrics.total_calls as f64) * 100.0));
529 report.push_str(&format!("- Average Data Size: {:.2} KB\n", metrics.average_data_size_kb));
530 report.push_str(&format!("- Average Duration: {:.1} ms\n", metrics.average_duration_ms));
531 report.push_str(&format!("- Most Used Zone: {}\n", metrics.most_used_zone));
532 report.push_str(&format!("- Truncation Rate: {:.1}%\n", metrics.truncation_rate));
533 report.push_str(&format!("- Unique Sessions: {}\n", metrics.unique_sessions));
534 report.push_str("\n");
535 }
536
537 report.push_str("## Call Sequence\n\n");
538 for (i, call) in calls_by_sequence.iter().enumerate() {
539 let session_info = call.mcp_session_id.as_deref().unwrap_or("no-session");
540 report.push_str(&format!("{}. [{:?}] {} - {:.2} KB -> {:.2} KB ({}) [Session: {}]\n",
541 i + 1,
542 call.service,
543 call.url,
544 call.raw_data_size_kb,
545 call.filtered_data_size_kb,
546 if call.success { "✅" } else { "❌" },
547 session_info
548 ));
549 }
550
551 Ok(report)
552 }
553}
554
555lazy_static::lazy_static! {
557 pub static ref BRIGHTDATA_METRICS: BrightDataMetricsLogger = {
558 let log_path = std::env::var("BRIGHTDATA_METRICS_LOG_PATH")
559 .unwrap_or_else(|_| "logs/brightdata_metrics.jsonl".to_string());
560
561 let final_path = if log_path.ends_with('/') || log_path.ends_with('\\') {
563 format!("{}brightdata_metrics.jsonl", log_path)
564 } else if std::path::Path::new(&log_path).is_dir() {
565 format!("{}/brightdata_metrics.jsonl", log_path)
566 } else if !log_path.contains('.') {
567 format!("{}.jsonl", log_path)
568 } else {
569 log_path
570 };
571
572 println!("📊 Initializing BrightData metrics with file: {}", final_path);
573 BrightDataMetricsLogger::new(&final_path)
574 };
575}