1use anyhow::{Context, Result};
13use parking_lot::Mutex;
14use serde::Serialize;
15use std::fs::{File, OpenOptions};
16use std::io::{BufWriter, Write};
17use std::path::Path;
18use std::sync::Arc;
19use tracing::{error, warn};
20
21use sentinel_config::{AuditLogConfig, LoggingConfig};
22
/// Output encoding used when rendering an access log line.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AccessLogFormat {
    /// Structured output: one JSON document per entry.
    Json,
    /// Text output in a combined-log style line.
    Combined,
}
31
32#[derive(Debug, Serialize)]
34pub struct AccessLogEntry {
35 pub timestamp: String,
37 pub trace_id: String,
39 pub method: String,
41 pub path: String,
43 #[serde(skip_serializing_if = "Option::is_none")]
45 pub query: Option<String>,
46 pub protocol: String,
48 pub status: u16,
50 pub body_bytes: u64,
52 pub duration_ms: u64,
54 pub client_ip: String,
56 #[serde(skip_serializing_if = "Option::is_none")]
58 pub user_agent: Option<String>,
59 #[serde(skip_serializing_if = "Option::is_none")]
61 pub referer: Option<String>,
62 #[serde(skip_serializing_if = "Option::is_none")]
64 pub host: Option<String>,
65 #[serde(skip_serializing_if = "Option::is_none")]
67 pub route_id: Option<String>,
68 #[serde(skip_serializing_if = "Option::is_none")]
70 pub upstream: Option<String>,
71 pub upstream_attempts: u32,
73 pub instance_id: String,
75}
76
77impl AccessLogEntry {
78 pub fn format(&self, format: AccessLogFormat) -> String {
80 match format {
81 AccessLogFormat::Json => self.format_json(),
82 AccessLogFormat::Combined => self.format_combined(),
83 }
84 }
85
86 fn format_json(&self) -> String {
88 serde_json::to_string(self).unwrap_or_else(|_| "{}".to_string())
89 }
90
91 fn format_combined(&self) -> String {
94 let clf_timestamp = self.format_clf_timestamp();
96
97 let request_line = if let Some(ref query) = self.query {
99 format!("{} {}?{} {}", self.method, self.path, query, self.protocol)
100 } else {
101 format!("{} {} {}", self.method, self.path, self.protocol)
102 };
103
104 let referer = self.referer.as_deref().unwrap_or("-");
106 let user_agent = self.user_agent.as_deref().unwrap_or("-");
107
108 format!(
110 "{} - - [{}] \"{}\" {} {} \"{}\" \"{}\" {} {}ms",
111 self.client_ip,
112 clf_timestamp,
113 request_line,
114 self.status,
115 self.body_bytes,
116 referer,
117 user_agent,
118 self.trace_id,
119 self.duration_ms
120 )
121 }
122
123 fn format_clf_timestamp(&self) -> String {
125 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&self.timestamp) {
127 dt.format("%d/%b/%Y:%H:%M:%S %z").to_string()
128 } else {
129 self.timestamp.clone()
130 }
131 }
132}
133
134#[derive(Debug, Serialize)]
136pub struct ErrorLogEntry {
137 pub timestamp: String,
139 pub trace_id: String,
141 pub level: String,
143 pub message: String,
145 #[serde(skip_serializing_if = "Option::is_none")]
147 pub route_id: Option<String>,
148 #[serde(skip_serializing_if = "Option::is_none")]
150 pub upstream: Option<String>,
151 #[serde(skip_serializing_if = "Option::is_none")]
153 pub details: Option<String>,
154}
155
156#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
158#[serde(rename_all = "snake_case")]
159pub enum AuditEventType {
160 Blocked,
162 AgentDecision,
164 WafMatch,
166 WafBlock,
168 RateLimitExceeded,
170 AuthEvent,
172 ConfigChange,
174 CertReload,
176 CircuitBreakerChange,
178 CachePurge,
180 AdminAction,
182 Custom,
184}
185
186impl std::fmt::Display for AuditEventType {
187 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
188 match self {
189 AuditEventType::Blocked => write!(f, "blocked"),
190 AuditEventType::AgentDecision => write!(f, "agent_decision"),
191 AuditEventType::WafMatch => write!(f, "waf_match"),
192 AuditEventType::WafBlock => write!(f, "waf_block"),
193 AuditEventType::RateLimitExceeded => write!(f, "rate_limit_exceeded"),
194 AuditEventType::AuthEvent => write!(f, "auth_event"),
195 AuditEventType::ConfigChange => write!(f, "config_change"),
196 AuditEventType::CertReload => write!(f, "cert_reload"),
197 AuditEventType::CircuitBreakerChange => write!(f, "circuit_breaker_change"),
198 AuditEventType::CachePurge => write!(f, "cache_purge"),
199 AuditEventType::AdminAction => write!(f, "admin_action"),
200 AuditEventType::Custom => write!(f, "custom"),
201 }
202 }
203}
204
205#[derive(Debug, Serialize)]
207pub struct AuditLogEntry {
208 pub timestamp: String,
210 pub trace_id: String,
212 pub event_type: String,
214 pub method: String,
216 pub path: String,
218 pub client_ip: String,
220 #[serde(skip_serializing_if = "Option::is_none")]
222 pub route_id: Option<String>,
223 #[serde(skip_serializing_if = "Option::is_none")]
225 pub reason: Option<String>,
226 #[serde(skip_serializing_if = "Option::is_none")]
228 pub agent_id: Option<String>,
229 #[serde(skip_serializing_if = "Vec::is_empty")]
231 pub rule_ids: Vec<String>,
232 #[serde(skip_serializing_if = "Vec::is_empty")]
234 pub tags: Vec<String>,
235 #[serde(skip_serializing_if = "Option::is_none")]
237 pub action: Option<String>,
238 #[serde(skip_serializing_if = "Option::is_none")]
240 pub status_code: Option<u16>,
241 #[serde(skip_serializing_if = "Option::is_none")]
243 pub user_id: Option<String>,
244 #[serde(skip_serializing_if = "Option::is_none")]
246 pub session_id: Option<String>,
247 #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
249 pub metadata: std::collections::HashMap<String, String>,
250}
251
252impl AuditLogEntry {
253 pub fn new(
255 trace_id: impl Into<String>,
256 event_type: AuditEventType,
257 method: impl Into<String>,
258 path: impl Into<String>,
259 client_ip: impl Into<String>,
260 ) -> Self {
261 Self {
262 timestamp: chrono::Utc::now().to_rfc3339(),
263 trace_id: trace_id.into(),
264 event_type: event_type.to_string(),
265 method: method.into(),
266 path: path.into(),
267 client_ip: client_ip.into(),
268 route_id: None,
269 reason: None,
270 agent_id: None,
271 rule_ids: Vec::new(),
272 tags: Vec::new(),
273 action: None,
274 status_code: None,
275 user_id: None,
276 session_id: None,
277 metadata: std::collections::HashMap::new(),
278 }
279 }
280
281 pub fn with_route_id(mut self, route_id: impl Into<String>) -> Self {
283 self.route_id = Some(route_id.into());
284 self
285 }
286
287 pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
289 self.reason = Some(reason.into());
290 self
291 }
292
293 pub fn with_agent_id(mut self, agent_id: impl Into<String>) -> Self {
295 self.agent_id = Some(agent_id.into());
296 self
297 }
298
299 pub fn with_rule_ids(mut self, rule_ids: Vec<String>) -> Self {
301 self.rule_ids = rule_ids;
302 self
303 }
304
305 pub fn with_tags(mut self, tags: Vec<String>) -> Self {
307 self.tags = tags;
308 self
309 }
310
311 pub fn with_action(mut self, action: impl Into<String>) -> Self {
313 self.action = Some(action.into());
314 self
315 }
316
317 pub fn with_status_code(mut self, status_code: u16) -> Self {
319 self.status_code = Some(status_code);
320 self
321 }
322
323 pub fn with_user_id(mut self, user_id: impl Into<String>) -> Self {
325 self.user_id = Some(user_id.into());
326 self
327 }
328
329 pub fn with_session_id(mut self, session_id: impl Into<String>) -> Self {
331 self.session_id = Some(session_id.into());
332 self
333 }
334
335 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
337 self.metadata.insert(key.into(), value.into());
338 self
339 }
340
341 pub fn blocked(
343 trace_id: impl Into<String>,
344 method: impl Into<String>,
345 path: impl Into<String>,
346 client_ip: impl Into<String>,
347 reason: impl Into<String>,
348 ) -> Self {
349 Self::new(trace_id, AuditEventType::Blocked, method, path, client_ip)
350 .with_reason(reason)
351 .with_action("block")
352 }
353
354 pub fn rate_limited(
356 trace_id: impl Into<String>,
357 method: impl Into<String>,
358 path: impl Into<String>,
359 client_ip: impl Into<String>,
360 limit_key: impl Into<String>,
361 ) -> Self {
362 Self::new(
363 trace_id,
364 AuditEventType::RateLimitExceeded,
365 method,
366 path,
367 client_ip,
368 )
369 .with_reason("Rate limit exceeded")
370 .with_action("block")
371 .with_metadata("limit_key", limit_key)
372 }
373
374 pub fn waf_blocked(
376 trace_id: impl Into<String>,
377 method: impl Into<String>,
378 path: impl Into<String>,
379 client_ip: impl Into<String>,
380 rule_ids: Vec<String>,
381 ) -> Self {
382 Self::new(trace_id, AuditEventType::WafBlock, method, path, client_ip)
383 .with_rule_ids(rule_ids)
384 .with_action("block")
385 }
386
387 pub fn config_change(
389 trace_id: impl Into<String>,
390 change_type: impl Into<String>,
391 details: impl Into<String>,
392 ) -> Self {
393 Self::new(
394 trace_id,
395 AuditEventType::ConfigChange,
396 "-",
397 "/-/config",
398 "internal",
399 )
400 .with_reason(change_type)
401 .with_metadata("details", details)
402 }
403
404 pub fn cert_reload(
406 trace_id: impl Into<String>,
407 listener_id: impl Into<String>,
408 success: bool,
409 ) -> Self {
410 Self::new(
411 trace_id,
412 AuditEventType::CertReload,
413 "-",
414 "/-/certs",
415 "internal",
416 )
417 .with_metadata("listener_id", listener_id)
418 .with_metadata("success", success.to_string())
419 }
420
421 pub fn cache_purge(
423 trace_id: impl Into<String>,
424 method: impl Into<String>,
425 path: impl Into<String>,
426 client_ip: impl Into<String>,
427 pattern: impl Into<String>,
428 ) -> Self {
429 Self::new(
430 trace_id,
431 AuditEventType::CachePurge,
432 method,
433 path,
434 client_ip,
435 )
436 .with_metadata("pattern", pattern)
437 .with_action("purge")
438 }
439
440 pub fn admin_action(
442 trace_id: impl Into<String>,
443 method: impl Into<String>,
444 path: impl Into<String>,
445 client_ip: impl Into<String>,
446 action: impl Into<String>,
447 ) -> Self {
448 Self::new(
449 trace_id,
450 AuditEventType::AdminAction,
451 method,
452 path,
453 client_ip,
454 )
455 .with_action(action)
456 }
457}
458
/// Buffered writer over a single log file.
struct LogFileWriter {
    /// Buffered handle to the underlying file.
    writer: BufWriter<File>,
}
463
464impl LogFileWriter {
465 fn new(path: &Path, buffer_size: usize) -> Result<Self> {
466 if let Some(parent) = path.parent() {
468 std::fs::create_dir_all(parent)
469 .with_context(|| format!("Failed to create log directory: {:?}", parent))?;
470 }
471
472 let file = OpenOptions::new()
473 .create(true)
474 .append(true)
475 .open(path)
476 .with_context(|| format!("Failed to open log file: {:?}", path))?;
477
478 Ok(Self {
479 writer: BufWriter::with_capacity(buffer_size, file),
480 })
481 }
482
483 fn write_line(&mut self, line: &str) -> Result<()> {
484 writeln!(self.writer, "{}", line)?;
485 Ok(())
486 }
487
488 fn flush(&mut self) -> Result<()> {
489 self.writer.flush()?;
490 Ok(())
491 }
492}
493
494pub struct LogManager {
496 access_log: Option<Mutex<LogFileWriter>>,
497 access_log_format: AccessLogFormat,
498 error_log: Option<Mutex<LogFileWriter>>,
499 audit_log: Option<Mutex<LogFileWriter>>,
500 audit_config: Option<AuditLogConfig>,
501}
502
503impl LogManager {
504 pub fn new(config: &LoggingConfig) -> Result<Self> {
506 let (access_log, access_log_format) = if let Some(ref access_config) = config.access_log {
507 if access_config.enabled {
508 let format = Self::parse_access_format(&access_config.format);
509 let writer = Mutex::new(LogFileWriter::new(
510 &access_config.file,
511 access_config.buffer_size,
512 )?);
513 (Some(writer), format)
514 } else {
515 (None, AccessLogFormat::Json)
516 }
517 } else {
518 (None, AccessLogFormat::Json)
519 };
520
521 let error_log = if let Some(ref error_config) = config.error_log {
522 if error_config.enabled {
523 Some(Mutex::new(LogFileWriter::new(
524 &error_config.file,
525 error_config.buffer_size,
526 )?))
527 } else {
528 None
529 }
530 } else {
531 None
532 };
533
534 let audit_log = if let Some(ref audit_config) = config.audit_log {
535 if audit_config.enabled {
536 Some(Mutex::new(LogFileWriter::new(
537 &audit_config.file,
538 audit_config.buffer_size,
539 )?))
540 } else {
541 None
542 }
543 } else {
544 None
545 };
546
547 Ok(Self {
548 access_log,
549 access_log_format,
550 error_log,
551 audit_log,
552 audit_config: config.audit_log.clone(),
553 })
554 }
555
556 pub fn disabled() -> Self {
558 Self {
559 access_log: None,
560 access_log_format: AccessLogFormat::Json,
561 error_log: None,
562 audit_log: None,
563 audit_config: None,
564 }
565 }
566
567 fn parse_access_format(format: &str) -> AccessLogFormat {
569 match format.to_lowercase().as_str() {
570 "combined" | "clf" | "common" => AccessLogFormat::Combined,
571 _ => AccessLogFormat::Json, }
573 }
574
575 pub fn log_access(&self, entry: &AccessLogEntry) {
577 if let Some(ref writer) = self.access_log {
578 let formatted = entry.format(self.access_log_format);
579 let mut guard = writer.lock();
580 if let Err(e) = guard.write_line(&formatted) {
581 error!("Failed to write access log: {}", e);
582 }
583 }
584 }
585
586 pub fn log_error(&self, entry: &ErrorLogEntry) {
588 if let Some(ref writer) = self.error_log {
589 match serde_json::to_string(entry) {
590 Ok(json) => {
591 let mut guard = writer.lock();
592 if let Err(e) = guard.write_line(&json) {
593 error!("Failed to write error log: {}", e);
594 }
595 }
596 Err(e) => {
597 error!("Failed to serialize error log entry: {}", e);
598 }
599 }
600 }
601 }
602
603 pub fn log_audit(&self, entry: &AuditLogEntry) {
605 if let Some(ref writer) = self.audit_log {
606 if let Some(ref config) = self.audit_config {
607 let should_log = match entry.event_type.as_str() {
609 "blocked" => config.log_blocked,
610 "agent_decision" => config.log_agent_decisions,
611 "waf_match" | "waf_block" => config.log_waf_events,
612 _ => true, };
614
615 if !should_log {
616 return;
617 }
618 }
619
620 match serde_json::to_string(entry) {
621 Ok(json) => {
622 let mut guard = writer.lock();
623 if let Err(e) = guard.write_line(&json) {
624 error!("Failed to write audit log: {}", e);
625 }
626 }
627 Err(e) => {
628 error!("Failed to serialize audit log entry: {}", e);
629 }
630 }
631 }
632 }
633
634 pub fn flush(&self) {
636 if let Some(ref writer) = self.access_log {
637 if let Err(e) = writer.lock().flush() {
638 warn!("Failed to flush access log: {}", e);
639 }
640 }
641 if let Some(ref writer) = self.error_log {
642 if let Err(e) = writer.lock().flush() {
643 warn!("Failed to flush error log: {}", e);
644 }
645 }
646 if let Some(ref writer) = self.audit_log {
647 if let Err(e) = writer.lock().flush() {
648 warn!("Failed to flush audit log: {}", e);
649 }
650 }
651 }
652
653 pub fn access_log_enabled(&self) -> bool {
655 self.access_log.is_some()
656 }
657
658 pub fn error_log_enabled(&self) -> bool {
660 self.error_log.is_some()
661 }
662
663 pub fn audit_log_enabled(&self) -> bool {
665 self.audit_log.is_some()
666 }
667}
668
669pub type SharedLogManager = Arc<LogManager>;
671
#[cfg(test)]
mod tests {
    use super::*;
    use sentinel_config::{AccessLogConfig, ErrorLogConfig};
    use tempfile::tempdir;

    /// Baseline entry shared by the access log tests; tests override the
    /// fields they care about.
    fn sample_entry() -> AccessLogEntry {
        AccessLogEntry {
            timestamp: "2024-01-01T00:00:00Z".to_string(),
            trace_id: "abc123".to_string(),
            method: "GET".to_string(),
            path: "/api/users".to_string(),
            query: Some("page=1".to_string()),
            protocol: "HTTP/1.1".to_string(),
            status: 200,
            body_bytes: 1024,
            duration_ms: 50,
            client_ip: "192.168.1.1".to_string(),
            user_agent: Some("Mozilla/5.0".to_string()),
            referer: None,
            host: Some("example.com".to_string()),
            route_id: Some("api-route".to_string()),
            upstream: Some("backend-1".to_string()),
            upstream_attempts: 1,
            instance_id: "instance-1".to_string(),
        }
    }

    #[test]
    fn test_access_log_entry_serialization() {
        let json = serde_json::to_string(&sample_entry()).unwrap();

        assert!(json.contains("\"trace_id\":\"abc123\""));
        assert!(json.contains("\"status\":200"));
    }

    #[test]
    fn test_log_manager_creation() {
        let dir = tempdir().unwrap();

        let access_log = AccessLogConfig {
            enabled: true,
            file: dir.path().join("access.log"),
            format: "json".to_string(),
            buffer_size: 8192,
            include_trace_id: true,
        };
        let error_log = ErrorLogConfig {
            enabled: true,
            file: dir.path().join("error.log"),
            level: "warn".to_string(),
            buffer_size: 8192,
        };
        let audit_log = AuditLogConfig {
            enabled: true,
            file: dir.path().join("audit.log"),
            buffer_size: 8192,
            log_blocked: true,
            log_agent_decisions: true,
            log_waf_events: true,
        };

        let config = LoggingConfig {
            level: "info".to_string(),
            format: "json".to_string(),
            timestamps: true,
            file: None,
            access_log: Some(access_log),
            error_log: Some(error_log),
            audit_log: Some(audit_log),
        };

        let manager = LogManager::new(&config).unwrap();
        assert!(manager.access_log_enabled());
        assert!(manager.error_log_enabled());
        assert!(manager.audit_log_enabled());
    }

    #[test]
    fn test_access_log_combined_format() {
        let entry = AccessLogEntry {
            timestamp: "2024-01-15T10:30:00+00:00".to_string(),
            trace_id: "trace-abc123".to_string(),
            referer: Some("https://example.com/".to_string()),
            host: Some("api.example.com".to_string()),
            ..sample_entry()
        };

        let combined = entry.format(AccessLogFormat::Combined);

        assert!(combined.starts_with("192.168.1.1 - - ["));
        assert!(combined.contains("\"GET /api/users?page=1 HTTP/1.1\""));
        assert!(combined.contains(" 200 1024 "));
        assert!(combined.contains("\"https://example.com/\""));
        assert!(combined.contains("\"Mozilla/5.0\""));
        assert!(combined.contains("trace-abc123"));
        assert!(combined.ends_with("50ms"));
    }

    #[test]
    fn test_access_log_format_parsing() {
        let cases = [
            ("json", AccessLogFormat::Json),
            ("JSON", AccessLogFormat::Json),
            ("combined", AccessLogFormat::Combined),
            ("COMBINED", AccessLogFormat::Combined),
            ("clf", AccessLogFormat::Combined),
            // Unrecognized names fall back to JSON.
            ("unknown", AccessLogFormat::Json),
        ];

        for &(input, expected) in cases.iter() {
            assert_eq!(LogManager::parse_access_format(input), expected);
        }
    }
}