1use anyhow::{Context, Result};
13use parking_lot::Mutex;
14use serde::Serialize;
15use std::fs::{File, OpenOptions};
16use std::io::{BufWriter, Write};
17use std::path::Path;
18use std::sync::Arc;
19use tracing::{error, warn};
20
21use sentinel_config::{AuditLogConfig, LoggingConfig};
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub enum AccessLogFormat {
26 Json,
28 Combined,
30}
31
32#[derive(Debug, Serialize)]
34pub struct AccessLogEntry {
35 pub timestamp: String,
37 pub trace_id: String,
39 pub method: String,
41 pub path: String,
43 #[serde(skip_serializing_if = "Option::is_none")]
45 pub query: Option<String>,
46 pub protocol: String,
48 pub status: u16,
50 pub body_bytes: u64,
52 pub duration_ms: u64,
54 pub client_ip: String,
56 #[serde(skip_serializing_if = "Option::is_none")]
58 pub user_agent: Option<String>,
59 #[serde(skip_serializing_if = "Option::is_none")]
61 pub referer: Option<String>,
62 #[serde(skip_serializing_if = "Option::is_none")]
64 pub host: Option<String>,
65 #[serde(skip_serializing_if = "Option::is_none")]
67 pub route_id: Option<String>,
68 #[serde(skip_serializing_if = "Option::is_none")]
70 pub upstream: Option<String>,
71 pub upstream_attempts: u32,
73 pub instance_id: String,
75 #[serde(skip_serializing_if = "Option::is_none")]
77 pub namespace: Option<String>,
78 #[serde(skip_serializing_if = "Option::is_none")]
80 pub service: Option<String>,
81}
82
83impl AccessLogEntry {
84 pub fn format(&self, format: AccessLogFormat) -> String {
86 match format {
87 AccessLogFormat::Json => self.format_json(),
88 AccessLogFormat::Combined => self.format_combined(),
89 }
90 }
91
92 fn format_json(&self) -> String {
94 serde_json::to_string(self).unwrap_or_else(|_| "{}".to_string())
95 }
96
97 fn format_combined(&self) -> String {
100 let clf_timestamp = self.format_clf_timestamp();
102
103 let request_line = if let Some(ref query) = self.query {
105 format!("{} {}?{} {}", self.method, self.path, query, self.protocol)
106 } else {
107 format!("{} {} {}", self.method, self.path, self.protocol)
108 };
109
110 let referer = self.referer.as_deref().unwrap_or("-");
112 let user_agent = self.user_agent.as_deref().unwrap_or("-");
113
114 format!(
116 "{} - - [{}] \"{}\" {} {} \"{}\" \"{}\" {} {}ms",
117 self.client_ip,
118 clf_timestamp,
119 request_line,
120 self.status,
121 self.body_bytes,
122 referer,
123 user_agent,
124 self.trace_id,
125 self.duration_ms
126 )
127 }
128
129 fn format_clf_timestamp(&self) -> String {
131 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&self.timestamp) {
133 dt.format("%d/%b/%Y:%H:%M:%S %z").to_string()
134 } else {
135 self.timestamp.clone()
136 }
137 }
138}
139
140#[derive(Debug, Serialize)]
142pub struct ErrorLogEntry {
143 pub timestamp: String,
145 pub trace_id: String,
147 pub level: String,
149 pub message: String,
151 #[serde(skip_serializing_if = "Option::is_none")]
153 pub route_id: Option<String>,
154 #[serde(skip_serializing_if = "Option::is_none")]
156 pub upstream: Option<String>,
157 #[serde(skip_serializing_if = "Option::is_none")]
159 pub details: Option<String>,
160}
161
162#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
164#[serde(rename_all = "snake_case")]
165pub enum AuditEventType {
166 Blocked,
168 AgentDecision,
170 WafMatch,
172 WafBlock,
174 RateLimitExceeded,
176 AuthEvent,
178 ConfigChange,
180 CertReload,
182 CircuitBreakerChange,
184 CachePurge,
186 AdminAction,
188 Custom,
190}
191
192impl std::fmt::Display for AuditEventType {
193 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194 match self {
195 AuditEventType::Blocked => write!(f, "blocked"),
196 AuditEventType::AgentDecision => write!(f, "agent_decision"),
197 AuditEventType::WafMatch => write!(f, "waf_match"),
198 AuditEventType::WafBlock => write!(f, "waf_block"),
199 AuditEventType::RateLimitExceeded => write!(f, "rate_limit_exceeded"),
200 AuditEventType::AuthEvent => write!(f, "auth_event"),
201 AuditEventType::ConfigChange => write!(f, "config_change"),
202 AuditEventType::CertReload => write!(f, "cert_reload"),
203 AuditEventType::CircuitBreakerChange => write!(f, "circuit_breaker_change"),
204 AuditEventType::CachePurge => write!(f, "cache_purge"),
205 AuditEventType::AdminAction => write!(f, "admin_action"),
206 AuditEventType::Custom => write!(f, "custom"),
207 }
208 }
209}
210
211#[derive(Debug, Serialize)]
213pub struct AuditLogEntry {
214 pub timestamp: String,
216 pub trace_id: String,
218 pub event_type: String,
220 pub method: String,
222 pub path: String,
224 pub client_ip: String,
226 #[serde(skip_serializing_if = "Option::is_none")]
228 pub route_id: Option<String>,
229 #[serde(skip_serializing_if = "Option::is_none")]
231 pub reason: Option<String>,
232 #[serde(skip_serializing_if = "Option::is_none")]
234 pub agent_id: Option<String>,
235 #[serde(skip_serializing_if = "Vec::is_empty")]
237 pub rule_ids: Vec<String>,
238 #[serde(skip_serializing_if = "Vec::is_empty")]
240 pub tags: Vec<String>,
241 #[serde(skip_serializing_if = "Option::is_none")]
243 pub action: Option<String>,
244 #[serde(skip_serializing_if = "Option::is_none")]
246 pub status_code: Option<u16>,
247 #[serde(skip_serializing_if = "Option::is_none")]
249 pub user_id: Option<String>,
250 #[serde(skip_serializing_if = "Option::is_none")]
252 pub session_id: Option<String>,
253 #[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
255 pub metadata: std::collections::HashMap<String, String>,
256 #[serde(skip_serializing_if = "Option::is_none")]
258 pub namespace: Option<String>,
259 #[serde(skip_serializing_if = "Option::is_none")]
261 pub service: Option<String>,
262}
263
264impl AuditLogEntry {
265 pub fn new(
267 trace_id: impl Into<String>,
268 event_type: AuditEventType,
269 method: impl Into<String>,
270 path: impl Into<String>,
271 client_ip: impl Into<String>,
272 ) -> Self {
273 Self {
274 timestamp: chrono::Utc::now().to_rfc3339(),
275 trace_id: trace_id.into(),
276 event_type: event_type.to_string(),
277 method: method.into(),
278 path: path.into(),
279 client_ip: client_ip.into(),
280 route_id: None,
281 reason: None,
282 agent_id: None,
283 rule_ids: Vec::new(),
284 tags: Vec::new(),
285 action: None,
286 status_code: None,
287 user_id: None,
288 session_id: None,
289 metadata: std::collections::HashMap::new(),
290 namespace: None,
291 service: None,
292 }
293 }
294
295 pub fn with_namespace(mut self, namespace: impl Into<String>) -> Self {
297 self.namespace = Some(namespace.into());
298 self
299 }
300
301 pub fn with_service(mut self, service: impl Into<String>) -> Self {
303 self.service = Some(service.into());
304 self
305 }
306
307 pub fn with_scope(mut self, namespace: Option<String>, service: Option<String>) -> Self {
309 self.namespace = namespace;
310 self.service = service;
311 self
312 }
313
314 pub fn with_route_id(mut self, route_id: impl Into<String>) -> Self {
316 self.route_id = Some(route_id.into());
317 self
318 }
319
320 pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
322 self.reason = Some(reason.into());
323 self
324 }
325
326 pub fn with_agent_id(mut self, agent_id: impl Into<String>) -> Self {
328 self.agent_id = Some(agent_id.into());
329 self
330 }
331
332 pub fn with_rule_ids(mut self, rule_ids: Vec<String>) -> Self {
334 self.rule_ids = rule_ids;
335 self
336 }
337
338 pub fn with_tags(mut self, tags: Vec<String>) -> Self {
340 self.tags = tags;
341 self
342 }
343
344 pub fn with_action(mut self, action: impl Into<String>) -> Self {
346 self.action = Some(action.into());
347 self
348 }
349
350 pub fn with_status_code(mut self, status_code: u16) -> Self {
352 self.status_code = Some(status_code);
353 self
354 }
355
356 pub fn with_user_id(mut self, user_id: impl Into<String>) -> Self {
358 self.user_id = Some(user_id.into());
359 self
360 }
361
362 pub fn with_session_id(mut self, session_id: impl Into<String>) -> Self {
364 self.session_id = Some(session_id.into());
365 self
366 }
367
368 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
370 self.metadata.insert(key.into(), value.into());
371 self
372 }
373
374 pub fn blocked(
376 trace_id: impl Into<String>,
377 method: impl Into<String>,
378 path: impl Into<String>,
379 client_ip: impl Into<String>,
380 reason: impl Into<String>,
381 ) -> Self {
382 Self::new(trace_id, AuditEventType::Blocked, method, path, client_ip)
383 .with_reason(reason)
384 .with_action("block")
385 }
386
387 pub fn rate_limited(
389 trace_id: impl Into<String>,
390 method: impl Into<String>,
391 path: impl Into<String>,
392 client_ip: impl Into<String>,
393 limit_key: impl Into<String>,
394 ) -> Self {
395 Self::new(
396 trace_id,
397 AuditEventType::RateLimitExceeded,
398 method,
399 path,
400 client_ip,
401 )
402 .with_reason("Rate limit exceeded")
403 .with_action("block")
404 .with_metadata("limit_key", limit_key)
405 }
406
407 pub fn waf_blocked(
409 trace_id: impl Into<String>,
410 method: impl Into<String>,
411 path: impl Into<String>,
412 client_ip: impl Into<String>,
413 rule_ids: Vec<String>,
414 ) -> Self {
415 Self::new(trace_id, AuditEventType::WafBlock, method, path, client_ip)
416 .with_rule_ids(rule_ids)
417 .with_action("block")
418 }
419
420 pub fn config_change(
422 trace_id: impl Into<String>,
423 change_type: impl Into<String>,
424 details: impl Into<String>,
425 ) -> Self {
426 Self::new(
427 trace_id,
428 AuditEventType::ConfigChange,
429 "-",
430 "/-/config",
431 "internal",
432 )
433 .with_reason(change_type)
434 .with_metadata("details", details)
435 }
436
437 pub fn cert_reload(
439 trace_id: impl Into<String>,
440 listener_id: impl Into<String>,
441 success: bool,
442 ) -> Self {
443 Self::new(
444 trace_id,
445 AuditEventType::CertReload,
446 "-",
447 "/-/certs",
448 "internal",
449 )
450 .with_metadata("listener_id", listener_id)
451 .with_metadata("success", success.to_string())
452 }
453
454 pub fn cache_purge(
456 trace_id: impl Into<String>,
457 method: impl Into<String>,
458 path: impl Into<String>,
459 client_ip: impl Into<String>,
460 pattern: impl Into<String>,
461 ) -> Self {
462 Self::new(
463 trace_id,
464 AuditEventType::CachePurge,
465 method,
466 path,
467 client_ip,
468 )
469 .with_metadata("pattern", pattern)
470 .with_action("purge")
471 }
472
473 pub fn admin_action(
475 trace_id: impl Into<String>,
476 method: impl Into<String>,
477 path: impl Into<String>,
478 client_ip: impl Into<String>,
479 action: impl Into<String>,
480 ) -> Self {
481 Self::new(
482 trace_id,
483 AuditEventType::AdminAction,
484 method,
485 path,
486 client_ip,
487 )
488 .with_action(action)
489 }
490}
491
492struct LogFileWriter {
494 writer: BufWriter<File>,
495}
496
497impl LogFileWriter {
498 fn new(path: &Path, buffer_size: usize) -> Result<Self> {
499 if let Some(parent) = path.parent() {
501 std::fs::create_dir_all(parent)
502 .with_context(|| format!("Failed to create log directory: {:?}", parent))?;
503 }
504
505 let file = OpenOptions::new()
506 .create(true)
507 .append(true)
508 .open(path)
509 .with_context(|| format!("Failed to open log file: {:?}", path))?;
510
511 Ok(Self {
512 writer: BufWriter::with_capacity(buffer_size, file),
513 })
514 }
515
516 fn write_line(&mut self, line: &str) -> Result<()> {
517 writeln!(self.writer, "{}", line)?;
518 Ok(())
519 }
520
521 fn flush(&mut self) -> Result<()> {
522 self.writer.flush()?;
523 Ok(())
524 }
525}
526
527pub struct LogManager {
529 access_log: Option<Mutex<LogFileWriter>>,
530 access_log_format: AccessLogFormat,
531 error_log: Option<Mutex<LogFileWriter>>,
532 audit_log: Option<Mutex<LogFileWriter>>,
533 audit_config: Option<AuditLogConfig>,
534}
535
536impl LogManager {
537 pub fn new(config: &LoggingConfig) -> Result<Self> {
539 let (access_log, access_log_format) = if let Some(ref access_config) = config.access_log {
540 if access_config.enabled {
541 let format = Self::parse_access_format(&access_config.format);
542 let writer = Mutex::new(LogFileWriter::new(
543 &access_config.file,
544 access_config.buffer_size,
545 )?);
546 (Some(writer), format)
547 } else {
548 (None, AccessLogFormat::Json)
549 }
550 } else {
551 (None, AccessLogFormat::Json)
552 };
553
554 let error_log = if let Some(ref error_config) = config.error_log {
555 if error_config.enabled {
556 Some(Mutex::new(LogFileWriter::new(
557 &error_config.file,
558 error_config.buffer_size,
559 )?))
560 } else {
561 None
562 }
563 } else {
564 None
565 };
566
567 let audit_log = if let Some(ref audit_config) = config.audit_log {
568 if audit_config.enabled {
569 Some(Mutex::new(LogFileWriter::new(
570 &audit_config.file,
571 audit_config.buffer_size,
572 )?))
573 } else {
574 None
575 }
576 } else {
577 None
578 };
579
580 Ok(Self {
581 access_log,
582 access_log_format,
583 error_log,
584 audit_log,
585 audit_config: config.audit_log.clone(),
586 })
587 }
588
589 pub fn disabled() -> Self {
591 Self {
592 access_log: None,
593 access_log_format: AccessLogFormat::Json,
594 error_log: None,
595 audit_log: None,
596 audit_config: None,
597 }
598 }
599
600 fn parse_access_format(format: &str) -> AccessLogFormat {
602 match format.to_lowercase().as_str() {
603 "combined" | "clf" | "common" => AccessLogFormat::Combined,
604 _ => AccessLogFormat::Json, }
606 }
607
608 pub fn log_access(&self, entry: &AccessLogEntry) {
610 if let Some(ref writer) = self.access_log {
611 let formatted = entry.format(self.access_log_format);
612 let mut guard = writer.lock();
613 if let Err(e) = guard.write_line(&formatted) {
614 error!("Failed to write access log: {}", e);
615 }
616 }
617 }
618
619 pub fn log_error(&self, entry: &ErrorLogEntry) {
621 if let Some(ref writer) = self.error_log {
622 match serde_json::to_string(entry) {
623 Ok(json) => {
624 let mut guard = writer.lock();
625 if let Err(e) = guard.write_line(&json) {
626 error!("Failed to write error log: {}", e);
627 }
628 }
629 Err(e) => {
630 error!("Failed to serialize error log entry: {}", e);
631 }
632 }
633 }
634 }
635
636 pub fn log_audit(&self, entry: &AuditLogEntry) {
638 if let Some(ref writer) = self.audit_log {
639 if let Some(ref config) = self.audit_config {
640 let should_log = match entry.event_type.as_str() {
642 "blocked" => config.log_blocked,
643 "agent_decision" => config.log_agent_decisions,
644 "waf_match" | "waf_block" => config.log_waf_events,
645 _ => true, };
647
648 if !should_log {
649 return;
650 }
651 }
652
653 match serde_json::to_string(entry) {
654 Ok(json) => {
655 let mut guard = writer.lock();
656 if let Err(e) = guard.write_line(&json) {
657 error!("Failed to write audit log: {}", e);
658 }
659 }
660 Err(e) => {
661 error!("Failed to serialize audit log entry: {}", e);
662 }
663 }
664 }
665 }
666
667 pub fn flush(&self) {
669 if let Some(ref writer) = self.access_log {
670 if let Err(e) = writer.lock().flush() {
671 warn!("Failed to flush access log: {}", e);
672 }
673 }
674 if let Some(ref writer) = self.error_log {
675 if let Err(e) = writer.lock().flush() {
676 warn!("Failed to flush error log: {}", e);
677 }
678 }
679 if let Some(ref writer) = self.audit_log {
680 if let Err(e) = writer.lock().flush() {
681 warn!("Failed to flush audit log: {}", e);
682 }
683 }
684 }
685
686 pub fn access_log_enabled(&self) -> bool {
688 self.access_log.is_some()
689 }
690
691 pub fn error_log_enabled(&self) -> bool {
693 self.error_log.is_some()
694 }
695
696 pub fn audit_log_enabled(&self) -> bool {
698 self.audit_log.is_some()
699 }
700}
701
702pub type SharedLogManager = Arc<LogManager>;
704
705#[cfg(test)]
706mod tests {
707 use super::*;
708 use sentinel_config::{AccessLogConfig, ErrorLogConfig};
709 use tempfile::tempdir;
710
711 #[test]
712 fn test_access_log_entry_serialization() {
713 let entry = AccessLogEntry {
714 timestamp: "2024-01-01T00:00:00Z".to_string(),
715 trace_id: "abc123".to_string(),
716 method: "GET".to_string(),
717 path: "/api/users".to_string(),
718 query: Some("page=1".to_string()),
719 protocol: "HTTP/1.1".to_string(),
720 status: 200,
721 body_bytes: 1024,
722 duration_ms: 50,
723 client_ip: "192.168.1.1".to_string(),
724 user_agent: Some("Mozilla/5.0".to_string()),
725 referer: None,
726 host: Some("example.com".to_string()),
727 route_id: Some("api-route".to_string()),
728 upstream: Some("backend-1".to_string()),
729 upstream_attempts: 1,
730 instance_id: "instance-1".to_string(),
731 namespace: None,
732 service: None,
733 };
734
735 let json = serde_json::to_string(&entry).unwrap();
736 assert!(json.contains("\"trace_id\":\"abc123\""));
737 assert!(json.contains("\"status\":200"));
738 }
739
740 #[test]
741 fn test_access_log_entry_with_scope() {
742 let entry = AccessLogEntry {
743 timestamp: "2024-01-01T00:00:00Z".to_string(),
744 trace_id: "abc123".to_string(),
745 method: "GET".to_string(),
746 path: "/api/users".to_string(),
747 query: None,
748 protocol: "HTTP/1.1".to_string(),
749 status: 200,
750 body_bytes: 1024,
751 duration_ms: 50,
752 client_ip: "192.168.1.1".to_string(),
753 user_agent: None,
754 referer: None,
755 host: None,
756 route_id: Some("api-route".to_string()),
757 upstream: Some("backend-1".to_string()),
758 upstream_attempts: 1,
759 instance_id: "instance-1".to_string(),
760 namespace: Some("api".to_string()),
761 service: Some("payments".to_string()),
762 };
763
764 let json = serde_json::to_string(&entry).unwrap();
765 assert!(json.contains("\"namespace\":\"api\""));
766 assert!(json.contains("\"service\":\"payments\""));
767 }
768
769 #[test]
770 fn test_log_manager_creation() {
771 let dir = tempdir().unwrap();
772 let access_log_path = dir.path().join("access.log");
773 let error_log_path = dir.path().join("error.log");
774 let audit_log_path = dir.path().join("audit.log");
775
776 let config = LoggingConfig {
777 level: "info".to_string(),
778 format: "json".to_string(),
779 timestamps: true,
780 file: None,
781 access_log: Some(AccessLogConfig {
782 enabled: true,
783 file: access_log_path.clone(),
784 format: "json".to_string(),
785 buffer_size: 8192,
786 include_trace_id: true,
787 }),
788 error_log: Some(ErrorLogConfig {
789 enabled: true,
790 file: error_log_path.clone(),
791 level: "warn".to_string(),
792 buffer_size: 8192,
793 }),
794 audit_log: Some(AuditLogConfig {
795 enabled: true,
796 file: audit_log_path.clone(),
797 buffer_size: 8192,
798 log_blocked: true,
799 log_agent_decisions: true,
800 log_waf_events: true,
801 }),
802 };
803
804 let manager = LogManager::new(&config).unwrap();
805 assert!(manager.access_log_enabled());
806 assert!(manager.error_log_enabled());
807 assert!(manager.audit_log_enabled());
808 }
809
810 #[test]
811 fn test_access_log_combined_format() {
812 let entry = AccessLogEntry {
813 timestamp: "2024-01-15T10:30:00+00:00".to_string(),
814 trace_id: "trace-abc123".to_string(),
815 method: "GET".to_string(),
816 path: "/api/users".to_string(),
817 query: Some("page=1".to_string()),
818 protocol: "HTTP/1.1".to_string(),
819 status: 200,
820 body_bytes: 1024,
821 duration_ms: 50,
822 client_ip: "192.168.1.1".to_string(),
823 user_agent: Some("Mozilla/5.0".to_string()),
824 referer: Some("https://example.com/".to_string()),
825 host: Some("api.example.com".to_string()),
826 route_id: Some("api-route".to_string()),
827 upstream: Some("backend-1".to_string()),
828 upstream_attempts: 1,
829 instance_id: "instance-1".to_string(),
830 namespace: None,
831 service: None,
832 };
833
834 let combined = entry.format(AccessLogFormat::Combined);
835
836 assert!(combined.starts_with("192.168.1.1 - - ["));
838 assert!(combined.contains("\"GET /api/users?page=1 HTTP/1.1\""));
839 assert!(combined.contains(" 200 1024 "));
840 assert!(combined.contains("\"https://example.com/\""));
841 assert!(combined.contains("\"Mozilla/5.0\""));
842 assert!(combined.contains("trace-abc123"));
843 assert!(combined.ends_with("50ms"));
844 }
845
846 #[test]
847 fn test_access_log_format_parsing() {
848 assert_eq!(
849 LogManager::parse_access_format("json"),
850 AccessLogFormat::Json
851 );
852 assert_eq!(
853 LogManager::parse_access_format("JSON"),
854 AccessLogFormat::Json
855 );
856 assert_eq!(
857 LogManager::parse_access_format("combined"),
858 AccessLogFormat::Combined
859 );
860 assert_eq!(
861 LogManager::parse_access_format("COMBINED"),
862 AccessLogFormat::Combined
863 );
864 assert_eq!(
865 LogManager::parse_access_format("clf"),
866 AccessLogFormat::Combined
867 );
868 assert_eq!(
869 LogManager::parse_access_format("unknown"),
870 AccessLogFormat::Json
871 ); }
873}