1use serde::Serialize;
7use std::collections::HashMap;
8use std::sync::OnceLock;
9use std::sync::atomic::{AtomicU64, Ordering};
10use tokio::sync::mpsc;
11use tracing::{Level, debug, error, info, warn};
12use uuid::Uuid;
13
14#[derive(Debug, Clone, Serialize)]
16pub struct LogEntry {
17 pub timestamp: chrono::DateTime<chrono::Utc>,
19 pub level: String,
21 pub logger: String,
23 pub message: String,
25 pub fields: HashMap<String, serde_json::Value>,
27 pub request_id: Option<String>,
29 pub user_id: Option<Uuid>,
31 pub trace_id: Option<String>,
33}
34
35#[derive(Debug, Clone)]
37#[allow(dead_code)]
38pub struct AsyncLoggerConfig {
39 pub buffer_size: usize,
41 pub drop_on_overflow: bool,
43 pub sample_rate: f64,
45 pub max_message_length: usize,
47}
48
49impl Default for AsyncLoggerConfig {
50 fn default() -> Self {
51 Self {
52 buffer_size: 10000,
53 drop_on_overflow: false,
54 sample_rate: 1.0,
55 max_message_length: 1024,
56 }
57 }
58}
59
60#[allow(dead_code)]
62pub struct AsyncLogger {
63 sender: mpsc::UnboundedSender<LogEntry>,
64 config: AsyncLoggerConfig,
65 sample_counter: AtomicU64,
66}
67
68#[allow(dead_code)]
69impl AsyncLogger {
70 pub fn new(config: AsyncLoggerConfig) -> Self {
72 let (sender, mut receiver) = mpsc::unbounded_channel::<LogEntry>();
73
74 tokio::spawn(async move {
76 while let Some(entry) = receiver.recv().await {
77 Self::process_log_entry(entry).await;
78 }
79 });
80
81 Self {
82 sender,
83 config,
84 sample_counter: AtomicU64::new(0),
85 }
86 }
87
88 pub fn log_structured(
90 &self,
91 level: Level,
92 logger: &str,
93 message: &str,
94 fields: HashMap<String, serde_json::Value>,
95 request_id: Option<String>,
96 user_id: Option<Uuid>,
97 ) {
98 if self.config.sample_rate < 1.0 {
100 let counter = self.sample_counter.fetch_add(1, Ordering::Relaxed);
101 let sample_threshold = (u64::MAX as f64 * self.config.sample_rate) as u64;
102 if counter % (u64::MAX / sample_threshold.max(1)) != 0 {
103 return;
104 }
105 }
106
107 let truncated_message = if message.len() > self.config.max_message_length {
109 format!("{}...", &message[..self.config.max_message_length - 3])
110 } else {
111 message.to_string()
112 };
113
114 let entry = LogEntry {
115 timestamp: chrono::Utc::now(),
116 level: level.to_string(),
117 logger: logger.to_string(),
118 message: truncated_message,
119 fields,
120 request_id,
121 user_id,
122 trace_id: Self::current_trace_id(),
123 };
124
125 if self.sender.send(entry).is_err() {
126 error!("Async logger channel closed, falling back to sync logging");
128 }
129 }
130
131 pub fn log(&self, level: Level, logger: &str, message: &str) {
133 self.log_structured(level, logger, message, HashMap::new(), None, None);
134 }
135
136 pub fn log_with_context(
138 &self,
139 level: Level,
140 logger: &str,
141 message: &str,
142 request_id: Option<String>,
143 user_id: Option<Uuid>,
144 ) {
145 self.log_structured(level, logger, message, HashMap::new(), request_id, user_id);
146 }
147
148 async fn process_log_entry(entry: LogEntry) {
150 let level = match entry.level.as_str() {
158 "ERROR" => Level::ERROR,
159 "WARN" => Level::WARN,
160 "INFO" => Level::INFO,
161 "DEBUG" => Level::DEBUG,
162 _ => Level::INFO,
163 };
164
165 match level {
166 Level::ERROR => error!(
167 logger = entry.logger,
168 request_id = entry.request_id,
169 user_id = ?entry.user_id,
170 trace_id = entry.trace_id,
171 fields = ?entry.fields,
172 "{}",
173 entry.message
174 ),
175 Level::WARN => warn!(
176 logger = entry.logger,
177 request_id = entry.request_id,
178 user_id = ?entry.user_id,
179 trace_id = entry.trace_id,
180 fields = ?entry.fields,
181 "{}",
182 entry.message
183 ),
184 Level::INFO => info!(
185 logger = entry.logger,
186 request_id = entry.request_id,
187 user_id = ?entry.user_id,
188 trace_id = entry.trace_id,
189 fields = ?entry.fields,
190 "{}",
191 entry.message
192 ),
193 Level::DEBUG => debug!(
194 logger = entry.logger,
195 request_id = entry.request_id,
196 user_id = ?entry.user_id,
197 trace_id = entry.trace_id,
198 fields = ?entry.fields,
199 "{}",
200 entry.message
201 ),
202 _ => info!(
203 logger = entry.logger,
204 request_id = entry.request_id,
205 user_id = ?entry.user_id,
206 trace_id = entry.trace_id,
207 fields = ?entry.fields,
208 "{}",
209 entry.message
210 ),
211 }
212 }
213
214 fn current_trace_id() -> Option<String> {
216 None
219 }
220}
221
222#[allow(dead_code)]
224static ASYNC_LOGGER: OnceLock<AsyncLogger> = OnceLock::new();
225
226#[allow(dead_code)]
228pub fn init_async_logger(config: AsyncLoggerConfig) {
229 ASYNC_LOGGER.get_or_init(|| AsyncLogger::new(config));
230}
231
232#[allow(dead_code)]
234pub fn async_logger() -> Option<&'static AsyncLogger> {
235 ASYNC_LOGGER.get()
236}
237
238#[allow(dead_code)]
240pub struct LogSampler {
241 sample_rates: HashMap<String, f64>,
242 counters: HashMap<String, AtomicU64>,
243}
244
245#[allow(dead_code)]
246impl Default for LogSampler {
247 fn default() -> Self {
248 Self::new()
249 }
250}
251
252impl LogSampler {
253 pub fn new() -> Self {
255 Self {
256 sample_rates: HashMap::new(),
257 counters: HashMap::new(),
258 }
259 }
260
261 #[allow(dead_code)]
263 pub fn set_sample_rate(&mut self, category: &str, rate: f64) {
264 self.sample_rates
265 .insert(category.to_string(), rate.clamp(0.0, 1.0));
266 self.counters
267 .insert(category.to_string(), AtomicU64::new(0));
268 }
269
270 #[allow(dead_code)]
272 pub fn should_log(&self, category: &str) -> bool {
273 if let Some(&rate) = self.sample_rates.get(category) {
274 if rate >= 1.0 {
275 return true;
276 }
277 if rate <= 0.0 {
278 return false;
279 }
280
281 if let Some(counter) = self.counters.get(category) {
282 let count = counter.fetch_add(1, Ordering::Relaxed);
283 let sample_threshold = (1.0 / rate) as u64;
284 count % sample_threshold == 0
285 } else {
286 true
287 }
288 } else {
289 true
290 }
291 }
292}
293
294#[allow(dead_code)]
296pub struct SecurityLogger;
297
298#[allow(dead_code)]
299impl SecurityLogger {
300 pub fn log_auth_event(
302 event_type: &str,
303 user_id: Option<Uuid>,
304 ip_address: Option<&str>,
305 user_agent: Option<&str>,
306 success: bool,
307 details: Option<&str>,
308 ) {
309 let mut fields = HashMap::new();
310 fields.insert(
311 "event_type".to_string(),
312 serde_json::Value::String(event_type.to_string()),
313 );
314 fields.insert("success".to_string(), serde_json::Value::Bool(success));
315
316 if let Some(ip) = ip_address {
317 fields.insert(
318 "ip_address".to_string(),
319 serde_json::Value::String(ip.to_string()),
320 );
321 }
322
323 if let Some(ua) = user_agent {
324 let safe_ua = ua.chars().take(200).collect::<String>();
326 fields.insert("user_agent".to_string(), serde_json::Value::String(safe_ua));
327 }
328
329 if let Some(details) = details {
330 fields.insert(
331 "details".to_string(),
332 serde_json::Value::String(details.to_string()),
333 );
334 }
335
336 let level = if success { Level::INFO } else { Level::WARN };
337 let message = format!(
338 "Authentication {}: {}",
339 if success { "success" } else { "failure" },
340 event_type
341 );
342
343 if let Some(logger) = async_logger() {
344 logger.log_structured(level, "security", &message, fields, None, user_id);
345 }
346 }
347
348 pub fn log_authz_event(
350 user_id: Uuid,
351 resource: &str,
352 action: &str,
353 granted: bool,
354 reason: Option<&str>,
355 ) {
356 let mut fields = HashMap::new();
357 fields.insert(
358 "resource".to_string(),
359 serde_json::Value::String(resource.to_string()),
360 );
361 fields.insert(
362 "action".to_string(),
363 serde_json::Value::String(action.to_string()),
364 );
365 fields.insert("granted".to_string(), serde_json::Value::Bool(granted));
366
367 if let Some(reason) = reason {
368 fields.insert(
369 "reason".to_string(),
370 serde_json::Value::String(reason.to_string()),
371 );
372 }
373
374 let level = if granted { Level::DEBUG } else { Level::WARN };
375 let message = format!(
376 "Authorization {}: {} on {}",
377 if granted { "granted" } else { "denied" },
378 action,
379 resource
380 );
381
382 if let Some(logger) = async_logger() {
383 logger.log_structured(level, "security", &message, fields, None, Some(user_id));
384 }
385 }
386
387 pub fn log_security_violation(
389 violation_type: &str,
390 severity: &str,
391 description: &str,
392 user_id: Option<Uuid>,
393 ip_address: Option<&str>,
394 additional_data: Option<HashMap<String, serde_json::Value>>,
395 ) {
396 let mut fields = HashMap::new();
397 fields.insert(
398 "violation_type".to_string(),
399 serde_json::Value::String(violation_type.to_string()),
400 );
401 fields.insert(
402 "severity".to_string(),
403 serde_json::Value::String(severity.to_string()),
404 );
405
406 if let Some(ip) = ip_address {
407 fields.insert(
408 "ip_address".to_string(),
409 serde_json::Value::String(ip.to_string()),
410 );
411 }
412
413 if let Some(data) = additional_data {
414 for (key, value) in data {
415 fields.insert(key, value);
416 }
417 }
418
419 let level = match severity.to_lowercase().as_str() {
420 "critical" | "high" => Level::ERROR,
421 "medium" => Level::WARN,
422 _ => Level::INFO,
423 };
424
425 if let Some(logger) = async_logger() {
426 logger.log_structured(level, "security", description, fields, None, user_id);
427 }
428 }
429}
430
431#[derive(Debug)]
433pub struct RequestMetrics {
434 pub method: String,
436 pub path: String,
438 pub status_code: u16,
440 pub duration_ms: u64,
442 pub request_size: u64,
444 pub response_size: u64,
446 pub user_id: Option<Uuid>,
448 pub request_id: Option<String>,
450}
451
452#[allow(dead_code)]
454pub struct PerformanceLogger;
455
456#[allow(dead_code)]
457impl PerformanceLogger {
458 pub fn log_request_metrics(metrics: RequestMetrics) {
460 let mut fields = HashMap::new();
461 fields.insert(
462 "method".to_string(),
463 serde_json::Value::String(metrics.method.clone()),
464 );
465 fields.insert(
466 "path".to_string(),
467 serde_json::Value::String(metrics.path.clone()),
468 );
469 fields.insert(
470 "status_code".to_string(),
471 serde_json::Value::Number(metrics.status_code.into()),
472 );
473 fields.insert(
474 "duration_ms".to_string(),
475 serde_json::Value::Number(metrics.duration_ms.into()),
476 );
477 fields.insert(
478 "request_size".to_string(),
479 serde_json::Value::Number(metrics.request_size.into()),
480 );
481 fields.insert(
482 "response_size".to_string(),
483 serde_json::Value::Number(metrics.response_size.into()),
484 );
485
486 let message = format!(
487 "{} {} {} {}ms",
488 metrics.method, metrics.path, metrics.status_code, metrics.duration_ms
489 );
490
491 let level = if metrics.duration_ms > 5000 {
493 Level::WARN } else if metrics.duration_ms > 1000 {
495 Level::INFO } else {
497 Level::DEBUG };
499
500 if let Some(logger) = async_logger() {
501 logger.log_structured(
502 level,
503 "performance",
504 &message,
505 fields,
506 metrics.request_id,
507 metrics.user_id,
508 );
509 }
510 }
511
512 pub fn log_provider_metrics(
514 provider: &str,
515 model: &str,
516 duration_ms: u64,
517 token_count: Option<u32>,
518 success: bool,
519 error: Option<&str>,
520 ) {
521 let mut fields = HashMap::new();
522 fields.insert(
523 "provider".to_string(),
524 serde_json::Value::String(provider.to_string()),
525 );
526 fields.insert(
527 "model".to_string(),
528 serde_json::Value::String(model.to_string()),
529 );
530 fields.insert(
531 "duration_ms".to_string(),
532 serde_json::Value::Number(duration_ms.into()),
533 );
534 fields.insert("success".to_string(), serde_json::Value::Bool(success));
535
536 if let Some(tokens) = token_count {
537 fields.insert(
538 "token_count".to_string(),
539 serde_json::Value::Number(tokens.into()),
540 );
541 }
542
543 if let Some(err) = error {
544 fields.insert(
545 "error".to_string(),
546 serde_json::Value::String(err.to_string()),
547 );
548 }
549
550 let level = if success { Level::DEBUG } else { Level::WARN };
551 let message = format!(
552 "Provider {} {} {}ms {}",
553 provider,
554 model,
555 duration_ms,
556 if success { "success" } else { "failed" }
557 );
558
559 if let Some(logger) = async_logger() {
560 logger.log_structured(level, "performance", &message, fields, None, None);
561 }
562 }
563}
564
565#[macro_export]
567macro_rules! log_structured {
568 ($level:expr, $logger:expr, $message:expr, $($key:expr => $value:expr),*) => {
569 {
570 let mut fields = std::collections::HashMap::new();
571 $(
572 fields.insert($key.to_string(), serde_json::to_value($value).unwrap_or(serde_json::Value::Null));
573 )*
574
575 if let Some(logger) = $crate::utils::logging::async_logger() {
576 logger.log_structured($level, $logger, $message, fields, None, None);
577 }
578 }
579 };
580}
581
582#[cfg(test)]
583mod tests {
584 use super::*;
585
586 #[test]
587 fn test_log_sampler() {
588 let mut sampler = LogSampler::new();
589 sampler.set_sample_rate("test", 0.5);
590
591 let mut sampled_count = 0;
593 for _ in 0..1000 {
594 if sampler.should_log("test") {
595 sampled_count += 1;
596 }
597 }
598
599 assert!(sampled_count > 400 && sampled_count < 600);
601 }
602
603 #[test]
604 fn test_async_logger_config() {
605 let config = AsyncLoggerConfig {
606 buffer_size: 5000,
607 drop_on_overflow: true,
608 sample_rate: 0.8,
609 max_message_length: 512,
610 };
611
612 assert_eq!(config.buffer_size, 5000);
613 assert_eq!(config.drop_on_overflow, true);
614 assert_eq!(config.sample_rate, 0.8);
615 assert_eq!(config.max_message_length, 512);
616 }
617
618 #[tokio::test]
619 async fn test_async_logger_creation() {
620 let config = AsyncLoggerConfig::default();
621 let logger = AsyncLogger::new(config);
622
623 logger.log(Level::INFO, "test", "test message");
625
626 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
628 }
629}