pub mod config;
pub mod filter;
pub mod forwarding;
pub mod security;
mod types;
pub use config::{AuditConfig, AuditConfigBuilder};
pub use filter::AuditFilter;
pub use forwarding::{ForwardDestination, SyslogProtocol};
pub use security::{SecurityEvent, SecurityEventDetector};
pub use types::{AuditAction, AuditError, AuditEvent, AuditOutcome, AuditResult, SecuritySeverity};
use std::path::Path;
use std::sync::Arc;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::sync::RwLock;
use tracing::{debug, error, info, warn};
/// Append-only audit logger that writes hash-chained JSON events to a file,
/// with optional security-event detection and forwarding to other sinks.
pub struct AuditLogger {
/// Settings supplied at construction (log path, HMAC secret, rotation and
/// forwarding options).
config: AuditConfig,
/// Hash of the most recently logged event; copied into the next event's
/// `prev_hash`. `None` until the first event is known.
last_hash: Arc<RwLock<Option<String>>>,
/// Open append handle for the active log file; set to `None` while the
/// file is being rotated.
current_file: Arc<RwLock<Option<File>>>,
/// Running byte count of the active log file, compared against
/// `config.max_file_size` to decide when to rotate.
current_size: Arc<RwLock<u64>>,
/// Detector consulted for each logged event when
/// `config.enable_security_detection` is set.
security_detector: Arc<SecurityEventDetector>,
/// S3 client; only created when at least one S3 forward destination is
/// configured.
s3_client: Option<Arc<aws_sdk_s3::Client>>,
}
impl AuditLogger {
/// Creates a logger for `config`.
///
/// Creates the log directory if needed, opens the log file in append mode,
/// records its current size, recovers the chain tip from any existing
/// content, and builds an S3 client when an S3 forward destination is
/// configured.
///
/// # Errors
/// Returns an error if the directory or file cannot be created/opened, the
/// file metadata cannot be read, or the existing log cannot be scanned.
pub async fn new(config: AuditConfig) -> AuditResult<Self> {
    // Make sure the directory that will hold the log exists.
    if let Some(log_dir) = config.log_path.parent() {
        tokio::fs::create_dir_all(log_dir).await?;
    }

    let mut open_options = OpenOptions::new();
    open_options.create(true).append(true);
    let log_file = open_options.open(&config.log_path).await?;

    let existing_bytes = tokio::fs::metadata(&config.log_path).await?.len();
    let chain_tip = Self::read_last_hash(&config.log_path, &config.hmac_secret).await?;
    let security_detector = Arc::new(SecurityEventDetector::new());

    // The AWS configuration is only loaded when some destination needs it.
    let wants_s3 = config
        .forward_destinations
        .iter()
        .any(|dest| matches!(dest, ForwardDestination::S3 { .. }));
    let s3_client = if wants_s3 {
        let sdk_config = aws_config::defaults(aws_config::BehaviorVersion::latest())
            .load()
            .await;
        Some(Arc::new(aws_sdk_s3::Client::new(&sdk_config)))
    } else {
        None
    };

    info!(path = ?config.log_path, s3_enabled = s3_client.is_some(), "Audit logger initialized");

    let logger = Self {
        config,
        last_hash: Arc::new(RwLock::new(chain_tip)),
        current_file: Arc::new(RwLock::new(Some(log_file))),
        current_size: Arc::new(RwLock::new(existing_bytes)),
        security_detector,
        s3_client,
    };
    Ok(logger)
}
async fn read_last_hash(path: &Path, secret: &[u8]) -> AuditResult<Option<String>> {
if !path.exists() {
return Ok(None);
}
let file = File::open(path).await?;
let reader = BufReader::new(file);
let mut lines = reader.lines();
let mut last_hash = None;
while let Ok(Some(line)) = lines.next_line().await {
if let Ok(event) = serde_json::from_str::<AuditEvent>(&line) {
let computed_hash = event.compute_hash(secret)?;
last_hash = Some(computed_hash);
}
}
Ok(last_hash)
}
/// Appends `event` to the audit log as the next link of the hash chain,
/// runs security detection on it, and forwards it to the configured
/// destinations.
///
/// The chain tip is held exclusively while the event (and any security
/// event derived from it) is hashed and written, so concurrent callers
/// cannot link to the same predecessor and the on-disk order always matches
/// the chain order. A detected security event is written directly after the
/// event that triggered it and is chained like any other record, which
/// keeps `verify_chain` valid.
///
/// Forwarding failures are logged and do not fail the call.
///
/// # Errors
/// Returns an error if hashing or writing to the log file fails.
pub async fn log(&self, mut event: AuditEvent) -> AuditResult<()> {
    {
        let mut chain_tip = self.last_hash.write().await;
        self.append_chained(&mut *chain_tip, &mut event).await?;

        if self.config.enable_security_detection {
            if let Some(security_event) = self.security_detector.detect(&event).await {
                warn!(?security_event, "Security event detected");
                let mut security_log = AuditEvent::new(
                    "system".to_string(),
                    AuditAction::SuspiciousActivity,
                    format!("security_event:{}", security_event.event_type),
                    AuditOutcome::Success,
                );
                security_log.metadata.insert(
                    "severity".to_string(),
                    format!("{:?}", security_event.severity),
                );
                security_log
                    .metadata
                    .insert("description".to_string(), security_event.description);
                self.append_chained(&mut *chain_tip, &mut security_log)
                    .await?;
            }
        }
        // The chain lock is released here so slow forwarding targets do not
        // block other writers.
    }

    if self.config.enable_forwarding {
        for dest in &self.config.forward_destinations {
            if let Err(e) = self.forward_event(&event, dest).await {
                error!(?dest, error = ?e, "Failed to forward audit event");
            }
        }
    }
    Ok(())
}

/// Links `event` to `chain_tip`, stamps its hash, writes it to the log file
/// and only then advances `chain_tip`, so a failed write leaves the chain
/// state untouched.
async fn append_chained(
    &self,
    chain_tip: &mut Option<String>,
    event: &mut AuditEvent,
) -> AuditResult<()> {
    if let Some(prev) = chain_tip.as_ref() {
        event.prev_hash = Some(prev.clone());
    }
    let hash = event.compute_hash(&self.config.hmac_secret)?;
    event.current_hash = Some(hash.clone());
    self.write_event(event).await?;
    *chain_tip = Some(hash);
    Ok(())
}
async fn write_event(&self, event: &AuditEvent) -> AuditResult<()> {
let json = serde_json::to_string(event)?;
let line = format!("{}\n", json);
let line_bytes = line.as_bytes();
let current_size = *self.current_size.read().await;
if current_size + line_bytes.len() as u64 > self.config.max_file_size {
self.rotate_log().await?;
}
let mut file_guard = self.current_file.write().await;
if let Some(file) = file_guard.as_mut() {
file.write_all(line_bytes).await?;
file.flush().await?;
}
*self.current_size.write().await += line_bytes.len() as u64;
Ok(())
}
async fn rotate_log(&self) -> AuditResult<()> {
info!("Rotating audit log");
*self.current_file.write().await = None;
let timestamp = chrono::Utc::now().format("%Y%m%d_%H%M%S");
let rotated_path = self.config.log_path.with_file_name(format!(
"{}.{}.log",
self.config
.log_path
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("audit"),
timestamp
));
tokio::fs::rename(&self.config.log_path, &rotated_path).await?;
if self.config.compress_rotated {
let compressed_path = rotated_path.with_extension("log.zst");
if let Err(e) = compress_file(&rotated_path, &compressed_path).await {
error!(error = ?e, "Failed to compress rotated log");
} else {
let _ = tokio::fs::remove_file(&rotated_path).await;
}
}
self.cleanup_old_logs().await?;
let file = OpenOptions::new()
.create(true)
.append(true)
.open(&self.config.log_path)
.await?;
*self.current_file.write().await = Some(file);
*self.current_size.write().await = 0;
Ok(())
}
async fn cleanup_old_logs(&self) -> AuditResult<()> {
let parent = self
.config
.log_path
.parent()
.ok_or_else(|| AuditError::InvalidEvent("Invalid log path".to_string()))?;
let file_stem = self
.config
.log_path
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("audit");
let mut entries = tokio::fs::read_dir(parent).await?;
let mut rotated_files = Vec::new();
while let Ok(Some(entry)) = entries.next_entry().await {
let path = entry.path();
if let Some(name) = path.file_name().and_then(|s| s.to_str()) {
if name.starts_with(file_stem)
&& name
!= self
.config
.log_path
.file_name()
.and_then(|s| s.to_str())
.unwrap_or("")
{
if let Ok(metadata) = entry.metadata().await {
rotated_files.push((path, metadata.modified().ok()));
}
}
}
}
rotated_files.sort_by_key(|b| std::cmp::Reverse(b.1));
for (path, _) in rotated_files.iter().skip(self.config.max_rotated_files) {
debug!(?path, "Removing old audit log");
let _ = tokio::fs::remove_file(path).await;
}
Ok(())
}
pub fn format_syslog_rfc5424(&self, event: &AuditEvent) -> AuditResult<String> {
let severity = if event.outcome == AuditOutcome::Failure {
4
} else {
6
};
let facility = 16; let priority = facility * 8 + severity;
let version = 1;
let timestamp = event.timestamp.to_rfc3339();
let hostname = hostname::get()
.ok()
.and_then(|h| h.into_string().ok())
.unwrap_or_else(|| "-".to_string());
let app_name = "rs3gw";
let procid = std::process::id();
let msgid = format!("{:?}", event.action).replace(' ', "_");
let structured_data = format!(
"[audit@rs3gw actor=\"{}\" resource=\"{}\" outcome=\"{:?}\" ip=\"{}\"]",
event.actor.replace('"', "\\\""),
event.resource.replace('"', "\\\""),
event.outcome,
event.source_ip.as_ref().unwrap_or(&"-".to_string())
);
let msg = serde_json::to_string(event)
.map_err(AuditError::Serialization)?
.replace('\n', " ");
let syslog_msg = format!(
"<{}>{} {} {} {} {} {} {} {}",
priority, version, timestamp, hostname, app_name, procid, msgid, structured_data, msg
);
Ok(syslog_msg)
}
async fn forward_event(
&self,
event: &AuditEvent,
dest: &ForwardDestination,
) -> AuditResult<()> {
match dest {
ForwardDestination::Webhook { url, headers } => {
let client = reqwest::Client::new();
let mut request = client.post(url).json(event);
for (key, value) in headers {
request = request.header(key, value);
}
request
.send()
.await
.map_err(|e| AuditError::Forwarding(e.to_string()))?;
}
ForwardDestination::File { path } => {
let json = serde_json::to_string(event)?;
let line = format!("{}\n", json);
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(path)
.await?;
file.write_all(line.as_bytes()).await?;
file.flush().await?;
}
ForwardDestination::Syslog {
host,
port,
protocol,
} => {
let syslog_msg = self.format_syslog_rfc5424(event)?;
match protocol {
SyslogProtocol::Udp => {
use tokio::net::UdpSocket;
let socket = UdpSocket::bind("0.0.0.0:0")
.await
.map_err(|e| AuditError::Forwarding(e.to_string()))?;
socket
.send_to(syslog_msg.as_bytes(), format!("{}:{}", host, port))
.await
.map_err(|e| AuditError::Forwarding(e.to_string()))?;
}
SyslogProtocol::Tcp => {
use tokio::io::AsyncWriteExt as _;
use tokio::net::TcpStream;
let mut stream = TcpStream::connect(format!("{}:{}", host, port))
.await
.map_err(|e| AuditError::Forwarding(e.to_string()))?;
let framed = format!("{} {}", syslog_msg.len(), syslog_msg);
stream
.write_all(framed.as_bytes())
.await
.map_err(|e| AuditError::Forwarding(e.to_string()))?;
stream
.flush()
.await
.map_err(|e| AuditError::Forwarding(e.to_string()))?;
}
SyslogProtocol::Tls => {
warn!("TLS syslog not yet implemented, falling back to TCP");
use tokio::io::AsyncWriteExt as _;
use tokio::net::TcpStream;
let mut stream = TcpStream::connect(format!("{}:{}", host, port))
.await
.map_err(|e| AuditError::Forwarding(e.to_string()))?;
let framed = format!("{} {}", syslog_msg.len(), syslog_msg);
stream
.write_all(framed.as_bytes())
.await
.map_err(|e| AuditError::Forwarding(e.to_string()))?;
stream
.flush()
.await
.map_err(|e| AuditError::Forwarding(e.to_string()))?;
}
}
}
ForwardDestination::S3 {
bucket,
prefix,
region,
} => {
if let Some(client) = &self.s3_client {
let timestamp = event.timestamp.format("%Y-%m-%d/%H-%M-%S").to_string();
let event_id = event
.current_hash
.as_ref()
.map(|h| &h[..8])
.unwrap_or("unknown");
let key = format!(
"{}/{}-{}.json",
prefix.trim_end_matches('/'),
timestamp,
event_id
);
let json_data = serde_json::to_vec(event).map_err(AuditError::Serialization)?;
client
.put_object()
.bucket(bucket)
.key(&key)
.body(json_data.into())
.content_type("application/json")
.metadata("event_action", event.action.to_string())
.metadata("event_actor", &event.actor)
.metadata("event_outcome", event.outcome.to_string())
.send()
.await
.map_err(|e| AuditError::Forwarding(format!("S3 upload failed: {}", e)))?;
debug!(bucket, key, region, "Audit event forwarded to S3");
} else {
error!("S3 client not initialized for S3 forwarding");
return Err(AuditError::Forwarding(
"S3 client not initialized".to_string(),
));
}
}
}
Ok(())
}
/// Verifies the hash chain of the active log file.
///
/// Every record's `prev_hash` must equal the recomputed hash of the record
/// before it, and a stored `current_hash`, when present, must equal the
/// recomputed hash of its own record. The first record's `prev_hash` is
/// accepted as the chain anchor: after a rotation it refers to the last
/// record of the previous file, which is not available here.
///
/// Returns `Ok(true)` when the whole file is consistent and `Ok(false)` on
/// the first violation.
///
/// # Errors
/// Returns an error if the file cannot be opened or fully read, if a line
/// is not a valid event, or if hashing fails.
pub async fn verify_chain(&self) -> AuditResult<bool> {
    let file = File::open(&self.config.log_path).await?;
    let mut lines = BufReader::new(file).lines();
    let mut prev_hash: Option<String> = None;
    let mut line_num = 0u64;
    // A read error must not end the loop quietly, otherwise a file that was
    // only partly read would be reported as verified.
    while let Some(line) = lines.next_line().await? {
        line_num += 1;
        let event: AuditEvent = serde_json::from_str(&line)?;
        if line_num == 1 {
            if event.prev_hash.is_some() {
                info!("Audit log continues a chain from an earlier file; the first link is not checked");
            }
        } else if event.prev_hash != prev_hash {
            error!(
                line = line_num,
                "Chain integrity violation: prev_hash mismatch"
            );
            return Ok(false);
        }
        let computed_hash = event.compute_hash(&self.config.hmac_secret)?;
        if let Some(stored_hash) = &event.current_hash {
            if stored_hash != &computed_hash {
                error!(line = line_num, "Chain integrity violation: hash mismatch");
                return Ok(false);
            }
        }
        prev_hash = Some(computed_hash);
    }
    info!(lines = line_num, "Audit chain verification complete");
    Ok(true)
}
/// Returns the events in the active log file that match `filter`, in file
/// order, stopping once `filter.limit` results have been collected.
///
/// Lines that do not parse as an event are skipped. A limit of zero yields
/// an empty result.
///
/// # Errors
/// Returns an error if the log file cannot be opened or if reading it fails
/// part-way through.
pub async fn query(&self, filter: AuditFilter) -> AuditResult<Vec<AuditEvent>> {
    let file = File::open(&self.config.log_path).await?;
    let mut lines = BufReader::new(file).lines();
    let mut results = Vec::new();
    // Checked up front: testing the limit only after a push would let one
    // event through for a limit of zero.
    if filter.limit == Some(0) {
        return Ok(results);
    }
    // Propagate read errors rather than returning a silently truncated list.
    while let Some(line) = lines.next_line().await? {
        let Ok(event) = serde_json::from_str::<AuditEvent>(&line) else {
            continue;
        };
        if !filter.matches(&event) {
            continue;
        }
        results.push(event);
        if filter.limit.is_some_and(|limit| results.len() >= limit) {
            break;
        }
    }
    Ok(results)
}
}
async fn compress_file(
input_path: &std::path::Path,
output_path: &std::path::Path,
) -> AuditResult<()> {
let input_data = tokio::fs::read(input_path).await?;
let compressed = oxiarc_zstd::encode_all(&input_data, 3)
.map_err(|e| AuditError::Io(std::io::Error::other(e.to_string())))?;
tokio::fs::write(output_path, compressed).await?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::PathBuf;
use tempfile::TempDir;
/// `AuditEvent::new` stores the given actor, action, resource and outcome
/// and leaves both hash fields unset.
#[test]
fn test_audit_event_creation() {
let event = AuditEvent::new(
"user123".to_string(),
AuditAction::GetObject,
"bucket/key".to_string(),
AuditOutcome::Success,
);
assert_eq!(event.actor, "user123");
assert_eq!(event.action, AuditAction::GetObject);
assert_eq!(event.resource, "bucket/key");
assert_eq!(event.outcome, AuditOutcome::Success);
// Hashes are only assigned when the event is logged.
assert!(event.prev_hash.is_none());
assert!(event.current_hash.is_none());
}
/// The `with_*` builder methods set the optional source IP, status code,
/// request id and a metadata entry on the event.
#[test]
fn test_audit_event_builder() {
let event = AuditEvent::new(
"user123".to_string(),
AuditAction::PutObject,
"bucket/key".to_string(),
AuditOutcome::Success,
)
.with_source_ip("192.168.1.1".to_string())
.with_status_code(200)
.with_request_id("req-123".to_string())
.with_metadata("size".to_string(), "1024".to_string());
assert_eq!(event.source_ip, Some("192.168.1.1".to_string()));
assert_eq!(event.status_code, Some(200));
assert_eq!(event.request_id, Some("req-123".to_string()));
assert_eq!(event.metadata.get("size"), Some(&"1024".to_string()));
}
/// Hashing the same event twice with the same secret gives the same
/// 64-character result.
#[test]
fn test_event_hash_computation() {
let event = AuditEvent::new(
"user123".to_string(),
AuditAction::GetObject,
"bucket/key".to_string(),
AuditOutcome::Success,
);
let secret = b"test-secret";
let hash1 = event
.compute_hash(secret)
.expect("Failed to compute hash for audit event");
let hash2 = event
.compute_hash(secret)
.expect("Failed to compute hash for audit event (second attempt)");
assert_eq!(hash1, hash2);
// 64 characters: presumably a hex-encoded 256-bit digest; confirm against `compute_hash`.
assert_eq!(hash1.len(), 64); }
/// Two events with different actor and resource hash to different values
/// under the same secret.
#[test]
fn test_event_hash_differs_for_different_events() {
let event1 = AuditEvent::new(
"user1".to_string(),
AuditAction::GetObject,
"bucket/key1".to_string(),
AuditOutcome::Success,
);
let event2 = AuditEvent::new(
"user2".to_string(),
AuditAction::GetObject,
"bucket/key2".to_string(),
AuditOutcome::Success,
);
let secret = b"test-secret";
let hash1 = event1
.compute_hash(secret)
.expect("Failed to compute hash for first audit event");
let hash2 = event2
.compute_hash(secret)
.expect("Failed to compute hash for second audit event");
assert_ne!(hash1, hash2);
}
/// Constructing a logger creates the log file at the configured path.
#[tokio::test]
async fn test_audit_logger_creation() {
let temp_dir =
TempDir::new().expect("Failed to create temporary directory for audit logger test");
let log_path = temp_dir.path().join("audit.log");
let config = AuditConfig::builder()
.log_path(log_path.clone())
.hmac_secret(b"test-secret".to_vec())
.build();
let _logger = AuditLogger::new(config)
.await
.expect("Failed to create audit logger");
assert!(log_path.exists());
}
/// Logging one event writes its actor and resource into the log file.
#[tokio::test]
async fn test_audit_logger_log_event() {
let temp_dir =
TempDir::new().expect("Failed to create temporary directory for log event test");
let log_path = temp_dir.path().join("audit.log");
// Security detection is disabled so only the logged event itself is written.
let config = AuditConfig::builder()
.log_path(log_path.clone())
.hmac_secret(b"test-secret".to_vec())
.enable_security_detection(false)
.build();
let logger = AuditLogger::new(config)
.await
.expect("Failed to create audit logger for log event test");
let event = AuditEvent::new(
"user123".to_string(),
AuditAction::GetObject,
"bucket/key".to_string(),
AuditOutcome::Success,
);
logger.log(event).await.expect("Failed to log audit event");
let content = tokio::fs::read_to_string(&log_path)
.await
.expect("Failed to read audit log file");
assert!(!content.is_empty());
assert!(content.contains("user123"));
assert!(content.contains("bucket/key"));
}
/// Five events logged in sequence form a chain that `verify_chain` accepts.
#[tokio::test]
async fn test_audit_logger_chain_integrity() {
let temp_dir =
TempDir::new().expect("Failed to create temporary directory for chain integrity test");
let log_path = temp_dir.path().join("audit.log");
// Security detection is disabled, so the file holds only the five events below.
let config = AuditConfig::builder()
.log_path(log_path.clone())
.hmac_secret(b"test-secret".to_vec())
.enable_security_detection(false)
.build();
let logger = AuditLogger::new(config)
.await
.expect("Failed to create audit logger for chain integrity test");
for i in 0..5 {
let event = AuditEvent::new(
format!("user{}", i),
AuditAction::GetObject,
format!("bucket/key{}", i),
AuditOutcome::Success,
);
logger
.log(event)
.await
.expect("Failed to log audit event in chain integrity test");
}
assert!(logger
.verify_chain()
.await
.expect("Failed to verify audit chain"));
}
/// Querying with an actor filter returns only that actor's events.
#[tokio::test]
async fn test_audit_filter() {
let temp_dir =
TempDir::new().expect("Failed to create temporary directory for audit filter test");
let log_path = temp_dir.path().join("audit.log");
let config = AuditConfig::builder()
.log_path(log_path.clone())
.hmac_secret(b"test-secret".to_vec())
.enable_security_detection(false)
.build();
let logger = AuditLogger::new(config)
.await
.expect("Failed to create audit logger for filter test");
// Actors alternate user0/user1, so user0 owns events 0, 2 and 4.
for i in 0..5 {
let event = AuditEvent::new(
format!("user{}", i % 2),
AuditAction::GetObject,
format!("bucket/key{}", i),
AuditOutcome::Success,
);
logger
.log(event)
.await
.expect("Failed to log audit event in filter test");
}
let filter = AuditFilter {
actor: Some("user0".to_string()),
..Default::default()
};
let results = logger
.query(filter)
.await
.expect("Failed to query audit logs");
assert_eq!(results.len(), 3);
for event in results {
assert_eq!(event.actor, "user0");
}
}
/// Repeated authentication failures from one actor and source IP are
/// reported as a critical `brute_force_attack`.
#[tokio::test]
async fn test_security_event_detector_brute_force() {
let detector = SecurityEventDetector::new();
for i in 0..6 {
let event = AuditEvent::new(
"attacker".to_string(),
AuditAction::AuthFailure,
"auth".to_string(),
AuditOutcome::Failure,
)
.with_source_ip("192.168.1.100".to_string());
let security_event = detector.detect(&event).await;
// From the fifth failure onward (i >= 4) a detection is expected.
if i >= 4 {
assert!(security_event.is_some());
let se =
security_event.expect("Failed to detect security event on brute force attempt");
assert_eq!(se.event_type, "brute_force_attack");
assert_eq!(se.severity, SecuritySeverity::Critical);
}
}
}
/// Every value set through the config builder is reflected in the built
/// `AuditConfig`.
#[test]
fn test_audit_config_builder() {
let config = AuditConfig::builder()
.log_path(PathBuf::from("/var/log/audit.log"))
.hmac_secret(b"secret123".to_vec())
.enable_security_detection(true)
.max_file_size(50 * 1024 * 1024)
.max_rotated_files(5)
.compress_rotated(true)
.build();
assert_eq!(config.log_path, PathBuf::from("/var/log/audit.log"));
assert_eq!(config.hmac_secret, b"secret123".to_vec());
assert!(config.enable_security_detection);
assert_eq!(config.max_file_size, 50 * 1024 * 1024);
assert_eq!(config.max_rotated_files, 5);
assert!(config.compress_rotated);
}
/// A successful event renders as an RFC 5424 message carrying the expected
/// priority, app name, message id and structured data.
#[tokio::test]
async fn test_syslog_rfc5424_format() {
let temp_dir =
TempDir::new().expect("Failed to create temporary directory for syslog format test");
let log_path = temp_dir.path().join("audit.log");
let config = AuditConfig::builder()
.log_path(log_path)
.hmac_secret(b"test-secret".to_vec())
.build();
let logger = AuditLogger::new(config)
.await
.expect("Failed to create audit logger for syslog format test");
let event = AuditEvent::new(
"user123".to_string(),
AuditAction::PutObject,
"bucket/key.txt".to_string(),
AuditOutcome::Success,
)
.with_source_ip("192.168.1.100".to_string())
.with_status_code(200);
let syslog_msg = logger
.format_syslog_rfc5424(&event)
.expect("Failed to format audit event as RFC5424 syslog message");
// <134> = facility 16 * 8 + severity 6 (non-failure outcome), followed by version 1.
// The remaining checks cover the app name, message id, structured-data id, actor,
// resource and source IP.
assert!(syslog_msg.starts_with("<134>1")); assert!(syslog_msg.contains("rs3gw")); assert!(syslog_msg.contains("PutObject")); assert!(syslog_msg.contains("[audit@rs3gw")); assert!(syslog_msg.contains("user123")); assert!(syslog_msg.contains("bucket/key.txt")); assert!(syslog_msg.contains("192.168.1.100")); }
/// A failed event is rendered with syslog severity 4 instead of 6.
#[tokio::test]
async fn test_syslog_failure_severity() {
let temp_dir = TempDir::new()
.expect("Failed to create temporary directory for syslog failure severity test");
let log_path = temp_dir.path().join("audit.log");
let config = AuditConfig::builder()
.log_path(log_path)
.hmac_secret(b"test-secret".to_vec())
.build();
let logger = AuditLogger::new(config)
.await
.expect("Failed to create audit logger for syslog failure severity test");
let event = AuditEvent::new(
"user123".to_string(),
AuditAction::GetObject,
"bucket/key.txt".to_string(),
AuditOutcome::Failure,
);
let syslog_msg = logger
.format_syslog_rfc5424(&event)
.expect("Failed to format failure event as RFC5424 syslog message");
// <132> = facility 16 * 8 + severity 4 (failure outcome).
assert!(syslog_msg.starts_with("<132>1"));
}
}