litellm_rs/utils/logging/
logging.rs

1//! Enhanced logging utilities with structured logging and performance optimizations
2//!
3//! This module provides improved logging capabilities including structured logging,
4//! log sampling, and async logging to minimize performance impact.
5
6use serde::Serialize;
7use std::collections::HashMap;
8use std::sync::OnceLock;
9use std::sync::atomic::{AtomicU64, Ordering};
10use tokio::sync::mpsc;
11use tracing::{Level, debug, error, info, warn};
12use uuid::Uuid;
13
14/// Log entry for async processing
15#[derive(Debug, Clone, Serialize)]
16pub struct LogEntry {
17    /// Timestamp
18    pub timestamp: chrono::DateTime<chrono::Utc>,
19    /// Log level
20    pub level: String,
21    /// Logger name/component
22    pub logger: String,
23    /// Log message
24    pub message: String,
25    /// Structured fields
26    pub fields: HashMap<String, serde_json::Value>,
27    /// Request ID for correlation
28    pub request_id: Option<String>,
29    /// User ID if available
30    pub user_id: Option<Uuid>,
31    /// Trace ID for distributed tracing
32    pub trace_id: Option<String>,
33}
34
/// Tunable settings for the async logger.
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct AsyncLoggerConfig {
    /// Number of log entries the buffer is meant to hold.
    pub buffer_size: usize,
    /// If `true`, entries are to be discarded when the buffer overflows.
    pub drop_on_overflow: bool,
    /// Fraction of high-frequency logs to keep, from 0.0 (none) to 1.0 (all).
    pub sample_rate: f64,
    /// Longest message allowed before it is shortened.
    pub max_message_length: usize,
}

impl Default for AsyncLoggerConfig {
    /// Defaults: a 10 000-entry buffer, no dropping on overflow, every log
    /// kept (rate 1.0), and messages capped at 1024.
    fn default() -> Self {
        AsyncLoggerConfig {
            max_message_length: 1024,
            sample_rate: 1.0,
            drop_on_overflow: false,
            buffer_size: 10_000,
        }
    }
}
59
60/// Async logger for high-performance logging
61#[allow(dead_code)]
62pub struct AsyncLogger {
63    sender: mpsc::UnboundedSender<LogEntry>,
64    config: AsyncLoggerConfig,
65    sample_counter: AtomicU64,
66}
67
68#[allow(dead_code)]
69impl AsyncLogger {
70    /// Create a new async logger
71    pub fn new(config: AsyncLoggerConfig) -> Self {
72        let (sender, mut receiver) = mpsc::unbounded_channel::<LogEntry>();
73
74        // Spawn background task to process log entries
75        tokio::spawn(async move {
76            while let Some(entry) = receiver.recv().await {
77                Self::process_log_entry(entry).await;
78            }
79        });
80
81        Self {
82            sender,
83            config,
84            sample_counter: AtomicU64::new(0),
85        }
86    }
87
88    /// Log a message with structured fields
89    pub fn log_structured(
90        &self,
91        level: Level,
92        logger: &str,
93        message: &str,
94        fields: HashMap<String, serde_json::Value>,
95        request_id: Option<String>,
96        user_id: Option<Uuid>,
97    ) {
98        // Apply sampling if configured
99        if self.config.sample_rate < 1.0 {
100            let counter = self.sample_counter.fetch_add(1, Ordering::Relaxed);
101            let sample_threshold = (u64::MAX as f64 * self.config.sample_rate) as u64;
102            if counter % (u64::MAX / sample_threshold.max(1)) != 0 {
103                return;
104            }
105        }
106
107        // Truncate message if too long
108        let truncated_message = if message.len() > self.config.max_message_length {
109            format!("{}...", &message[..self.config.max_message_length - 3])
110        } else {
111            message.to_string()
112        };
113
114        let entry = LogEntry {
115            timestamp: chrono::Utc::now(),
116            level: level.to_string(),
117            logger: logger.to_string(),
118            message: truncated_message,
119            fields,
120            request_id,
121            user_id,
122            trace_id: Self::current_trace_id(),
123        };
124
125        if self.sender.send(entry).is_err() {
126            // Channel is closed, log synchronously as fallback
127            error!("Async logger channel closed, falling back to sync logging");
128        }
129    }
130
131    /// Log a simple message
132    pub fn log(&self, level: Level, logger: &str, message: &str) {
133        self.log_structured(level, logger, message, HashMap::new(), None, None);
134    }
135
136    /// Log with request context
137    pub fn log_with_context(
138        &self,
139        level: Level,
140        logger: &str,
141        message: &str,
142        request_id: Option<String>,
143        user_id: Option<Uuid>,
144    ) {
145        self.log_structured(level, logger, message, HashMap::new(), request_id, user_id);
146    }
147
148    /// Process a log entry (background task)
149    async fn process_log_entry(entry: LogEntry) {
150        // In a real implementation, you might:
151        // - Write to files
152        // - Send to external logging services
153        // - Store in databases
154        // - Forward to monitoring systems
155
156        // For now, just output to tracing
157        let level = match entry.level.as_str() {
158            "ERROR" => Level::ERROR,
159            "WARN" => Level::WARN,
160            "INFO" => Level::INFO,
161            "DEBUG" => Level::DEBUG,
162            _ => Level::INFO,
163        };
164
165        match level {
166            Level::ERROR => error!(
167                logger = entry.logger,
168                request_id = entry.request_id,
169                user_id = ?entry.user_id,
170                trace_id = entry.trace_id,
171                fields = ?entry.fields,
172                "{}",
173                entry.message
174            ),
175            Level::WARN => warn!(
176                logger = entry.logger,
177                request_id = entry.request_id,
178                user_id = ?entry.user_id,
179                trace_id = entry.trace_id,
180                fields = ?entry.fields,
181                "{}",
182                entry.message
183            ),
184            Level::INFO => info!(
185                logger = entry.logger,
186                request_id = entry.request_id,
187                user_id = ?entry.user_id,
188                trace_id = entry.trace_id,
189                fields = ?entry.fields,
190                "{}",
191                entry.message
192            ),
193            Level::DEBUG => debug!(
194                logger = entry.logger,
195                request_id = entry.request_id,
196                user_id = ?entry.user_id,
197                trace_id = entry.trace_id,
198                fields = ?entry.fields,
199                "{}",
200                entry.message
201            ),
202            _ => info!(
203                logger = entry.logger,
204                request_id = entry.request_id,
205                user_id = ?entry.user_id,
206                trace_id = entry.trace_id,
207                fields = ?entry.fields,
208                "{}",
209                entry.message
210            ),
211        }
212    }
213
214    /// Get current trace ID from tracing context
215    fn current_trace_id() -> Option<String> {
216        // In a real implementation, extract from tracing span context
217        // For now, return None
218        None
219    }
220}
221
222/// Global async logger instance
223#[allow(dead_code)]
224static ASYNC_LOGGER: OnceLock<AsyncLogger> = OnceLock::new();
225
226/// Initialize the global async logger
227#[allow(dead_code)]
228pub fn init_async_logger(config: AsyncLoggerConfig) {
229    ASYNC_LOGGER.get_or_init(|| AsyncLogger::new(config));
230}
231
232/// Get the global async logger
233#[allow(dead_code)]
234pub fn async_logger() -> Option<&'static AsyncLogger> {
235    ASYNC_LOGGER.get()
236}
237
/// Log sampling manager for high-frequency events.
///
/// Every category carries its own sampling rate and its own call counter.
/// Categories without a configured rate are always logged.
#[allow(dead_code)]
pub struct LogSampler {
    /// Configured keep-fraction per category, always within `0.0..=1.0`.
    sample_rates: HashMap<String, f64>,
    /// Number of sampling decisions taken so far per category.
    counters: HashMap<String, AtomicU64>,
}

#[allow(dead_code)]
impl Default for LogSampler {
    fn default() -> Self {
        Self::new()
    }
}

impl LogSampler {
    /// Create a new log sampler with no categories configured.
    pub fn new() -> Self {
        Self {
            sample_rates: HashMap::new(),
            counters: HashMap::new(),
        }
    }

    /// Configure sampling rate for a log category.
    ///
    /// `rate` is clamped to `0.0..=1.0`. A NaN rate cannot be interpreted and
    /// is stored as 1.0 so that no logs are lost by accident. Calling this
    /// again for the same category restarts that category's counter.
    #[allow(dead_code)]
    pub fn set_sample_rate(&mut self, category: &str, rate: f64) {
        // `clamp` passes NaN straight through, so it is handled explicitly.
        let rate = if rate.is_nan() {
            1.0
        } else {
            rate.clamp(0.0, 1.0)
        };

        self.sample_rates.insert(category.to_string(), rate);
        self.counters
            .insert(category.to_string(), AtomicU64::new(0));
    }

    /// Check if a log should be sampled.
    ///
    /// Returns `true` when the event should be logged. For a rate strictly
    /// between 0 and 1, call number `n` (zero-based) is kept exactly when
    /// `ceil((n + 1) * rate)` exceeds `ceil(n * rate)`. The first call is
    /// therefore always kept, rates of the form `1/k` keep every `k`-th
    /// call, and any other rate (for example 0.8) is honoured over a run of
    /// calls instead of being rounded to "keep everything".
    #[allow(dead_code)]
    pub fn should_log(&self, category: &str) -> bool {
        let rate = match self.sample_rates.get(category) {
            Some(&rate) => rate,
            None => return true,
        };

        if rate >= 1.0 {
            return true;
        }
        if rate <= 0.0 {
            return false;
        }

        match self.counters.get(category) {
            Some(counter) => {
                let seen = counter.fetch_add(1, Ordering::Relaxed) as f64;
                (seen * rate).ceil() < ((seen + 1.0) * rate).ceil()
            }
            None => true,
        }
    }
}
293
294/// Security-aware logging utilities
295#[allow(dead_code)]
296pub struct SecurityLogger;
297
298#[allow(dead_code)]
299impl SecurityLogger {
300    /// Log authentication events
301    pub fn log_auth_event(
302        event_type: &str,
303        user_id: Option<Uuid>,
304        ip_address: Option<&str>,
305        user_agent: Option<&str>,
306        success: bool,
307        details: Option<&str>,
308    ) {
309        let mut fields = HashMap::new();
310        fields.insert(
311            "event_type".to_string(),
312            serde_json::Value::String(event_type.to_string()),
313        );
314        fields.insert("success".to_string(), serde_json::Value::Bool(success));
315
316        if let Some(ip) = ip_address {
317            fields.insert(
318                "ip_address".to_string(),
319                serde_json::Value::String(ip.to_string()),
320            );
321        }
322
323        if let Some(ua) = user_agent {
324            // Truncate user agent to prevent log injection
325            let safe_ua = ua.chars().take(200).collect::<String>();
326            fields.insert("user_agent".to_string(), serde_json::Value::String(safe_ua));
327        }
328
329        if let Some(details) = details {
330            fields.insert(
331                "details".to_string(),
332                serde_json::Value::String(details.to_string()),
333            );
334        }
335
336        let level = if success { Level::INFO } else { Level::WARN };
337        let message = format!(
338            "Authentication {}: {}",
339            if success { "success" } else { "failure" },
340            event_type
341        );
342
343        if let Some(logger) = async_logger() {
344            logger.log_structured(level, "security", &message, fields, None, user_id);
345        }
346    }
347
348    /// Log authorization events
349    pub fn log_authz_event(
350        user_id: Uuid,
351        resource: &str,
352        action: &str,
353        granted: bool,
354        reason: Option<&str>,
355    ) {
356        let mut fields = HashMap::new();
357        fields.insert(
358            "resource".to_string(),
359            serde_json::Value::String(resource.to_string()),
360        );
361        fields.insert(
362            "action".to_string(),
363            serde_json::Value::String(action.to_string()),
364        );
365        fields.insert("granted".to_string(), serde_json::Value::Bool(granted));
366
367        if let Some(reason) = reason {
368            fields.insert(
369                "reason".to_string(),
370                serde_json::Value::String(reason.to_string()),
371            );
372        }
373
374        let level = if granted { Level::DEBUG } else { Level::WARN };
375        let message = format!(
376            "Authorization {}: {} on {}",
377            if granted { "granted" } else { "denied" },
378            action,
379            resource
380        );
381
382        if let Some(logger) = async_logger() {
383            logger.log_structured(level, "security", &message, fields, None, Some(user_id));
384        }
385    }
386
387    /// Log security violations
388    pub fn log_security_violation(
389        violation_type: &str,
390        severity: &str,
391        description: &str,
392        user_id: Option<Uuid>,
393        ip_address: Option<&str>,
394        additional_data: Option<HashMap<String, serde_json::Value>>,
395    ) {
396        let mut fields = HashMap::new();
397        fields.insert(
398            "violation_type".to_string(),
399            serde_json::Value::String(violation_type.to_string()),
400        );
401        fields.insert(
402            "severity".to_string(),
403            serde_json::Value::String(severity.to_string()),
404        );
405
406        if let Some(ip) = ip_address {
407            fields.insert(
408                "ip_address".to_string(),
409                serde_json::Value::String(ip.to_string()),
410            );
411        }
412
413        if let Some(data) = additional_data {
414            for (key, value) in data {
415                fields.insert(key, value);
416            }
417        }
418
419        let level = match severity.to_lowercase().as_str() {
420            "critical" | "high" => Level::ERROR,
421            "medium" => Level::WARN,
422            _ => Level::INFO,
423        };
424
425        if let Some(logger) = async_logger() {
426            logger.log_structured(level, "security", description, fields, None, user_id);
427        }
428    }
429}
430
431/// Request metrics for performance logging
432#[derive(Debug)]
433pub struct RequestMetrics {
434    /// HTTP method (GET, POST, etc.)
435    pub method: String,
436    /// Request path
437    pub path: String,
438    /// HTTP status code
439    pub status_code: u16,
440    /// Request duration in milliseconds
441    pub duration_ms: u64,
442    /// Request size in bytes
443    pub request_size: u64,
444    /// Response size in bytes
445    pub response_size: u64,
446    /// Optional user ID
447    pub user_id: Option<Uuid>,
448    /// Optional request ID for tracing
449    pub request_id: Option<String>,
450}
451
452/// Performance logging utilities
453#[allow(dead_code)]
454pub struct PerformanceLogger;
455
456#[allow(dead_code)]
457impl PerformanceLogger {
458    /// Log request performance metrics
459    pub fn log_request_metrics(metrics: RequestMetrics) {
460        let mut fields = HashMap::new();
461        fields.insert(
462            "method".to_string(),
463            serde_json::Value::String(metrics.method.clone()),
464        );
465        fields.insert(
466            "path".to_string(),
467            serde_json::Value::String(metrics.path.clone()),
468        );
469        fields.insert(
470            "status_code".to_string(),
471            serde_json::Value::Number(metrics.status_code.into()),
472        );
473        fields.insert(
474            "duration_ms".to_string(),
475            serde_json::Value::Number(metrics.duration_ms.into()),
476        );
477        fields.insert(
478            "request_size".to_string(),
479            serde_json::Value::Number(metrics.request_size.into()),
480        );
481        fields.insert(
482            "response_size".to_string(),
483            serde_json::Value::Number(metrics.response_size.into()),
484        );
485
486        let message = format!(
487            "{} {} {} {}ms",
488            metrics.method, metrics.path, metrics.status_code, metrics.duration_ms
489        );
490
491        // Use different log levels based on performance
492        let level = if metrics.duration_ms > 5000 {
493            Level::WARN // Very slow requests
494        } else if metrics.duration_ms > 1000 {
495            Level::INFO // Slow requests
496        } else {
497            Level::DEBUG // Normal requests
498        };
499
500        if let Some(logger) = async_logger() {
501            logger.log_structured(
502                level,
503                "performance",
504                &message,
505                fields,
506                metrics.request_id,
507                metrics.user_id,
508            );
509        }
510    }
511
512    /// Log provider performance metrics
513    pub fn log_provider_metrics(
514        provider: &str,
515        model: &str,
516        duration_ms: u64,
517        token_count: Option<u32>,
518        success: bool,
519        error: Option<&str>,
520    ) {
521        let mut fields = HashMap::new();
522        fields.insert(
523            "provider".to_string(),
524            serde_json::Value::String(provider.to_string()),
525        );
526        fields.insert(
527            "model".to_string(),
528            serde_json::Value::String(model.to_string()),
529        );
530        fields.insert(
531            "duration_ms".to_string(),
532            serde_json::Value::Number(duration_ms.into()),
533        );
534        fields.insert("success".to_string(), serde_json::Value::Bool(success));
535
536        if let Some(tokens) = token_count {
537            fields.insert(
538                "token_count".to_string(),
539                serde_json::Value::Number(tokens.into()),
540            );
541        }
542
543        if let Some(err) = error {
544            fields.insert(
545                "error".to_string(),
546                serde_json::Value::String(err.to_string()),
547            );
548        }
549
550        let level = if success { Level::DEBUG } else { Level::WARN };
551        let message = format!(
552            "Provider {} {} {}ms {}",
553            provider,
554            model,
555            duration_ms,
556            if success { "success" } else { "failed" }
557        );
558
559        if let Some(logger) = async_logger() {
560            logger.log_structured(level, "performance", &message, fields, None, None);
561        }
562    }
563}
564
/// Convenience macro for structured logging through the global async logger.
///
/// Takes a level, a logger name, a message, and any number of
/// `key => value` pairs. Each value is converted with `serde_json::to_value`;
/// a value that fails to serialize is recorded as JSON `null`. Nothing is
/// logged when no global async logger is installed.
///
/// The field list may be empty and a trailing comma is accepted:
///
/// ```ignore
/// log_structured!(Level::INFO, "api", "request done", "status" => 200, "cached" => true);
/// log_structured!(Level::INFO, "api", "started");
/// ```
#[macro_export]
macro_rules! log_structured {
    ($level:expr, $logger:expr, $message:expr $(, $key:expr => $value:expr)* $(,)?) => {
        {
            // With zero fields the map is never mutated.
            #[allow(unused_mut)]
            let mut fields = ::std::collections::HashMap::new();
            $(
                fields.insert($key.to_string(), serde_json::to_value($value).unwrap_or(serde_json::Value::Null));
            )*

            if let Some(logger) = $crate::utils::logging::async_logger() {
                logger.log_structured($level, $logger, $message, fields, None, None);
            }
        }
    };
}
581
#[cfg(test)]
mod tests {
    use super::*;

    /// A 50% rate should keep roughly half of 1000 consecutive decisions.
    #[test]
    fn test_log_sampler() {
        let mut sampler = LogSampler::new();
        sampler.set_sample_rate("test", 0.5);

        let sampled_count = (0..1000)
            .filter(|_| sampler.should_log("test"))
            .count();

        // Tolerate some variance: strictly between 400 and 600.
        assert!((401..600).contains(&sampled_count));
    }

    /// Every field of a hand-built configuration reads back unchanged.
    #[test]
    fn test_async_logger_config() {
        let AsyncLoggerConfig {
            buffer_size,
            drop_on_overflow,
            sample_rate,
            max_message_length,
        } = AsyncLoggerConfig {
            buffer_size: 5000,
            drop_on_overflow: true,
            sample_rate: 0.8,
            max_message_length: 512,
        };

        assert_eq!(buffer_size, 5000);
        assert!(drop_on_overflow);
        assert_eq!(sample_rate, 0.8);
        assert_eq!(max_message_length, 512);
    }

    /// Smoke test: a logger can be built and accepts a message.
    #[tokio::test]
    async fn test_async_logger_creation() {
        let logger = AsyncLogger::new(AsyncLoggerConfig::default());

        logger.log(Level::INFO, "test", "test message");

        // Leave the background task a moment to pick the entry up.
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    }
}