use anyhow::{Context, Result};
use parking_lot::Mutex;
use serde::Serialize;
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::Path;
use std::sync::Arc;
use tracing::{error, warn};
use zentinel_config::{AuditLogConfig, LoggingConfig};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AccessLogFormat {
Json,
Combined,
}
#[derive(Debug, Serialize)]
pub struct AccessLogEntry {
pub timestamp: String,
pub trace_id: String,
pub method: String,
pub path: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub query: Option<String>,
pub protocol: String,
pub status: u16,
pub body_bytes: u64,
pub duration_ms: u64,
pub client_ip: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub user_agent: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub referer: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub host: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub route_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub upstream: Option<String>,
pub upstream_attempts: u32,
pub instance_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub namespace: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub service: Option<String>,
pub body_bytes_sent: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub upstream_addr: Option<String>,
pub connection_reused: bool,
pub rate_limit_hit: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub geo_country: Option<String>,
}
impl AccessLogEntry {
pub fn format(
&self,
format: AccessLogFormat,
fields: Option<&zentinel_config::AccessLogFields>,
) -> String {
match format {
AccessLogFormat::Json => self.format_json(fields),
AccessLogFormat::Combined => self.format_combined(),
}
}
fn format_json(&self, fields: Option<&zentinel_config::AccessLogFields>) -> String {
let fields = match fields {
Some(f) => f,
None => return serde_json::to_string(self).unwrap_or_else(|_| "{}".to_string()),
};
let mut map = serde_json::Map::new();
if fields.timestamp {
map.insert(
"timestamp".to_string(),
serde_json::Value::String(self.timestamp.clone()),
);
}
if fields.trace_id {
map.insert(
"trace_id".to_string(),
serde_json::Value::String(self.trace_id.clone()),
);
}
if fields.method {
map.insert(
"method".to_string(),
serde_json::Value::String(self.method.clone()),
);
}
if fields.path {
map.insert(
"path".to_string(),
serde_json::Value::String(self.path.clone()),
);
}
if fields.query {
if let Some(ref q) = self.query {
map.insert("query".to_string(), serde_json::Value::String(q.clone()));
}
}
if fields.status {
map.insert("status".to_string(), serde_json::json!(self.status));
}
if fields.latency_ms {
map.insert(
"duration_ms".to_string(),
serde_json::json!(self.duration_ms),
);
}
if fields.client_ip {
map.insert(
"client_ip".to_string(),
serde_json::Value::String(self.client_ip.clone()),
);
}
if fields.user_agent {
if let Some(ref ua) = self.user_agent {
map.insert(
"user_agent".to_string(),
serde_json::Value::String(ua.clone()),
);
}
}
if fields.referer {
if let Some(ref r) = self.referer {
map.insert("referer".to_string(), serde_json::Value::String(r.clone()));
}
}
if fields.body_bytes_sent {
map.insert(
"body_bytes_sent".to_string(),
serde_json::json!(self.body_bytes_sent),
);
}
if fields.upstream_addr {
if let Some(ref addr) = self.upstream_addr {
map.insert(
"upstream_addr".to_string(),
serde_json::Value::String(addr.clone()),
);
}
}
if fields.connection_reused {
map.insert(
"connection_reused".to_string(),
serde_json::json!(self.connection_reused),
);
}
if fields.rate_limit_hit {
map.insert(
"rate_limit_hit".to_string(),
serde_json::json!(self.rate_limit_hit),
);
}
if fields.geo_country {
if let Some(ref cc) = self.geo_country {
map.insert(
"geo_country".to_string(),
serde_json::Value::String(cc.clone()),
);
}
}
if let Some(ref route) = self.route_id {
map.insert(
"route_id".to_string(),
serde_json::Value::String(route.clone()),
);
}
if let Some(ref upstream) = self.upstream {
map.insert(
"upstream".to_string(),
serde_json::Value::String(upstream.clone()),
);
}
map.insert(
"instance_id".to_string(),
serde_json::Value::String(self.instance_id.clone()),
);
serde_json::to_string(&map).unwrap_or_else(|_| "{}".to_string())
}
fn format_combined(&self) -> String {
let clf_timestamp = self.format_clf_timestamp();
let request_line = if let Some(ref query) = self.query {
format!("{} {}?{} {}", self.method, self.path, query, self.protocol)
} else {
format!("{} {} {}", self.method, self.path, self.protocol)
};
let referer = self.referer.as_deref().unwrap_or("-");
let user_agent = self.user_agent.as_deref().unwrap_or("-");
format!(
"{} - - [{}] \"{}\" {} {} \"{}\" \"{}\" {} {}ms",
self.client_ip,
clf_timestamp,
request_line,
self.status,
self.body_bytes,
referer,
user_agent,
self.trace_id,
self.duration_ms
)
}
fn format_clf_timestamp(&self) -> String {
if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&self.timestamp) {
dt.format("%d/%b/%Y:%H:%M:%S %z").to_string()
} else {
self.timestamp.clone()
}
}
}
#[derive(Debug, Serialize)]
pub struct ErrorLogEntry {
pub timestamp: String,
pub trace_id: String,
pub level: String,
pub message: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub route_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub upstream: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub details: Option<String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum AuditEventType {
Blocked,
AgentDecision,
WafMatch,
WafBlock,
RateLimitExceeded,
AuthEvent,
ConfigChange,
CertReload,
CircuitBreakerChange,
CachePurge,
AdminAction,
Custom,
}
impl std::fmt::Display for AuditEventType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AuditEventType::Blocked => write!(f, "blocked"),
AuditEventType::AgentDecision => write!(f, "agent_decision"),
AuditEventType::WafMatch => write!(f, "waf_match"),
AuditEventType::WafBlock => write!(f, "waf_block"),
AuditEventType::RateLimitExceeded => write!(f, "rate_limit_exceeded"),
AuditEventType::AuthEvent => write!(f, "auth_event"),
AuditEventType::ConfigChange => write!(f, "config_change"),
AuditEventType::CertReload => write!(f, "cert_reload"),
AuditEventType::CircuitBreakerChange => write!(f, "circuit_breaker_change"),
AuditEventType::CachePurge => write!(f, "cache_purge"),
AuditEventType::AdminAction => write!(f, "admin_action"),
AuditEventType::Custom => write!(f, "custom"),
}
}
}
#[derive(Debug, Serialize)]
pub struct AuditLogEntry {
pub timestamp: String,
pub trace_id: String,
pub event_type: String,
pub method: String,
pub path: String,
pub client_ip: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub route_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub agent_id: Option<String>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub rule_ids: Vec<String>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub tags: Vec<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub action: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub status_code: Option<u16>,
#[serde(skip_serializing_if = "Option::is_none")]
pub user_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub session_id: Option<String>,
#[serde(skip_serializing_if = "std::collections::HashMap::is_empty")]
pub metadata: std::collections::HashMap<String, String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub namespace: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub service: Option<String>,
}
impl AuditLogEntry {
pub fn new(
trace_id: impl Into<String>,
event_type: AuditEventType,
method: impl Into<String>,
path: impl Into<String>,
client_ip: impl Into<String>,
) -> Self {
Self {
timestamp: chrono::Utc::now().to_rfc3339(),
trace_id: trace_id.into(),
event_type: event_type.to_string(),
method: method.into(),
path: path.into(),
client_ip: client_ip.into(),
route_id: None,
reason: None,
agent_id: None,
rule_ids: Vec::new(),
tags: Vec::new(),
action: None,
status_code: None,
user_id: None,
session_id: None,
metadata: std::collections::HashMap::new(),
namespace: None,
service: None,
}
}
pub fn with_namespace(mut self, namespace: impl Into<String>) -> Self {
self.namespace = Some(namespace.into());
self
}
pub fn with_service(mut self, service: impl Into<String>) -> Self {
self.service = Some(service.into());
self
}
pub fn with_scope(mut self, namespace: Option<String>, service: Option<String>) -> Self {
self.namespace = namespace;
self.service = service;
self
}
pub fn with_route_id(mut self, route_id: impl Into<String>) -> Self {
self.route_id = Some(route_id.into());
self
}
pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
self.reason = Some(reason.into());
self
}
pub fn with_agent_id(mut self, agent_id: impl Into<String>) -> Self {
self.agent_id = Some(agent_id.into());
self
}
pub fn with_rule_ids(mut self, rule_ids: Vec<String>) -> Self {
self.rule_ids = rule_ids;
self
}
pub fn with_tags(mut self, tags: Vec<String>) -> Self {
self.tags = tags;
self
}
pub fn with_action(mut self, action: impl Into<String>) -> Self {
self.action = Some(action.into());
self
}
pub fn with_status_code(mut self, status_code: u16) -> Self {
self.status_code = Some(status_code);
self
}
pub fn with_user_id(mut self, user_id: impl Into<String>) -> Self {
self.user_id = Some(user_id.into());
self
}
pub fn with_session_id(mut self, session_id: impl Into<String>) -> Self {
self.session_id = Some(session_id.into());
self
}
pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.metadata.insert(key.into(), value.into());
self
}
pub fn blocked(
trace_id: impl Into<String>,
method: impl Into<String>,
path: impl Into<String>,
client_ip: impl Into<String>,
reason: impl Into<String>,
) -> Self {
Self::new(trace_id, AuditEventType::Blocked, method, path, client_ip)
.with_reason(reason)
.with_action("block")
}
pub fn rate_limited(
trace_id: impl Into<String>,
method: impl Into<String>,
path: impl Into<String>,
client_ip: impl Into<String>,
limit_key: impl Into<String>,
) -> Self {
Self::new(
trace_id,
AuditEventType::RateLimitExceeded,
method,
path,
client_ip,
)
.with_reason("Rate limit exceeded")
.with_action("block")
.with_metadata("limit_key", limit_key)
}
pub fn waf_blocked(
trace_id: impl Into<String>,
method: impl Into<String>,
path: impl Into<String>,
client_ip: impl Into<String>,
rule_ids: Vec<String>,
) -> Self {
Self::new(trace_id, AuditEventType::WafBlock, method, path, client_ip)
.with_rule_ids(rule_ids)
.with_action("block")
}
pub fn config_change(
trace_id: impl Into<String>,
change_type: impl Into<String>,
details: impl Into<String>,
) -> Self {
Self::new(
trace_id,
AuditEventType::ConfigChange,
"-",
"/-/config",
"internal",
)
.with_reason(change_type)
.with_metadata("details", details)
}
pub fn cert_reload(
trace_id: impl Into<String>,
listener_id: impl Into<String>,
success: bool,
) -> Self {
Self::new(
trace_id,
AuditEventType::CertReload,
"-",
"/-/certs",
"internal",
)
.with_metadata("listener_id", listener_id)
.with_metadata("success", success.to_string())
}
pub fn cache_purge(
trace_id: impl Into<String>,
method: impl Into<String>,
path: impl Into<String>,
client_ip: impl Into<String>,
pattern: impl Into<String>,
) -> Self {
Self::new(
trace_id,
AuditEventType::CachePurge,
method,
path,
client_ip,
)
.with_metadata("pattern", pattern)
.with_action("purge")
}
pub fn admin_action(
trace_id: impl Into<String>,
method: impl Into<String>,
path: impl Into<String>,
client_ip: impl Into<String>,
action: impl Into<String>,
) -> Self {
Self::new(
trace_id,
AuditEventType::AdminAction,
method,
path,
client_ip,
)
.with_action(action)
}
}
struct LogFileWriter {
writer: BufWriter<File>,
}
impl LogFileWriter {
fn new(path: &Path, buffer_size: usize) -> Result<Self> {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("Failed to create log directory: {:?}", parent))?;
}
let file = OpenOptions::new()
.create(true)
.append(true)
.open(path)
.with_context(|| format!("Failed to open log file: {:?}", path))?;
Ok(Self {
writer: BufWriter::with_capacity(buffer_size, file),
})
}
fn write_line(&mut self, line: &str) -> Result<()> {
writeln!(self.writer, "{}", line)?;
Ok(())
}
fn flush(&mut self) -> Result<()> {
self.writer.flush()?;
Ok(())
}
}
pub struct LogManager {
access_log: Option<Mutex<LogFileWriter>>,
access_log_format: AccessLogFormat,
access_log_config: Option<zentinel_config::AccessLogConfig>,
error_log: Option<Mutex<LogFileWriter>>,
error_log_level: String,
audit_log: Option<Mutex<LogFileWriter>>,
audit_config: Option<AuditLogConfig>,
}
impl LogManager {
pub fn new(config: &LoggingConfig) -> Result<Self> {
let (access_log, access_log_format) = if let Some(ref access_config) = config.access_log {
if access_config.enabled {
let format = Self::parse_access_format(&access_config.format);
let writer = Mutex::new(LogFileWriter::new(
&access_config.file,
access_config.buffer_size,
)?);
(Some(writer), format)
} else {
(None, AccessLogFormat::Json)
}
} else {
(None, AccessLogFormat::Json)
};
let (error_log, error_log_level) = if let Some(ref error_config) = config.error_log {
if error_config.enabled {
(
Some(Mutex::new(LogFileWriter::new(
&error_config.file,
error_config.buffer_size,
)?)),
error_config.level.clone(),
)
} else {
(None, "warn".to_string())
}
} else {
(None, "warn".to_string())
};
let audit_log = if let Some(ref audit_config) = config.audit_log {
if audit_config.enabled {
Some(Mutex::new(LogFileWriter::new(
&audit_config.file,
audit_config.buffer_size,
)?))
} else {
None
}
} else {
None
};
Ok(Self {
access_log,
access_log_format,
access_log_config: config.access_log.clone(),
error_log,
error_log_level,
audit_log,
audit_config: config.audit_log.clone(),
})
}
pub fn disabled() -> Self {
Self {
access_log: None,
access_log_format: AccessLogFormat::Json,
access_log_config: None,
error_log: None,
error_log_level: "warn".to_string(),
audit_log: None,
audit_config: None,
}
}
fn parse_access_format(format: &str) -> AccessLogFormat {
match format.to_lowercase().as_str() {
"combined" | "clf" | "common" => AccessLogFormat::Combined,
_ => AccessLogFormat::Json, }
}
pub fn should_log_access(&self, status: u16) -> bool {
if self.access_log.is_none() {
return false;
}
if let Some(ref config) = self.access_log_config {
if config.sample_errors_always && status >= 400 {
return true;
}
use rand::RngExt;
let mut rng = rand::rng();
return rng.random::<f64>() < config.sample_rate;
}
true
}
pub fn log_access(&self, entry: &AccessLogEntry) {
if let Some(ref writer) = self.access_log {
let fields = self.access_log_config.as_ref().map(|c| &c.fields);
let formatted = entry.format(self.access_log_format, fields);
let mut guard = writer.lock();
if let Err(e) = guard.write_line(&formatted) {
error!("Failed to write access log: {}", e);
}
}
}
pub fn log_error(&self, entry: &ErrorLogEntry) {
if let Some(ref writer) = self.error_log {
match serde_json::to_string(entry) {
Ok(json) => {
let mut guard = writer.lock();
if let Err(e) = guard.write_line(&json) {
error!("Failed to write error log: {}", e);
}
}
Err(e) => {
error!("Failed to serialize error log entry: {}", e);
}
}
}
}
pub fn log_audit(&self, entry: &AuditLogEntry) {
if let Some(ref writer) = self.audit_log {
if let Some(ref config) = self.audit_config {
let should_log = match entry.event_type.as_str() {
"blocked" => config.log_blocked,
"agent_decision" => config.log_agent_decisions,
"waf_match" | "waf_block" => config.log_waf_events,
_ => true, };
if !should_log {
return;
}
}
match serde_json::to_string(entry) {
Ok(json) => {
let mut guard = writer.lock();
if let Err(e) = guard.write_line(&json) {
error!("Failed to write audit log: {}", e);
}
}
Err(e) => {
error!("Failed to serialize audit log entry: {}", e);
}
}
}
}
pub fn flush(&self) {
if let Some(ref writer) = self.access_log {
if let Err(e) = writer.lock().flush() {
warn!("Failed to flush access log: {}", e);
}
}
if let Some(ref writer) = self.error_log {
if let Err(e) = writer.lock().flush() {
warn!("Failed to flush error log: {}", e);
}
}
if let Some(ref writer) = self.audit_log {
if let Err(e) = writer.lock().flush() {
warn!("Failed to flush audit log: {}", e);
}
}
}
pub fn access_log_enabled(&self) -> bool {
self.access_log.is_some()
}
pub fn error_log_enabled(&self) -> bool {
self.error_log.is_some()
}
pub fn audit_log_enabled(&self) -> bool {
self.audit_log.is_some()
}
fn should_log_error_level(&self, level: &str) -> bool {
match self.error_log_level.as_str() {
"error" => level == "error",
_ => level == "warn" || level == "error", }
}
pub fn log_request_error(
&self,
level: &str,
message: &str,
trace_id: &str,
route_id: Option<&str>,
upstream: Option<&str>,
details: Option<String>,
) {
if self.error_log.is_none() || !self.should_log_error_level(level) {
return;
}
let entry = ErrorLogEntry {
timestamp: chrono::Utc::now().to_rfc3339(),
trace_id: trace_id.to_string(),
level: level.to_string(),
message: message.to_string(),
route_id: route_id.map(|s| s.to_string()),
upstream: upstream.map(|s| s.to_string()),
details,
};
self.log_error(&entry);
}
}
impl Drop for LogManager {
fn drop(&mut self) {
self.flush();
}
}
pub type SharedLogManager = Arc<LogManager>;
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
use zentinel_config::{AccessLogConfig, ErrorLogConfig};
#[test]
fn test_access_log_entry_serialization() {
let entry = AccessLogEntry {
timestamp: "2024-01-01T00:00:00Z".to_string(),
trace_id: "abc123".to_string(),
method: "GET".to_string(),
path: "/api/users".to_string(),
query: Some("page=1".to_string()),
protocol: "HTTP/1.1".to_string(),
status: 200,
body_bytes: 1024,
duration_ms: 50,
client_ip: "192.168.1.1".to_string(),
user_agent: Some("Mozilla/5.0".to_string()),
referer: None,
host: Some("example.com".to_string()),
route_id: Some("api-route".to_string()),
upstream: Some("backend-1".to_string()),
upstream_attempts: 1,
instance_id: "instance-1".to_string(),
namespace: None,
service: None,
body_bytes_sent: 2048,
upstream_addr: Some("10.0.1.5:8080".to_string()),
connection_reused: true,
rate_limit_hit: false,
geo_country: None,
};
let json = serde_json::to_string(&entry).unwrap();
assert!(json.contains("\"trace_id\":\"abc123\""));
assert!(json.contains("\"status\":200"));
}
#[test]
fn test_access_log_entry_with_scope() {
let entry = AccessLogEntry {
timestamp: "2024-01-01T00:00:00Z".to_string(),
trace_id: "abc123".to_string(),
method: "GET".to_string(),
path: "/api/users".to_string(),
query: None,
protocol: "HTTP/1.1".to_string(),
status: 200,
body_bytes: 1024,
duration_ms: 50,
client_ip: "192.168.1.1".to_string(),
user_agent: None,
referer: None,
host: None,
route_id: Some("api-route".to_string()),
upstream: Some("backend-1".to_string()),
upstream_attempts: 1,
instance_id: "instance-1".to_string(),
namespace: Some("api".to_string()),
service: Some("payments".to_string()),
body_bytes_sent: 2048,
upstream_addr: None,
connection_reused: false,
rate_limit_hit: false,
geo_country: Some("US".to_string()),
};
let json = serde_json::to_string(&entry).unwrap();
assert!(json.contains("\"namespace\":\"api\""));
assert!(json.contains("\"service\":\"payments\""));
}
#[test]
fn test_log_manager_creation() {
let dir = tempdir().unwrap();
let access_log_path = dir.path().join("access.log");
let error_log_path = dir.path().join("error.log");
let audit_log_path = dir.path().join("audit.log");
let config = LoggingConfig {
level: "info".to_string(),
format: "json".to_string(),
timestamps: true,
file: None,
access_log: Some(AccessLogConfig {
enabled: true,
file: access_log_path.clone(),
format: "json".to_string(),
buffer_size: 8192,
include_trace_id: true,
sample_rate: 1.0,
sample_errors_always: true,
fields: zentinel_config::AccessLogFields::default(),
}),
error_log: Some(ErrorLogConfig {
enabled: true,
file: error_log_path.clone(),
level: "warn".to_string(),
buffer_size: 8192,
}),
audit_log: Some(AuditLogConfig {
enabled: true,
file: audit_log_path.clone(),
buffer_size: 8192,
log_blocked: true,
log_agent_decisions: true,
log_waf_events: true,
}),
};
let manager = LogManager::new(&config).unwrap();
assert!(manager.access_log_enabled());
assert!(manager.error_log_enabled());
assert!(manager.audit_log_enabled());
}
#[test]
fn test_access_log_combined_format() {
let entry = AccessLogEntry {
timestamp: "2024-01-15T10:30:00+00:00".to_string(),
trace_id: "trace-abc123".to_string(),
method: "GET".to_string(),
path: "/api/users".to_string(),
query: Some("page=1".to_string()),
protocol: "HTTP/1.1".to_string(),
status: 200,
body_bytes: 1024,
duration_ms: 50,
client_ip: "192.168.1.1".to_string(),
user_agent: Some("Mozilla/5.0".to_string()),
referer: Some("https://example.com/".to_string()),
host: Some("api.example.com".to_string()),
route_id: Some("api-route".to_string()),
upstream: Some("backend-1".to_string()),
upstream_attempts: 1,
instance_id: "instance-1".to_string(),
namespace: None,
service: None,
body_bytes_sent: 2048,
upstream_addr: Some("10.0.1.5:8080".to_string()),
connection_reused: true,
rate_limit_hit: false,
geo_country: Some("US".to_string()),
};
let combined = entry.format(AccessLogFormat::Combined, None);
assert!(combined.starts_with("192.168.1.1 - - ["));
assert!(combined.contains("\"GET /api/users?page=1 HTTP/1.1\""));
assert!(combined.contains(" 200 1024 "));
assert!(combined.contains("\"https://example.com/\""));
assert!(combined.contains("\"Mozilla/5.0\""));
assert!(combined.contains("trace-abc123"));
assert!(combined.ends_with("50ms"));
}
#[test]
fn test_access_log_format_parsing() {
assert_eq!(
LogManager::parse_access_format("json"),
AccessLogFormat::Json
);
assert_eq!(
LogManager::parse_access_format("JSON"),
AccessLogFormat::Json
);
assert_eq!(
LogManager::parse_access_format("combined"),
AccessLogFormat::Combined
);
assert_eq!(
LogManager::parse_access_format("COMBINED"),
AccessLogFormat::Combined
);
assert_eq!(
LogManager::parse_access_format("clf"),
AccessLogFormat::Combined
);
assert_eq!(
LogManager::parse_access_format("unknown"),
AccessLogFormat::Json
); }
fn test_entry() -> AccessLogEntry {
AccessLogEntry {
timestamp: "2026-03-01T12:00:00Z".to_string(),
trace_id: "trace-xyz".to_string(),
method: "POST".to_string(),
path: "/api/submit".to_string(),
query: Some("v=2".to_string()),
protocol: "HTTP/2.0".to_string(),
status: 201,
body_bytes: 512,
duration_ms: 25,
client_ip: "10.0.0.1".to_string(),
user_agent: Some("curl/8.0".to_string()),
referer: Some("https://example.com".to_string()),
host: Some("api.example.com".to_string()),
route_id: Some("submit-route".to_string()),
upstream: Some("backend-2".to_string()),
upstream_attempts: 1,
instance_id: "inst-1".to_string(),
namespace: None,
service: None,
body_bytes_sent: 100,
upstream_addr: Some("10.1.0.5:9090".to_string()),
connection_reused: false,
rate_limit_hit: true,
geo_country: Some("DE".to_string()),
}
}
#[test]
fn test_json_format_with_all_fields_enabled() {
let entry = test_entry();
let fields = zentinel_config::AccessLogFields::default();
let json_str = entry.format(AccessLogFormat::Json, Some(&fields));
let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
assert_eq!(parsed["method"], "POST");
assert_eq!(parsed["status"], 201);
assert_eq!(parsed["client_ip"], "10.0.0.1");
assert_eq!(parsed["duration_ms"], 25);
assert_eq!(parsed["upstream_addr"], "10.1.0.5:9090");
assert_eq!(parsed["rate_limit_hit"], true);
}
#[test]
fn test_json_format_with_fields_disabled() {
let entry = test_entry();
let fields = zentinel_config::AccessLogFields {
timestamp: true,
method: true,
path: true,
status: true,
trace_id: false,
query: false,
latency_ms: false,
body_bytes_sent: false,
upstream_addr: false,
connection_reused: false,
rate_limit_hit: false,
geo_country: false,
user_agent: false,
referer: false,
client_ip: false,
};
let json_str = entry.format(AccessLogFormat::Json, Some(&fields));
let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
assert!(parsed.get("timestamp").is_some());
assert!(parsed.get("method").is_some());
assert!(parsed.get("path").is_some());
assert!(parsed.get("status").is_some());
assert!(
parsed.get("trace_id").is_none(),
"trace_id should be filtered out"
);
assert!(
parsed.get("query").is_none(),
"query should be filtered out"
);
assert!(
parsed.get("duration_ms").is_none(),
"duration_ms should be filtered out"
);
assert!(
parsed.get("client_ip").is_none(),
"client_ip should be filtered out"
);
assert!(
parsed.get("user_agent").is_none(),
"user_agent should be filtered out"
);
assert!(
parsed.get("referer").is_none(),
"referer should be filtered out"
);
assert!(
parsed.get("upstream_addr").is_none(),
"upstream_addr should be filtered out"
);
assert!(
parsed.get("rate_limit_hit").is_none(),
"rate_limit_hit should be filtered out"
);
}
#[test]
fn test_json_format_without_field_filter() {
let entry = test_entry();
let json_str = entry.format(AccessLogFormat::Json, None);
let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
assert!(parsed.get("timestamp").is_some());
assert!(parsed.get("trace_id").is_some());
assert!(parsed.get("method").is_some());
assert!(parsed.get("status").is_some());
assert!(parsed.get("duration_ms").is_some());
}
#[test]
fn test_combined_format_with_missing_optional_fields() {
let entry = AccessLogEntry {
timestamp: "2026-03-01T12:00:00Z".to_string(),
trace_id: "trace-001".to_string(),
method: "GET".to_string(),
path: "/health".to_string(),
query: None,
protocol: "HTTP/1.1".to_string(),
status: 200,
body_bytes: 0,
duration_ms: 1,
client_ip: "127.0.0.1".to_string(),
user_agent: None,
referer: None,
host: None,
route_id: None,
upstream: None,
upstream_attempts: 0,
instance_id: "inst-1".to_string(),
namespace: None,
service: None,
body_bytes_sent: 0,
upstream_addr: None,
connection_reused: false,
rate_limit_hit: false,
geo_country: None,
};
let combined = entry.format(AccessLogFormat::Combined, None);
assert!(combined.starts_with("127.0.0.1 - - ["));
assert!(combined.contains("\"GET /health HTTP/1.1\""));
assert!(combined.contains(" 200 0 "));
assert!(combined.contains("\"-\""));
}
#[test]
fn test_json_skips_none_optional_fields() {
let entry = AccessLogEntry {
timestamp: "2026-03-01T12:00:00Z".to_string(),
trace_id: "trace-002".to_string(),
method: "GET".to_string(),
path: "/".to_string(),
query: None,
protocol: "HTTP/1.1".to_string(),
status: 200,
body_bytes: 0,
duration_ms: 0,
client_ip: "127.0.0.1".to_string(),
user_agent: None,
referer: None,
host: None,
route_id: None,
upstream: None,
upstream_attempts: 0,
instance_id: "inst-1".to_string(),
namespace: None,
service: None,
body_bytes_sent: 0,
upstream_addr: None,
connection_reused: false,
rate_limit_hit: false,
geo_country: None,
};
let json_str = entry.format(AccessLogFormat::Json, None);
let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
assert!(parsed.get("query").is_none());
assert!(parsed.get("user_agent").is_none());
assert!(parsed.get("referer").is_none());
assert!(parsed.get("host").is_none());
assert!(parsed.get("route_id").is_none());
assert!(parsed.get("upstream").is_none());
assert!(parsed.get("namespace").is_none());
assert!(parsed.get("service").is_none());
assert!(parsed.get("upstream_addr").is_none());
}
}