use std::future::Future;
use std::net::IpAddr;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;
use crate::state::AppState;
/// Outcome recorded on an [`AuditEvent`].
///
/// Serialized by serde in `snake_case`, i.e. as `"success"` or `"failure"`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AuditStatus {
/// Marks the event as a success.
Success,
/// Marks the event as a failure.
Failure,
}
/// A single audit record handed to every configured [`AuditSink`].
///
/// The struct derives `Serialize`/`Deserialize`; the JSONL file sink in this
/// module writes it as one JSON object per line.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AuditEvent {
/// UTC timestamp of the event; [`AuditEvent::new`] sets it to `Utc::now()`.
pub timestamp: DateTime<Utc>,
/// Identifier of the actor the event is attributed to.
pub actor_id: String,
/// Name of the action being audited (free-form string).
pub action: String,
/// Identifier of the resource the action targeted.
pub target_resource_id: String,
/// IP address associated with the event, if one is known.
pub ip_address: Option<IpAddr>,
/// Outcome recorded for the event.
pub status: AuditStatus,
}
impl AuditEvent {
    /// Builds an event stamped with the current UTC time.
    ///
    /// The three identifier arguments accept anything convertible into a
    /// `String`; `ip_address` and `status` are stored as given.
    #[must_use]
    pub fn new(
        actor_id: impl Into<String>,
        action: impl Into<String>,
        target_resource_id: impl Into<String>,
        ip_address: Option<IpAddr>,
        status: AuditStatus,
    ) -> Self {
        // Capture the clock first so the timestamp reflects the moment of the call.
        let timestamp = Utc::now();
        let actor_id = actor_id.into();
        let action = action.into();
        let target_resource_id = target_resource_id.into();

        Self {
            timestamp,
            actor_id,
            action,
            target_resource_id,
            ip_address,
            status,
        }
    }
}
/// Error returned when writing an audit event fails.
///
/// Its `Display` output is `audit sink write failed: {message}` (see the
/// `#[error]` attribute below).
#[derive(Debug, Error)]
#[error("audit sink write failed: {message}")]
pub struct AuditError {
// Human-readable failure detail; interpolated into the `Display` output.
message: String,
}
impl AuditError {
    /// Creates an error carrying `message` as its failure detail.
    #[must_use]
    pub fn new(message: impl Into<String>) -> Self {
        let message = message.into();
        Self { message }
    }

    /// Borrows the raw detail text, without the `Display` prefix.
    fn message(&self) -> &str {
        self.message.as_str()
    }
}
/// Boxed, pinned, `Send` future returned by [`AuditSink::write`]; it may borrow
/// from the sink for lifetime `'a` and resolves to `Result<(), AuditError>`.
type AuditWriteFuture<'a> = Pin<Box<dyn Future<Output = Result<(), AuditError>> + Send + 'a>>;
/// A destination that audit events can be written to.
///
/// Implementors must be `Send + Sync + 'static`; [`AuditLogger`] stores them
/// as `Arc<dyn AuditSink>`.
pub trait AuditSink: Send + Sync + 'static {
/// Writes one event, returning a boxed future that resolves to `Ok(())` on
/// success or an [`AuditError`] on failure.
fn write(&self, event: AuditEvent) -> AuditWriteFuture<'_>;
}
/// Fans audit events out to a list of [`AuditSink`]s.
///
/// Cloning copies the `Vec` of `Arc` handles, so clones share the same sink
/// instances. The `Default` value has no sinks.
#[derive(Clone, Default)]
pub struct AuditLogger {
// Sinks in the order they were added via `with_sink`.
sinks: Vec<Arc<dyn AuditSink>>,
}
impl AuditLogger {
    /// Creates a logger with no sinks attached.
    #[must_use]
    pub const fn new() -> Self {
        Self { sinks: Vec::new() }
    }

    /// Builder-style helper: returns the logger with `sink` appended.
    #[must_use]
    pub fn with_sink(mut self, sink: Arc<dyn AuditSink>) -> Self {
        self.sinks.push(sink);
        self
    }

    /// Delivers `event` to every sink, one after another, in insertion order.
    ///
    /// A failing sink does not stop delivery to the sinks after it.
    ///
    /// # Errors
    ///
    /// If one or more sinks fail, returns a single [`AuditError`] whose detail
    /// is `"<n> audit sink(s) failed: <msg1> | <msg2> ..."`.
    pub async fn write(&self, event: AuditEvent) -> Result<(), AuditError> {
        let mut failures: Vec<AuditError> = Vec::new();
        for sink in &self.sinks {
            // Each sink receives its own copy of the event.
            match sink.write(event.clone()).await {
                Ok(()) => {}
                Err(failure) => failures.push(failure),
            }
        }

        if failures.is_empty() {
            return Ok(());
        }

        let failed = failures.len();
        let details = failures
            .iter()
            .map(AuditError::message)
            .collect::<Vec<_>>()
            .join(" | ");
        Err(AuditError::new(format!(
            "{failed} audit sink(s) failed: {details}"
        )))
    }

    /// Reports whether at least one sink is attached.
    #[must_use]
    pub fn is_enabled(&self) -> bool {
        !self.sinks.is_empty()
    }
}
/// Sink that emits each audit event as a `tracing` info-level event under the
/// `autumn.audit` target.
#[derive(Debug, Default)]
pub struct TracingAuditSink;
impl AuditSink for TracingAuditSink {
fn write(&self, event: AuditEvent) -> AuditWriteFuture<'_> {
Box::pin(async move {
tracing::info!(
target: "autumn.audit",
timestamp = %event.timestamp,
actor_id = %event.actor_id,
action = %event.action,
target_resource_id = %event.target_resource_id,
ip_address = ?event.ip_address,
status = ?event.status,
"audit_event"
);
Ok(())
})
}
}
/// Sink that appends each audit event to a file as one JSON object per line.
#[derive(Debug)]
pub struct JsonlFileAuditSink {
// Location of the log file; opened (and created if missing) on each write.
path: PathBuf,
// Held while a write is in progress so writes through this instance do not
// interleave.
write_lock: Mutex<()>,
}
impl JsonlFileAuditSink {
#[must_use]
pub fn new(path: impl AsRef<Path>) -> Self {
Self {
path: path.as_ref().to_path_buf(),
write_lock: Mutex::new(()),
}
}
}
impl AuditSink for JsonlFileAuditSink {
fn write(&self, event: AuditEvent) -> AuditWriteFuture<'_> {
Box::pin(async move {
let mut encoded = serde_json::to_vec(&event).map_err(|error| {
AuditError::new(format!("failed to encode audit event: {error}"))
})?;
encoded.push(b'\n');
let _guard = self.write_lock.lock().await;
let mut file = tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&self.path)
.await
.map_err(|error| {
AuditError::new(format!("failed to open audit log file: {error}"))
})?;
file.write_all(&encoded).await.map_err(|error| {
AuditError::new(format!("failed to write audit event: {error}"))
})?;
file.sync_data()
.await
.map_err(|error| AuditError::new(format!("failed to sync audit file: {error}")))?;
Ok(())
})
}
}
/// Writes `event` through the [`AuditLogger`] registered on `state`, if any.
///
/// When `state` has no `AuditLogger` extension the call is a no-op and
/// returns `Ok(())`.
///
/// # Errors
///
/// Propagates the [`AuditError`] returned by [`AuditLogger::write`].
pub async fn write_from_state(state: &AppState, event: AuditEvent) -> Result<(), AuditError> {
    match state.extension::<AuditLogger>() {
        Some(logger) => logger.write(event).await,
        None => Ok(()),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Sink whose every write fails with the detail `boom`.
    #[derive(Default)]
    struct FailingSink;

    impl AuditSink for FailingSink {
        fn write(&self, _event: AuditEvent) -> AuditWriteFuture<'_> {
            Box::pin(async { Err(AuditError::new("boom")) })
        }
    }

    /// Sink that increments a shared counter for each event it receives.
    struct CountingSink {
        writes: Arc<AtomicUsize>,
    }

    impl AuditSink for CountingSink {
        fn write(&self, _event: AuditEvent) -> AuditWriteFuture<'_> {
            let counter = Arc::clone(&self.writes);
            Box::pin(async move {
                counter.fetch_add(1, Ordering::SeqCst);
                Ok(())
            })
        }
    }

    /// Two writes through the JSONL sink must yield two non-empty lines.
    #[tokio::test]
    async fn jsonl_sink_appends_events() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let path = tmp.path().join("audit.log");
        let sink = JsonlFileAuditSink::new(&path);

        let cases = [
            (
                "admin-1",
                "user.role.update",
                "user-99",
                AuditStatus::Success,
                "write first event",
            ),
            (
                "api-key-1",
                "export.create",
                "export-42",
                AuditStatus::Failure,
                "write second event",
            ),
        ];
        for (actor, action, target, status, context) in cases {
            sink.write(AuditEvent::new(actor, action, target, None, status))
                .await
                .expect(context);
        }

        let content = tokio::fs::read_to_string(&path)
            .await
            .expect("read audit file");
        let line_count = content.lines().filter(|line| !line.is_empty()).count();
        assert_eq!(line_count, 2, "content:\n{content}");
    }

    /// Without a registered logger, `write_from_state` succeeds and does nothing.
    #[tokio::test]
    async fn write_from_state_no_logger_is_noop() {
        let state = AppState::for_test();
        let event = AuditEvent::new("u1", "auth.login", "session-1", None, AuditStatus::Success);

        write_from_state(&state, event)
            .await
            .expect("no-op write should succeed");
    }

    /// A failing sink must not prevent later sinks from receiving the event.
    #[tokio::test]
    async fn audit_logger_continues_fan_out_after_sink_failure() {
        let writes = Arc::new(AtomicUsize::new(0));
        let counting = CountingSink {
            writes: Arc::clone(&writes),
        };
        let logger = AuditLogger::new()
            .with_sink(Arc::new(FailingSink))
            .with_sink(Arc::new(counting));

        let event = AuditEvent::new("u1", "auth.login", "session-1", None, AuditStatus::Failure);
        let error = logger
            .write(event)
            .await
            .expect_err("first sink should fail");

        assert!(
            error.to_string().contains("1 audit sink(s) failed"),
            "unexpected error: {error}"
        );
        assert_eq!(
            writes.load(Ordering::SeqCst),
            1,
            "second sink should still receive event"
        );
    }
}