pub mod event;
pub mod exporter;
pub mod file;
pub mod logstash;
pub mod otel;
use anyhow::Result;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
pub use event::{AuditEvent, EventResult, EventType};
pub use exporter::{AuditExporter, NullExporter};
pub use file::{FileExporter, RotateConfig};
pub use logstash::LogstashExporter;
pub use otel::OtelExporter;
/// Settings consumed by `AuditManager::new` to size the event queue, tune
/// batching, and choose which exporters are created.
#[derive(Debug, Clone)]
pub struct AuditConfig {
/// When `false`, `AuditManager::log` returns without queueing the event and
/// no background worker task is spawned.
pub enabled: bool,
/// Capacity handed to `mpsc::channel` for the queue between `log` callers
/// and the worker.
pub buffer_size: usize,
/// Number of buffered events at which the worker exports a batch.
pub batch_size: usize,
/// Period, in seconds, of the worker's timer that exports a non-empty
/// buffer even if `batch_size` has not been reached.
pub flush_interval_secs: u64,
/// One entry per exporter to construct; every batch is sent to all of them.
pub exporters: Vec<AuditExporterConfig>,
}
/// Describes one exporter that `AuditManager::new` should construct.
///
/// Derives `PartialEq`/`Eq` in addition to `Debug`/`Clone` so that exporter
/// configurations can be compared directly (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AuditExporterConfig {
    /// Constructs a `NullExporter`.
    Null,
    /// Constructs a `FileExporter`.
    File {
        /// Filesystem path passed to `FileExporter::new`.
        path: String,
    },
    /// Constructs an `OtelExporter`.
    Otel {
        /// Endpoint string passed to `OtelExporter::new`.
        endpoint: String,
    },
    /// Constructs a `LogstashExporter`.
    Logstash {
        /// Host passed to `LogstashExporter::new`.
        host: String,
        /// Port passed to `LogstashExporter::new`.
        port: u16,
    },
}
impl Default for AuditConfig {
fn default() -> Self {
Self {
enabled: false,
buffer_size: 1000,
batch_size: 100,
flush_interval_secs: 5,
exporters: vec![AuditExporterConfig::Null],
}
}
}
impl AuditConfig {
    /// Creates a configuration holding the [`Default`] values.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Returns the configuration with the `enabled` flag replaced.
    pub fn with_enabled(self, enabled: bool) -> Self {
        Self { enabled, ..self }
    }

    /// Returns the configuration with `buffer_size` replaced.
    ///
    /// # Panics
    ///
    /// Panics if `size` is zero.
    pub fn with_buffer_size(self, size: usize) -> Self {
        assert!(size > 0, "buffer_size must be at least 1");
        Self {
            buffer_size: size,
            ..self
        }
    }

    /// Returns the configuration with `batch_size` replaced.
    ///
    /// # Panics
    ///
    /// Panics if `size` is zero.
    pub fn with_batch_size(self, size: usize) -> Self {
        assert!(size > 0, "batch_size must be at least 1");
        Self {
            batch_size: size,
            ..self
        }
    }

    /// Returns the configuration with `flush_interval_secs` replaced.
    ///
    /// # Panics
    ///
    /// Panics if `secs` is zero.
    pub fn with_flush_interval(self, secs: u64) -> Self {
        assert!(secs > 0, "flush_interval_secs must be at least 1");
        Self {
            flush_interval_secs: secs,
            ..self
        }
    }

    /// Returns the configuration with the exporter list replaced wholesale.
    pub fn with_exporters(self, exporters: Vec<AuditExporterConfig>) -> Self {
        Self { exporters, ..self }
    }
}
/// Front end of the audit pipeline: accepts events through `log`, queues them
/// on a channel, and (when enabled) owns the background task that exports them.
pub struct AuditManager {
// Exporters built from the configuration; the worker task receives a clone
// of this list, and `flush` calls `flush()` on each entry.
exporters: Vec<Arc<dyn AuditExporter>>,
// Sending half of the event queue; dropped in `shutdown` so the worker's
// `recv()` eventually yields `None`.
sender: mpsc::Sender<AuditEvent>,
// Copy of `AuditConfig::enabled`; when `false`, `log` is a no-op.
enabled: bool,
// Handle of the spawned worker task, or `None` when auditing is disabled.
worker_handle: Option<JoinHandle<()>>,
}
impl AuditManager {
pub fn new(config: &AuditConfig) -> Result<Self> {
let (sender, receiver) = mpsc::channel(config.buffer_size);
let mut exporters: Vec<Arc<dyn AuditExporter>> = Vec::new();
for exporter_config in &config.exporters {
let exporter: Arc<dyn AuditExporter> = match exporter_config {
AuditExporterConfig::Null => Arc::new(NullExporter::new()),
AuditExporterConfig::File { path } => {
let file_exporter = FileExporter::new(std::path::Path::new(path))?;
Arc::new(file_exporter)
}
AuditExporterConfig::Otel { endpoint } => {
let otel_exporter = OtelExporter::new(endpoint)?;
Arc::new(otel_exporter)
}
AuditExporterConfig::Logstash { host, port } => {
let logstash_exporter = LogstashExporter::new(host, *port)?;
Arc::new(logstash_exporter)
}
};
exporters.push(exporter);
}
let worker_handle = if config.enabled {
let batch_size = config.batch_size;
let flush_interval = Duration::from_secs(config.flush_interval_secs);
Some(tokio::spawn(Self::worker(
receiver,
exporters.clone(),
batch_size,
flush_interval,
)))
} else {
None
};
let manager = Self {
exporters,
sender,
enabled: config.enabled,
worker_handle,
};
Ok(manager)
}
pub async fn log(&self, event: AuditEvent) {
if !self.enabled {
return;
}
if let Err(e) = self.sender.send(event).await {
tracing::warn!("Failed to send audit event: {}", e);
}
}
async fn worker(
mut receiver: mpsc::Receiver<AuditEvent>,
exporters: Vec<Arc<dyn AuditExporter>>,
batch_size: usize,
flush_interval: Duration,
) {
let mut buffer = Vec::with_capacity(batch_size);
let mut flush_timer = tokio::time::interval(flush_interval);
flush_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
biased;
event_opt = receiver.recv() => {
match event_opt {
Some(event) => {
buffer.push(event);
if buffer.len() >= batch_size {
Self::flush_buffer(&exporters, &mut buffer).await;
}
}
None => {
if !buffer.is_empty() {
Self::flush_buffer(&exporters, &mut buffer).await;
}
break;
}
}
}
_ = flush_timer.tick() => {
if !buffer.is_empty() {
Self::flush_buffer(&exporters, &mut buffer).await;
}
}
}
}
for exporter in &exporters {
if let Err(e) = exporter.close().await {
tracing::error!("Failed to close exporter: {}", e);
}
}
}
async fn flush_buffer(exporters: &[Arc<dyn AuditExporter>], buffer: &mut Vec<AuditEvent>) {
for exporter in exporters {
if let Err(e) = exporter.export_batch(buffer).await {
tracing::error!("Audit export failed: {}", e);
}
}
buffer.clear();
}
pub async fn flush(&self) {
for exporter in &self.exporters {
if let Err(e) = exporter.flush().await {
tracing::error!("Audit flush failed: {}", e);
}
}
}
pub fn is_enabled(&self) -> bool {
self.enabled
}
pub async fn shutdown(mut self) -> Result<()> {
drop(self.sender);
if let Some(handle) = self.worker_handle.take() {
handle
.await
.map_err(|e| anyhow::anyhow!("Worker task panicked: {}", e))?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an event whose user and session names carry the suffix `idx`.
    fn indexed_event(kind: EventType, idx: u32) -> AuditEvent {
        AuditEvent::new(kind, format!("user{}", idx), format!("session-{}", idx))
    }

    /// Yields to the runtime for `millis` milliseconds so the worker can run.
    async fn pause(millis: u64) {
        tokio::time::sleep(Duration::from_millis(millis)).await;
    }

    /// A default configuration must produce a manager without error.
    #[tokio::test]
    async fn test_audit_manager_creation() {
        let config = AuditConfig::default();
        let built = AuditManager::new(&config);
        assert!(built.is_ok());
    }

    /// Logging through a disabled manager is accepted and leaves it disabled.
    #[tokio::test]
    async fn test_audit_manager_disabled() {
        let config = AuditConfig::new().with_enabled(false);
        let manager = AuditManager::new(&config).unwrap();
        manager
            .log(AuditEvent::new(
                EventType::AuthSuccess,
                "test".to_string(),
                "session-1".to_string(),
            ))
            .await;
        assert!(!manager.is_enabled());
    }

    /// An enabled manager reports itself enabled and accepts an event.
    #[tokio::test]
    async fn test_audit_manager_enabled() {
        let config = AuditConfig::new()
            .with_enabled(true)
            .with_buffer_size(10)
            .with_batch_size(5);
        let manager = AuditManager::new(&config).unwrap();
        assert!(manager.is_enabled());
        manager
            .log(AuditEvent::new(
                EventType::SessionStart,
                "alice".to_string(),
                "session-123".to_string(),
            ))
            .await;
        pause(100).await;
    }

    /// Logging more events than one batch holds, then flushing, must not fail.
    #[tokio::test]
    async fn test_audit_manager_batch() {
        let config = AuditConfig::new()
            .with_enabled(true)
            .with_batch_size(3)
            .with_buffer_size(100);
        let manager = AuditManager::new(&config).unwrap();
        for idx in 0..5 {
            manager
                .log(indexed_event(EventType::FileUploaded, idx))
                .await;
        }
        pause(100).await;
        manager.flush().await;
    }

    /// Each builder method stores the value it was given.
    #[tokio::test]
    async fn test_audit_config_builder() {
        let config = AuditConfig::new()
            .with_enabled(true)
            .with_buffer_size(500)
            .with_batch_size(50)
            .with_flush_interval(10);
        assert!(config.enabled);
        assert_eq!(config.buffer_size, 500);
        assert_eq!(config.batch_size, 50);
        assert_eq!(config.flush_interval_secs, 10);
    }

    /// An explicitly configured null exporter accepts an event and a flush.
    #[tokio::test]
    async fn test_audit_manager_with_null_exporter() {
        let config = AuditConfig::new()
            .with_enabled(true)
            .with_exporters(vec![AuditExporterConfig::Null]);
        let manager = AuditManager::new(&config).unwrap();
        manager
            .log(AuditEvent::new(
                EventType::CommandExecuted,
                "bob".to_string(),
                "session-456".to_string(),
            ))
            .await;
        pause(50).await;
        manager.flush().await;
    }

    /// Fewer events than a batch are left for the one-second timer to export.
    #[tokio::test]
    async fn test_audit_manager_flush_on_interval() {
        let config = AuditConfig::new()
            .with_enabled(true)
            .with_batch_size(100)
            .with_flush_interval(1);
        let manager = AuditManager::new(&config).unwrap();
        for idx in 0..3 {
            manager
                .log(indexed_event(EventType::DirectoryListed, idx))
                .await;
        }
        pause(1100).await;
    }

    /// Shutdown must complete within ten seconds and report success.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_audit_manager_shutdown() {
        let config = AuditConfig::new()
            .with_enabled(true)
            .with_buffer_size(10)
            .with_batch_size(5)
            .with_flush_interval(1);
        let manager = AuditManager::new(&config).unwrap();
        for idx in 0..3 {
            manager
                .log(indexed_event(EventType::FileUploaded, idx))
                .await;
        }
        pause(50).await;
        let outcome = tokio::time::timeout(Duration::from_secs(10), manager.shutdown()).await;
        match outcome {
            Err(_) => panic!("Shutdown timed out"),
            Ok(shutdown_result) => assert!(shutdown_result.is_ok(), "Shutdown failed"),
        }
    }

    /// A zero buffer size is rejected by the builder.
    #[test]
    #[should_panic(expected = "buffer_size must be at least 1")]
    fn test_audit_config_invalid_buffer_size() {
        let _config = AuditConfig::new().with_buffer_size(0);
    }

    /// A zero batch size is rejected by the builder.
    #[test]
    #[should_panic(expected = "batch_size must be at least 1")]
    fn test_audit_config_invalid_batch_size() {
        let _config = AuditConfig::new().with_batch_size(0);
    }

    /// A zero flush interval is rejected by the builder.
    #[test]
    #[should_panic(expected = "flush_interval_secs must be at least 1")]
    fn test_audit_config_invalid_flush_interval() {
        let _config = AuditConfig::new().with_flush_interval(0);
    }

    /// The smallest legal value (1) is accepted for every numeric setting.
    #[test]
    fn test_audit_config_valid_minimum_values() {
        let config = AuditConfig::new()
            .with_buffer_size(1)
            .with_batch_size(1)
            .with_flush_interval(1);
        assert_eq!(config.buffer_size, 1);
        assert_eq!(config.batch_size, 1);
        assert_eq!(config.flush_interval_secs, 1);
    }
}