use std::sync::Arc;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use crate::error::AppResult;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuditEvent {
pub audit_id: i64,
pub occurred_at: DateTime<Utc>,
pub credential: String,
pub operation: String,
pub outcome: String,
pub worker_id: Option<String>,
pub execution_id: Option<i64>,
pub parent_execution_id: Option<i64>,
pub server_region: Option<String>,
pub broker_region: Option<String>,
pub kek_version: Option<String>,
pub notes: Option<String>,
}
#[derive(Debug, Clone, Copy)]
pub enum Operation {
GetSealed,
CrossRegionBrokerServe,
ResolveKeychain,
GetCredential,
}
impl Operation {
pub fn as_str(self) -> &'static str {
match self {
Operation::GetSealed => "get_sealed",
Operation::CrossRegionBrokerServe => "cross_region_broker_serve",
Operation::ResolveKeychain => "resolve_keychain",
Operation::GetCredential => "get_credential",
}
}
}
#[derive(Debug, Clone, Copy)]
pub enum Outcome {
Ok,
ResidencyViolation,
BrokerUnreachable,
CredentialNotFound,
NoPubkey,
WrongRegion,
ProviderFetchError,
TemplateError,
Other(&'static str),
}
impl Outcome {
pub fn as_str(self) -> &'static str {
match self {
Outcome::Ok => "ok",
Outcome::ResidencyViolation => "residency_violation",
Outcome::BrokerUnreachable => "broker_unreachable",
Outcome::CredentialNotFound => "credential_not_found",
Outcome::NoPubkey => "no_pubkey",
Outcome::WrongRegion => "wrong_region",
Outcome::ProviderFetchError => "provider_fetch_error",
Outcome::TemplateError => "template_error",
Outcome::Other(s) => s,
}
}
}
#[async_trait]
pub trait AuditSink: Send + Sync {
async fn write(&self, event: &AuditEvent) -> AppResult<()>;
fn sink_id(&self) -> &str;
}
#[derive(Debug, Default)]
pub struct NoopAuditSink;
#[async_trait]
impl AuditSink for NoopAuditSink {
async fn write(&self, _event: &AuditEvent) -> AppResult<()> {
Ok(())
}
fn sink_id(&self) -> &str {
"noop"
}
}
#[derive(Debug, Clone)]
pub struct DbAuditSink {
pool: crate::db::DbPool,
}
impl DbAuditSink {
pub fn new(pool: crate::db::DbPool) -> Self {
Self { pool }
}
}
#[async_trait]
impl AuditSink for DbAuditSink {
async fn write(&self, event: &AuditEvent) -> AppResult<()> {
crate::db::queries::secret_audit::insert(&self.pool, event).await
}
fn sink_id(&self) -> &str {
"db"
}
}
#[derive(Clone)]
pub struct SecretAuditService {
sink: Arc<dyn AuditSink>,
strict: bool,
}
impl SecretAuditService {
pub fn new(sink: Arc<dyn AuditSink>, strict: bool) -> Self {
Self { sink, strict }
}
pub fn from_env(sink: Arc<dyn AuditSink>) -> Self {
let strict = matches!(
std::env::var("NOETL_SECRET_AUDIT_REQUIRED").ok().as_deref(),
Some("1") | Some("true") | Some("TRUE") | Some("yes") | Some("YES")
);
Self { sink, strict }
}
pub fn noop() -> Self {
Self {
sink: Arc::new(NoopAuditSink),
strict: false,
}
}
pub fn record_async(&self, event: AuditEvent) {
let sink = self.sink.clone();
let operation = event.operation.clone();
let outcome = event.outcome.clone();
tokio::spawn(async move {
match sink.write(&event).await {
Ok(()) => {
crate::metrics::record_secret_audit_write(&operation, &outcome, "written")
}
Err(e) => {
crate::metrics::record_secret_audit_write(
&operation,
&outcome,
"dropped_async",
);
tracing::warn!(
sink = sink.sink_id(),
operation = %operation,
outcome = %outcome,
error = %e,
"secret_audit.write failed (dropped)"
);
}
}
});
}
pub async fn record_strict(&self, event: AuditEvent) -> AppResult<()> {
let operation = event.operation.clone();
let outcome = event.outcome.clone();
match self.sink.write(&event).await {
Ok(()) => {
crate::metrics::record_secret_audit_write(&operation, &outcome, "written");
Ok(())
}
Err(e) => {
crate::metrics::record_secret_audit_write(&operation, &outcome, "failed_strict");
tracing::error!(
sink = self.sink.sink_id(),
operation = %operation,
outcome = %outcome,
error = %e,
"secret_audit.write failed under strict mode (resolution blocked)"
);
Err(e)
}
}
}
pub async fn record(&self, event: AuditEvent) -> AppResult<()> {
if self.strict {
self.record_strict(event).await
} else {
self.record_async(event);
Ok(())
}
}
pub fn is_strict(&self) -> bool {
self.strict
}
}
pub fn build_event(
audit_id: i64,
credential: impl Into<String>,
operation: Operation,
outcome: Outcome,
) -> AuditEvent {
AuditEvent {
audit_id,
occurred_at: Utc::now(),
credential: credential.into(),
operation: operation.as_str().to_string(),
outcome: outcome.as_str().to_string(),
worker_id: None,
execution_id: None,
parent_execution_id: None,
server_region: None,
broker_region: None,
kek_version: None,
notes: None,
}
}
#[cfg(test)]
mod tests {
use std::sync::Mutex;
use super::*;
#[derive(Default)]
struct MockSink {
seen: Mutex<Vec<AuditEvent>>,
fail: bool,
}
#[async_trait]
impl AuditSink for MockSink {
async fn write(&self, event: &AuditEvent) -> AppResult<()> {
if self.fail {
return Err(crate::error::AppError::Internal(
"mock sink: failing write".to_string(),
));
}
self.seen.lock().unwrap().push(event.clone());
Ok(())
}
fn sink_id(&self) -> &str {
"mock"
}
}
fn make_event(audit_id: i64) -> AuditEvent {
build_event(audit_id, "duffel_token", Operation::GetSealed, Outcome::Ok)
}
#[test]
fn builder_fills_audit_id_and_timestamp() {
let e = make_event(42);
assert_eq!(e.audit_id, 42);
assert_eq!(e.credential, "duffel_token");
assert_eq!(e.operation, "get_sealed");
assert_eq!(e.outcome, "ok");
assert!(e.worker_id.is_none());
assert!(e.execution_id.is_none());
}
#[test]
fn operation_and_outcome_as_str_round_trip() {
assert_eq!(Operation::GetSealed.as_str(), "get_sealed");
assert_eq!(
Operation::CrossRegionBrokerServe.as_str(),
"cross_region_broker_serve"
);
assert_eq!(Outcome::ResidencyViolation.as_str(), "residency_violation");
assert_eq!(Outcome::Other("custom_thing").as_str(), "custom_thing");
}
#[tokio::test]
async fn noop_sink_always_succeeds() {
let s = NoopAuditSink;
s.write(&make_event(1)).await.unwrap();
assert_eq!(s.sink_id(), "noop");
}
#[tokio::test]
async fn record_strict_blocks_on_sink_failure() {
let sink = Arc::new(MockSink {
seen: Mutex::new(Vec::new()),
fail: true,
});
let svc = SecretAuditService::new(sink, true);
assert!(svc.is_strict());
let err = svc.record_strict(make_event(1)).await.unwrap_err();
assert!(format!("{err:?}").contains("failing write"));
}
#[tokio::test]
async fn record_strict_persists_on_success() {
let sink = Arc::new(MockSink {
seen: Mutex::new(Vec::new()),
fail: false,
});
let svc = SecretAuditService::new(sink.clone(), true);
svc.record_strict(make_event(7)).await.unwrap();
let seen = sink.seen.lock().unwrap();
assert_eq!(seen.len(), 1);
assert_eq!(seen[0].audit_id, 7);
}
#[tokio::test]
async fn record_dispatches_async_when_not_strict() {
let sink = Arc::new(MockSink {
seen: Mutex::new(Vec::new()),
fail: false,
});
let svc = SecretAuditService::new(sink.clone(), false);
assert!(!svc.is_strict());
svc.record(make_event(1)).await.unwrap();
tokio::task::yield_now().await;
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
let seen = sink.seen.lock().unwrap();
assert!(seen.len() <= 1, "spawned write at most once");
}
#[tokio::test]
async fn noop_service_records_without_blocking() {
let svc = SecretAuditService::noop();
svc.record(make_event(1)).await.unwrap();
svc.record_strict(make_event(2)).await.unwrap();
}
#[test]
fn from_env_respects_truthy_values() {
let saved = std::env::var("NOETL_SECRET_AUDIT_REQUIRED").ok();
let sink: Arc<dyn AuditSink> = Arc::new(NoopAuditSink);
for val in ["1", "true", "TRUE", "yes", "YES"] {
unsafe { std::env::set_var("NOETL_SECRET_AUDIT_REQUIRED", val) };
assert!(
SecretAuditService::from_env(sink.clone()).is_strict(),
"value {val:?} should enable strict mode"
);
}
unsafe { std::env::set_var("NOETL_SECRET_AUDIT_REQUIRED", "0") };
assert!(!SecretAuditService::from_env(sink.clone()).is_strict());
unsafe { std::env::remove_var("NOETL_SECRET_AUDIT_REQUIRED") };
assert!(!SecretAuditService::from_env(sink).is_strict());
match saved {
Some(v) => unsafe { std::env::set_var("NOETL_SECRET_AUDIT_REQUIRED", v) },
None => unsafe { std::env::remove_var("NOETL_SECRET_AUDIT_REQUIRED") },
}
}
}