use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use futures::future::BoxFuture;
use sha2::{Digest, Sha256};
use tokio::sync::mpsc;
use crate::web::context::RequestContext;
/// Coarse result classification stored on every [`AuditRecord`].
///
/// Serialized by serde with snake_case variant names.
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub enum AuditOutcome {
/// Assigned by `emit_route_audit` to HTTP statuses 200 through 399.
Success,
/// Assigned by `emit_route_audit` to HTTP statuses 401 and 403.
Denied,
/// Assigned by `emit_route_audit` to every other status.
Error,
}
/// One audit-log entry, as handed to an [`AuditSink`].
///
/// The pipeline's writer task hashes the `serde_json` serialization of each
/// record (including its own `prev_hash`) to produce the next record's
/// `prev_hash`, so any change to this struct's fields changes the chain hashes.
#[derive(Clone, Debug, serde::Serialize)]
pub struct AuditRecord {
/// Wall-clock time in milliseconds since the Unix epoch; `emit_route_audit`
/// stores 0 when the system clock reads earlier than the epoch.
pub timestamp_ms: u64,
/// Static label for the action being audited, supplied by the caller.
pub action: &'static str,
/// Static label for the resource being acted on, supplied by the caller.
pub resource: &'static str,
/// The `"sub"` claim of the request, when present and a string.
pub actor_sub: Option<String>,
/// The `"role"` claim of the request, when present and a string.
pub actor_role: Option<String>,
/// Tenant id of the request, when the request context has a tenant.
pub tenant: Option<String>,
/// Trace id of the request, taken from `RequestContext::trace_id_hex`.
pub trace_id: String,
/// Request method rendered as a string.
pub method: String,
/// Request path.
pub path: String,
/// Classification derived from `status`.
pub outcome: AuditOutcome,
/// Status code passed to `emit_route_audit`.
pub status: u16,
/// Lowercase hex SHA-256 of the previous record's JSON serialization.
/// Producers leave this empty; the pipeline's writer task overwrites it, and
/// it stays empty only for the first record the task processes.
pub prev_hash: String,
}
/// Destination for audit records drained by [`AuditPipeline`].
///
/// The pipeline's writer task calls `write_batch` once per drained batch and
/// awaits it before reading more records. When it returns `Err`, the pipeline
/// logs the message, increments `audit_sink_errors_total`, and discards the
/// batch without retrying.
pub trait AuditSink: Send + Sync + 'static {
/// Persists `records` in the order given; an `Err` carries a human-readable
/// failure description.
fn write_batch<'a>(&'a self, records: &'a [AuditRecord]) -> BoxFuture<'a, Result<(), String>>;
}
/// Handle for submitting audit records to a background writer task.
///
/// Holds only the sending half of a bounded channel; the receiving half is
/// owned by the task spawned in `AuditPipeline::new`.
pub struct AuditPipeline {
// Sending half of the bounded channel feeding the writer task.
tx: mpsc::Sender<AuditRecord>,
}
impl AuditPipeline {
    /// Creates the pipeline and spawns the background task that drains it.
    ///
    /// * `sink` - destination that receives each drained batch.
    /// * `capacity` - size of the bounded queue between producers and the task.
    /// * `flush_batch` - maximum number of records handed to the sink per
    ///   `write_batch` call. A value of 0 is treated as 1.
    ///
    /// The task links records into a hash chain: each record's `prev_hash` is
    /// set to the lowercase hex SHA-256 of the previous record's JSON
    /// serialization (empty for the first record). The chain keeps advancing
    /// even when a batch fails to write, so a failed batch leaves a visible gap
    /// in what the sink stored.
    ///
    /// The task ends once every sender has been dropped and the queue is empty.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is 0 (a `tokio::sync::mpsc::channel` requirement)
    /// or if called outside a Tokio runtime (a `tokio::spawn` requirement).
    pub fn new(sink: Arc<dyn AuditSink>, capacity: usize, flush_batch: usize) -> Self {
        // `recv_many` with a limit of 0 returns 0 immediately. Without this
        // clamp the loop below would exit on its first iteration, drop the
        // receiver, and every later `record` call would be counted as dropped.
        let batch_limit = flush_batch.max(1);
        let (tx, mut rx) = mpsc::channel::<AuditRecord>(capacity);
        tokio::spawn(async move {
            let mut buf: Vec<AuditRecord> = Vec::with_capacity(batch_limit);
            let mut prev_hash = String::new();
            // `recv_many` yields 0 only when the channel is closed and drained.
            while rx.recv_many(&mut buf, batch_limit).await > 0 {
                for rec in buf.iter_mut() {
                    // Stamp the link first so the hash covers it as well.
                    rec.prev_hash = std::mem::take(&mut prev_hash);
                    // If serialization fails, the hash of an empty byte string
                    // is used so the chain still advances.
                    let serialized = serde_json::to_vec(&rec).unwrap_or_default();
                    prev_hash = hex(&Sha256::digest(&serialized));
                }
                if let Err(e) = sink.write_batch(&buf).await {
                    metrics::counter!("audit_sink_errors_total").increment(1);
                    tracing::error!(error = %e, lost = buf.len(), "audit sink write failed");
                }
                // The batch is discarded whether or not the write succeeded.
                buf.clear();
            }
        });
        Self { tx }
    }

    /// Queues `rec` for the writer task without blocking.
    ///
    /// If the record cannot be queued (the queue is full or the writer task has
    /// stopped), it is discarded and `audit_dropped_total` is incremented.
    #[inline]
    pub fn record(&self, rec: AuditRecord) {
        if self.tx.try_send(rec).is_err() {
            metrics::counter!("audit_dropped_total").increment(1);
        }
    }
}
/// Encodes `bytes` as a lowercase hexadecimal string, two digits per byte.
///
/// Returns an empty string for empty input.
fn hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    // One allocation sized up front instead of a temporary `String` per byte.
    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
#[doc(hidden)]
pub fn emit_route_audit(
ctx: &RequestContext,
action: &'static str,
resource: &'static str,
status: u16,
) {
let Some(pipeline) = ctx.try_inject::<AuditPipeline>() else {
return;
};
let outcome = match status {
200..=399 => AuditOutcome::Success,
401 | 403 => AuditOutcome::Denied,
_ => AuditOutcome::Error,
};
let claims = ctx.claims();
pipeline.record(AuditRecord {
timestamp_ms: SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0),
action,
resource,
actor_sub: claims
.and_then(|c| c.get("sub"))
.and_then(|v| v.as_str())
.map(str::to_owned),
actor_role: claims
.and_then(|c| c.get("role"))
.and_then(|v| v.as_str())
.map(str::to_owned),
tenant: ctx.tenant().map(|t| t.id.as_str().to_owned()),
trace_id: ctx.trace_id_hex(),
method: ctx.method().to_string(),
path: ctx.path().to_owned(),
outcome,
status,
prev_hash: String::new(), });
}