snm_brightdata_client/metrics/
brightdata_logger.rs

// src/metrics/brightdata_logger.rs - BrightData call metrics: in-memory aggregation plus JSONL file logging
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4use std::collections::HashMap;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::{Arc, Mutex};
7use tokio::fs::OpenOptions;
8use tokio::io::AsyncWriteExt;
9use chrono::{DateTime, Utc};
10
/// BrightData service category a call is attributed to.
///
/// Derived from the zone name (case-insensitively) via `From<&str>`, and used as the
/// key for per-service counters and aggregated metrics.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum BrightDataService {
    /// Zone name contains "crawl".
    Crawl,
    /// Zone name contains "browser".
    Browse,
    /// Zone name contains "serp"; this check takes precedence over all others.
    SERP,
    /// Fallback for zone names that match no other category.
    WebUnlocker,
    /// Zone name contains "mcp_session"; used for MCP session marker records.
    McpSession,
}
20
21impl From<&str> for BrightDataService {
22    fn from(zone: &str) -> Self {
23        match zone.to_lowercase().as_str() {
24            z if z.contains("serp") => BrightDataService::SERP,
25            z if z.contains("crawl") => BrightDataService::Crawl,
26            z if z.contains("browser") => BrightDataService::Browse,
27            z if z.contains("mcp_session") => BrightDataService::McpSession,
28            _ => BrightDataService::WebUnlocker,
29        }
30    }
31}
32
/// Format of the data returned for a call, parsed from a format name via `From<&str>`
/// (case-insensitive).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DataFormat {
    Raw,
    Markdown,
    JSON,
    HTML,
    Screenshot,
    /// Synthetic MCP session events (format names "session_event" or "session_start").
    SessionEvent,
    /// Any other format name, kept verbatim with its original casing.
    Unknown(String),
}
44
45impl From<&str> for DataFormat {
46    fn from(format: &str) -> Self {
47        match format.to_lowercase().as_str() {
48            "raw" => DataFormat::Raw,
49            "markdown" => DataFormat::Markdown,
50            "json" => DataFormat::JSON,
51            "html" => DataFormat::HTML,
52            "screenshot" => DataFormat::Screenshot,
53            "session_event" => DataFormat::SessionEvent,
54            "session_start" => DataFormat::SessionEvent,
55            _ => DataFormat::Unknown(format.to_string()),
56        }
57    }
58}
59
/// Request options recorded alongside each BrightData call.
///
/// As populated by `BrightDataMetricsLogger::log_call`: `timeout_seconds` is always
/// recorded as 90 and `custom_headers` as `None`, while `viewport` and `full_page`
/// are copied from the request payload when present.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BrightDataConfig {
    /// BrightData zone name the request was sent to.
    pub zone: String,
    /// Requested response format name.
    pub format: String,
    /// Optional secondary data-format name supplied by the caller.
    pub data_format: Option<String>,
    pub timeout_seconds: u64,
    /// Raw `viewport` value from the request payload, if any.
    pub viewport: Option<Value>,
    pub custom_headers: Option<HashMap<String, String>>,
    /// `full_page` flag from the request payload, if it was a boolean.
    pub full_page: Option<bool>,
}
71
/// One recorded BrightData request/response, as built by
/// `BrightDataMetricsLogger::log_call`; each record is serialised as one line of the
/// JSONL metrics log (field order here is the field order in that line).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BrightDataCall {
    /// Caller-supplied execution ID (`execution_id` argument of `log_call`).
    pub call_id: String,
    /// Time the record was created (UTC), not necessarily when the request started.
    pub timestamp: DateTime<Utc>,
    /// Service category derived from the zone name.
    pub service: BrightDataService,
    /// 1-based position of this call among all calls logged by the same logger.
    pub sequence_number: u64,
    /// Optional upstream request ID passed in by the caller.
    pub anthropic_request_id: Option<String>,
    /// MCP session this call belongs to, if any.
    pub mcp_session_id: Option<String>,
    
    // Request details
    /// Target URL of the request.
    pub url: String,
    /// Query taken from the request payload, or derived from search/quote URLs.
    pub query: Option<String>,
    /// Zone/format/payload options the request was made with.
    pub config: BrightDataConfig,
    
    // Response details
    pub status_code: u16,
    pub response_headers: HashMap<String, String>,
    /// Parsed from `data_format` when supplied, otherwise from `format`.
    pub data_format: DataFormat,
    /// Raw response body size in KiB (bytes / 1024).
    pub raw_data_size_kb: f64,
    /// Filtered body size in KiB; equals `raw_data_size_kb` when no filtered body exists.
    pub filtered_data_size_kb: f64,
    /// `true` when the filtered body is shorter than the raw body.
    pub truncated: bool,
    /// Heuristic explanation of the truncation; `None` when not truncated.
    pub truncation_reason: Option<String>,
    
    // Response data content
    /// Full raw response body (kept in memory and written to the log file).
    pub raw_response_data: Option<String>,
    /// Filtered response body, when filtering was applied.
    pub filtered_response_data: Option<String>,
    
    // Performance
    /// Request duration in milliseconds, as passed by the caller.
    pub duration_ms: u64,
    /// `true` for HTTP status codes in `200..400`.
    pub success: bool,
    /// `"HTTP <code>"` for status codes >= 400, otherwise `None`.
    pub error_message: Option<String>,
    
    // Content analysis
    /// Score from `ResponseFilter::get_content_quality_score`; 0 for an empty body.
    pub content_quality_score: u8,
    /// Result of `ResponseFilter::contains_financial_data`; `false` for an empty body.
    pub contains_financial_data: bool,
    /// Result of `ResponseFilter::is_mostly_navigation`; `false` for an empty body.
    pub is_navigation_heavy: bool,
    /// Result of `ResponseFilter::is_error_page`; for an empty body, `status_code >= 400`.
    pub is_error_page: bool,
}
111
/// Aggregated statistics for one [`BrightDataService`], computed from the in-memory
/// call history by `BrightDataMetricsLogger::get_service_metrics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceMetrics {
    pub service: BrightDataService,
    /// Number of calls attributed to this service.
    pub total_calls: u64,
    /// Calls whose `success` flag is set.
    pub successful_calls: u64,
    /// Calls whose `success` flag is not set.
    pub failed_calls: u64,
    /// Sum of the calls' *filtered* data sizes, in KiB.
    pub total_data_kb: f64,
    /// `total_data_kb / total_calls`.
    pub average_data_size_kb: f64,
    /// Sum of the calls' durations, in milliseconds.
    pub total_duration_ms: u64,
    /// `total_duration_ms / total_calls`.
    pub average_duration_ms: f64,
    /// Most frequent data format among this service's calls.
    pub most_used_format: DataFormat,
    /// Most frequent zone name among this service's calls.
    pub most_used_zone: String,
    /// Percentage (0–100) of this service's calls that were truncated.
    pub truncation_rate: f64,
    /// Number of distinct MCP session IDs seen among this service's calls.
    pub unique_sessions: u64,
}
128
/// Records BrightData calls in memory, keeps per-service and per-MCP-session call
/// counters, and appends every call as one JSON line to a log file.
pub struct BrightDataMetricsLogger {
    /// Total calls logged; also the source of each call's sequence number.
    call_counter: AtomicU64,
    /// Every logged call in insertion order. Nothing is ever evicted, so memory use
    /// grows with the number (and response size) of logged calls.
    calls: Arc<Mutex<Vec<BrightDataCall>>>,
    /// Call counts keyed by service.
    service_counters: Arc<Mutex<HashMap<BrightDataService, AtomicU64>>>,
    /// Call counts keyed by MCP session ID.
    session_counters: Arc<Mutex<HashMap<String, AtomicU64>>>,
    /// Path of the JSONL file that calls are appended to.
    log_file_path: String,
}
137
138impl BrightDataMetricsLogger {
139    pub fn new(log_file_path: &str) -> Self {
140        // FIXED: Ensure proper file path, not directory path
141        let path = std::path::Path::new(log_file_path);
142        
143        // Create parent directories if they don't exist
144        if let Some(parent) = path.parent() {
145            if let Err(e) = std::fs::create_dir_all(parent) {
146                eprintln!("Warning: Failed to create log directory {:?}: {}", parent, e);
147            }
148        }
149        
150        // FIXED: Ensure the path points to a file, not a directory
151        let final_path = if path.is_dir() || log_file_path.ends_with('/') || log_file_path.ends_with('\\') {
152            // If it's a directory, append default filename
153            let dir_path = if log_file_path.ends_with('/') || log_file_path.ends_with('\\') {
154                log_file_path.trim_end_matches('/').trim_end_matches('\\')
155            } else {
156                log_file_path
157            };
158            format!("{}/brightdata_metrics.jsonl", dir_path)
159        } else if !log_file_path.contains('.') {
160            // If no extension, add .jsonl
161            format!("{}.jsonl", log_file_path)
162        } else {
163            // Already a proper file path
164            log_file_path.to_string()
165        };
166        
167        println!("📊 BrightData metrics logger initialized with file: {}", final_path);
168        
169        Self {
170            call_counter: AtomicU64::new(0),
171            calls: Arc::new(Mutex::new(Vec::new())),
172            service_counters: Arc::new(Mutex::new(HashMap::new())),
173            session_counters: Arc::new(Mutex::new(HashMap::new())),
174            log_file_path: final_path,
175        }
176    }
177    
178    /// Log a BrightData call with full metrics and MCP session support
179    pub async fn log_call(
180        &self,
181        execution_id: &str,
182        url: &str,
183        zone: &str,
184        format: &str,
185        data_format: Option<&str>,
186        payload: Value,
187        status_code: u16,
188        response_headers: HashMap<String, String>,
189        raw_content: &str,
190        filtered_content: Option<&str>,
191        duration_ms: u64,
192        anthropic_request_id: Option<&str>,
193        mcp_session_id: Option<&str>,
194    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
195        let sequence = self.call_counter.fetch_add(1, Ordering::SeqCst) + 1;
196        let service = BrightDataService::from(zone);
197        
198        // Update session counter if session_id provided
199        if let Some(session_id) = mcp_session_id {
200            let mut session_counters = self.session_counters.lock().unwrap();
201            session_counters.entry(session_id.to_string())
202                .or_insert_with(|| AtomicU64::new(0))
203                .fetch_add(1, Ordering::SeqCst);
204        }
205        
206        // Analyze content safely
207        let content_quality_score = if raw_content.len() > 0 {
208            crate::filters::ResponseFilter::get_content_quality_score(raw_content)
209        } else {
210            0
211        };
212        
213        let contains_financial_data = if raw_content.len() > 0 {
214            crate::filters::ResponseFilter::contains_financial_data(raw_content)
215        } else {
216            false
217        };
218        
219        let is_navigation_heavy = if raw_content.len() > 0 {
220            crate::filters::ResponseFilter::is_mostly_navigation(raw_content)
221        } else {
222            false
223        };
224        
225        let is_error_page = if raw_content.len() > 0 {
226            crate::filters::ResponseFilter::is_error_page(raw_content)
227        } else {
228            status_code >= 400
229        };
230        
231        // Calculate sizes
232        let raw_data_size_kb = raw_content.len() as f64 / 1024.0;
233        let filtered_data_size_kb = filtered_content
234            .map(|c| c.len() as f64 / 1024.0)
235            .unwrap_or(raw_data_size_kb);
236        
237        // Determine truncation
238        let (truncated, truncation_reason) = if let Some(filtered) = filtered_content {
239            if filtered.len() < raw_content.len() {
240                let reason = if filtered.contains("...") {
241                    Some("Size limit exceeded".to_string())
242                } else if filtered.len() < raw_content.len() / 2 {
243                    Some("Navigation/boilerplate removed".to_string())
244                } else {
245                    Some("Content filtered".to_string())
246                };
247                (true, reason)
248            } else {
249                (false, None)
250            }
251        } else {
252            (false, None)
253        };
254        
255        // Extract config from payload
256        let config = BrightDataConfig {
257            zone: zone.to_string(),
258            format: format.to_string(),
259            data_format: data_format.map(|s| s.to_string()),
260            timeout_seconds: 90,
261            viewport: payload.get("viewport").cloned(),
262            custom_headers: None,
263            full_page: payload.get("full_page").and_then(|v| v.as_bool()),
264        };
265        
266        let call = BrightDataCall {
267            call_id: execution_id.to_string(),
268            timestamp: Utc::now(),
269            service: service.clone(),
270            sequence_number: sequence,
271            anthropic_request_id: anthropic_request_id.map(|s| s.to_string()),
272            mcp_session_id: mcp_session_id.map(|s| s.to_string()),
273            
274            url: url.to_string(),
275            query: payload.get("query").and_then(|v| v.as_str()).map(|s| s.to_string())
276                .or_else(|| {
277                    if url.contains("search?") || url.contains("quote/") {
278                        url.split('/').last().map(|s| s.to_string())
279                    } else {
280                        None
281                    }
282                }),
283            config,
284            
285            status_code,
286            response_headers,
287            data_format: DataFormat::from(data_format.unwrap_or(format)),
288            raw_data_size_kb,
289            filtered_data_size_kb,
290            truncated,
291            truncation_reason,
292            
293            raw_response_data: Some(raw_content.to_string()),
294            filtered_response_data: filtered_content.map(|s| s.to_string()),
295            
296            duration_ms,
297            success: status_code >= 200 && status_code < 400,
298            error_message: if status_code >= 400 {
299                Some(format!("HTTP {}", status_code))
300            } else {
301                None
302            },
303            
304            content_quality_score,
305            contains_financial_data,
306            is_navigation_heavy,
307            is_error_page,
308        };
309        
310        // Store call
311        {
312            let mut calls = self.calls.lock().unwrap();
313            calls.push(call.clone());
314        }
315        
316        // Update service counter
317        {
318            let mut counters = self.service_counters.lock().unwrap();
319            counters.entry(service).or_insert_with(|| AtomicU64::new(0))
320                .fetch_add(1, Ordering::SeqCst);
321        }
322        
323        // FIXED: Write to log file with proper error handling
324        if let Err(e) = self.write_call_to_log(&call).await {
325            eprintln!("Warning: Failed to write to metrics log file '{}': {}", self.log_file_path, e);
326            // Don't return error, just log warning
327        }
328        
329        Ok(())
330    }
331    
332    /// Get call count for a specific service
333    pub fn get_service_call_count(&self, service: &BrightDataService) -> u64 {
334        let counters = self.service_counters.lock().unwrap();
335        counters.get(service)
336            .map(|counter| counter.load(Ordering::SeqCst))
337            .unwrap_or(0)
338    }
339    
340    /// Get total call count
341    pub fn get_total_call_count(&self) -> u64 {
342        self.call_counter.load(Ordering::SeqCst)
343    }
344    
345    /// Get call count for a specific MCP session
346    pub fn get_session_call_count(&self, session_id: &str) -> u64 {
347        let session_counters = self.session_counters.lock().unwrap();
348        session_counters.get(session_id)
349            .map(|counter| counter.load(Ordering::SeqCst))
350            .unwrap_or(0)
351    }
352    
353    /// Get all session IDs and their call counts
354    pub fn get_all_sessions(&self) -> HashMap<String, u64> {
355        let session_counters = self.session_counters.lock().unwrap();
356        session_counters.iter()
357            .map(|(session_id, counter)| (session_id.clone(), counter.load(Ordering::SeqCst)))
358            .collect()
359    }
360    
361    /// Get calls for a specific MCP session
362    pub fn get_calls_for_session(&self, session_id: &str) -> Vec<BrightDataCall> {
363        let calls = self.calls.lock().unwrap();
364        calls.iter()
365            .filter(|call| call.mcp_session_id.as_deref() == Some(session_id))
366            .cloned()
367            .collect()
368    }
369    
370    /// Get service metrics with session information
371    pub fn get_service_metrics(&self) -> HashMap<BrightDataService, ServiceMetrics> {
372        let calls = self.calls.lock().unwrap();
373        let mut metrics: HashMap<BrightDataService, ServiceMetrics> = HashMap::new();
374        
375        for call in calls.iter() {
376            let entry = metrics.entry(call.service.clone()).or_insert_with(|| ServiceMetrics {
377                service: call.service.clone(),
378                total_calls: 0,
379                successful_calls: 0,
380                failed_calls: 0,
381                total_data_kb: 0.0,
382                average_data_size_kb: 0.0,
383                total_duration_ms: 0,
384                average_duration_ms: 0.0,
385                most_used_format: DataFormat::Raw,
386                most_used_zone: String::new(),
387                truncation_rate: 0.0,
388                unique_sessions: 0,
389            });
390            
391            entry.total_calls += 1;
392            if call.success {
393                entry.successful_calls += 1;
394            } else {
395                entry.failed_calls += 1;
396            }
397            entry.total_data_kb += call.filtered_data_size_kb;
398            entry.total_duration_ms += call.duration_ms;
399        }
400        
401        // Calculate averages and most used values
402        for (service, metric) in metrics.iter_mut() {
403            if metric.total_calls > 0 {
404                metric.average_data_size_kb = metric.total_data_kb / metric.total_calls as f64;
405                metric.average_duration_ms = metric.total_duration_ms as f64 / metric.total_calls as f64;
406                
407                let service_calls: Vec<_> = calls.iter()
408                    .filter(|c| &c.service == service)
409                    .collect();
410                
411                let mut format_counts: HashMap<String, u32> = HashMap::new();
412                let mut zone_counts: HashMap<String, u32> = HashMap::new();
413                let mut truncated_count = 0;
414                let mut unique_sessions = std::collections::HashSet::new();
415                
416                for call in service_calls.iter() {
417                    *format_counts.entry(format!("{:?}", call.data_format)).or_insert(0) += 1;
418                    *zone_counts.entry(call.config.zone.clone()).or_insert(0) += 1;
419                    if call.truncated {
420                        truncated_count += 1;
421                    }
422                    if let Some(session_id) = &call.mcp_session_id {
423                        unique_sessions.insert(session_id.clone());
424                    }
425                }
426                
427                if let Some((most_format, _)) = format_counts.iter().max_by_key(|(_, &count)| count) {
428                    metric.most_used_format = DataFormat::Unknown(most_format.clone());
429                }
430                
431                if let Some((most_zone, _)) = zone_counts.iter().max_by_key(|(_, &count)| count) {
432                    metric.most_used_zone = most_zone.clone();
433                }
434                
435                metric.truncation_rate = (truncated_count as f64 / service_calls.len() as f64) * 100.0;
436                metric.unique_sessions = unique_sessions.len() as u64;
437            }
438        }
439        
440        metrics
441    }
442    
443    /// Get calls in sequence order
444    pub fn get_calls_by_sequence(&self) -> Vec<BrightDataCall> {
445        let mut calls = self.calls.lock().unwrap().clone();
446        calls.sort_by_key(|c| c.sequence_number);
447        calls
448    }
449    
450    /// Mark new MCP session
451    pub async fn mark_new_session(&self, session_id: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
452        log::info!("📊 Marking new MCP session: {}", session_id);
453        
454        {
455            let mut session_counters = self.session_counters.lock().unwrap();
456            session_counters.insert(session_id.to_string(), AtomicU64::new(0));
457        }
458        
459        self.log_call(
460            &format!("session_marker_{}", session_id),
461            &format!("mcp://session/{}", session_id),
462            "mcp_session",
463            "json",
464            Some("session_start"),
465            serde_json::json!({
466                "event": "session_start",
467                "session_id": session_id,
468                "timestamp": chrono::Utc::now().to_rfc3339()
469            }),
470            200,
471            HashMap::new(),
472            &format!("MCP session {} started", session_id),
473            None,
474            0,
475            None,
476            Some(session_id),
477        ).await?;
478        
479        Ok(())
480    }
481    
482    /// FIXED: Write individual call to log file with proper error handling
483    async fn write_call_to_log(&self, call: &BrightDataCall) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
484        // Ensure parent directory exists
485        if let Some(parent) = std::path::Path::new(&self.log_file_path).parent() {
486            tokio::fs::create_dir_all(parent).await?;
487        }
488        
489        let mut file = OpenOptions::new()
490            .create(true)
491            .append(true)
492            .open(&self.log_file_path)
493            .await?;
494        
495        let log_entry = serde_json::to_string(call)?;
496        file.write_all(format!("{}\n", log_entry).as_bytes()).await?;
497        file.flush().await?;
498        
499        Ok(())
500    }
501    
502    /// Generate comprehensive metrics report
503    pub async fn generate_metrics_report(&self) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
504        let service_metrics = self.get_service_metrics();
505        let total_calls = self.get_total_call_count();
506        let calls_by_sequence = self.get_calls_by_sequence();
507        let all_sessions = self.get_all_sessions();
508        
509        let mut report = String::new();
510        report.push_str("# BrightData Metrics Report\n\n");
511        report.push_str(&format!("Generated: {}\n", Utc::now().format("%Y-%m-%d %H:%M:%S UTC")));
512        report.push_str(&format!("Total Calls: {}\n", total_calls));
513        report.push_str(&format!("Total MCP Sessions: {}\n\n", all_sessions.len()));
514        
515        if !all_sessions.is_empty() {
516            report.push_str("## MCP Session Breakdown\n\n");
517            for (session_id, call_count) in all_sessions.iter() {
518                report.push_str(&format!("- Session {}: {} calls\n", session_id, call_count));
519            }
520            report.push_str("\n");
521        }
522        
523        report.push_str("## Service Breakdown\n\n");
524        for (service, metrics) in service_metrics.iter() {
525            report.push_str(&format!("### {:?}\n", service));
526            report.push_str(&format!("- Total Calls: {}\n", metrics.total_calls));
527            report.push_str(&format!("- Success Rate: {:.1}%\n", 
528                (metrics.successful_calls as f64 / metrics.total_calls as f64) * 100.0));
529            report.push_str(&format!("- Average Data Size: {:.2} KB\n", metrics.average_data_size_kb));
530            report.push_str(&format!("- Average Duration: {:.1} ms\n", metrics.average_duration_ms));
531            report.push_str(&format!("- Most Used Zone: {}\n", metrics.most_used_zone));
532            report.push_str(&format!("- Truncation Rate: {:.1}%\n", metrics.truncation_rate));
533            report.push_str(&format!("- Unique Sessions: {}\n", metrics.unique_sessions));
534            report.push_str("\n");
535        }
536        
537        report.push_str("## Call Sequence\n\n");
538        for (i, call) in calls_by_sequence.iter().enumerate() {
539            let session_info = call.mcp_session_id.as_deref().unwrap_or("no-session");
540            report.push_str(&format!("{}. [{:?}] {} - {:.2} KB -> {:.2} KB ({}) [Session: {}]\n",
541                i + 1,
542                call.service,
543                call.url,
544                call.raw_data_size_kb,
545                call.filtered_data_size_kb,
546                if call.success { "✅" } else { "❌" },
547                session_info
548            ));
549        }
550        
551        Ok(report)
552    }
553}
554
555// FIXED: Global instance with proper path handling
556lazy_static::lazy_static! {
557    pub static ref BRIGHTDATA_METRICS: BrightDataMetricsLogger = {
558        let log_path = std::env::var("BRIGHTDATA_METRICS_LOG_PATH")
559            .unwrap_or_else(|_| "logs/brightdata_metrics.jsonl".to_string());
560        
561        // Ensure we have a proper file path, not a directory
562        let final_path = if log_path.ends_with('/') || log_path.ends_with('\\') {
563            format!("{}brightdata_metrics.jsonl", log_path)
564        } else if std::path::Path::new(&log_path).is_dir() {
565            format!("{}/brightdata_metrics.jsonl", log_path)
566        } else if !log_path.contains('.') {
567            format!("{}.jsonl", log_path)
568        } else {
569            log_path
570        };
571        
572        println!("📊 Initializing BrightData metrics with file: {}", final_path);
573        BrightDataMetricsLogger::new(&final_path)
574    };
575}