use acton_reactive::prelude::*;
use std::path::PathBuf;
use std::sync::Arc;
use super::alert::AuditAlertHook;
use super::alert_webhook::WebhookAlertHook;
use super::chain::AuditChain;
use super::config::AuditConfig;
use super::event::AuditEvent;
use super::failure_tracker::FailureTracker;
use super::storage::AuditStorage;
use super::syslog::SyslogSender;
/// Mutable state owned by the audit actor.
///
/// Every field is an `Option`, so the derived `Default` produces a state
/// with nothing configured. `AuditAgent::spawn` fills in `config`, `storage`,
/// `syslog` and `failure_tracker` before the actor is started; `chain` is set
/// later, when the `ChainLoaded` message is handled.
#[derive(Default)]
pub struct AuditAgentState {
/// Chain used to seal incoming events; `None` until `ChainLoaded` is handled.
pub chain: Option<AuditChain>,
/// Storage backend events are appended to; `None` when no storage was supplied.
pub storage: Option<Arc<dyn AuditStorage>>,
/// Syslog forwarder; `None` when the transport is `"none"` or initialization failed.
pub syslog: Option<SyslogSender>,
/// Audit configuration handed to `AuditAgent::spawn`.
pub config: Option<AuditConfig>,
/// Tracker notified of storage append successes and failures; `None` when
/// alerts are absent or disabled in the configuration.
pub(crate) failure_tracker: Option<Arc<FailureTracker>>,
}
impl std::fmt::Debug for AuditAgentState {
    /// Formats the state as a struct of booleans: each field is rendered as
    /// whether it is currently set, never as its contents.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive destructuring: adding a field to the struct forces this
        // impl to be revisited.
        let Self {
            chain,
            storage,
            syslog,
            config,
            failure_tracker,
        } = self;

        let mut out = f.debug_struct("AuditAgentState");
        out.field("chain", &chain.is_some());
        out.field("storage", &storage.is_some());
        out.field("syslog", &syslog.is_some());
        out.field("config", &config.is_some());
        out.field("failure_tracker", &failure_tracker.is_some());
        out.finish()
    }
}
/// Internal message the actor sends to itself after start-up, carrying the
/// chain position read from storage (or the defaults for a fresh chain).
#[derive(Clone, Debug)]
struct ChainLoaded {
/// Hash of the latest stored event; `None` starts a new chain instead of resuming.
previous_hash: Option<String>,
/// Sequence number of the latest stored event, or 0 when nothing was loaded.
sequence: u64,
/// Service name passed on to the `AuditChain` constructor.
service_name: String,
}
/// Internal message sent periodically to the actor to start a retention
/// cleanup pass.
#[derive(Clone, Debug)]
struct CleanupTrigger;
/// Maximum number of events requested from storage per `query_before` call
/// during retention cleanup.
const CLEANUP_BATCH_SIZE: usize = 10_000;
/// Entry point for creating the audit actor; carries no data of its own.
pub struct AuditAgent;
impl AuditAgent {
pub async fn spawn(
runtime: &mut ActorRuntime,
config: AuditConfig,
storage: Option<Arc<dyn AuditStorage>>,
service_name: String,
) -> anyhow::Result<ActorHandle> {
let mut agent = runtime.new_actor::<AuditAgentState>();
let syslog = if config.syslog.transport != "none" {
match SyslogSender::new(&config.syslog) {
Ok(sender) => Some(sender),
Err(e) => {
tracing::warn!("Failed to initialize syslog sender: {}", e);
None
}
}
} else {
None
};
let retention_days = config.retention_days;
let cleanup_interval_hours = config.cleanup_interval_hours;
let failure_tracker = if let Some(ref alert_config) = config.alerts {
if alert_config.enabled {
let mut hooks: Vec<Arc<dyn AuditAlertHook>> = Vec::new();
for wh in &alert_config.webhooks {
hooks.push(Arc::new(WebhookAlertHook::new(
wh.url.clone(),
std::time::Duration::from_secs(wh.timeout_secs),
wh.headers.clone(),
)));
}
Some(Arc::new(FailureTracker::new(
hooks,
alert_config.threshold_secs,
alert_config.cooldown_secs,
alert_config.notify_recovery,
service_name.clone(),
)))
} else {
None
}
} else {
None
};
agent.model.config = Some(config);
agent.model.storage = storage;
agent.model.syslog = syslog;
agent.model.failure_tracker = failure_tracker;
let storage_for_start = agent.model.storage.clone();
let service_name_for_start = service_name.clone();
agent.mutate_on::<ChainLoaded>(|agent, envelope| {
let msg = envelope.message().clone();
let chain = if let Some(ref hash) = msg.previous_hash {
AuditChain::resume(msg.service_name, hash.clone(), msg.sequence)
} else {
AuditChain::new(msg.service_name)
};
agent.model.chain = Some(chain);
tracing::info!("Audit chain initialized at sequence {}", msg.sequence);
Reply::ready()
});
agent.mutate_on::<AuditEvent>(|agent, envelope| {
let event = envelope.message().clone();
let sealed_event = if let Some(ref mut chain) = agent.model.chain {
chain.seal(event)
} else {
tracing::warn!("Audit chain not initialized, dropping event");
return Reply::ready();
};
let storage = agent.model.storage.clone();
let syslog = agent.model.syslog.clone();
let tracker = agent.model.failure_tracker.clone();
tokio::spawn(async move {
if let Some(ref store) = storage {
match store.append(&sealed_event).await {
Ok(()) => {
if let Some(ref t) = tracker {
t.record_success();
}
}
Err(e) => {
tracing::error!("Failed to persist audit event: {}", e);
if let Some(ref t) = tracker {
t.record_failure(&e.to_string());
}
}
}
}
if let Some(ref sender) = syslog {
if let Err(e) = sender.send(&sealed_event).await {
tracing::warn!("Failed to send audit event to syslog: {}", e);
}
}
#[cfg(feature = "observability")]
{
super::otlp::emit_audit_log(&sealed_event);
}
});
Reply::ready()
});
agent.mutate_on::<CleanupTrigger>(|agent, _envelope| {
let config = agent.model.config.clone();
let storage = agent.model.storage.clone();
tokio::spawn(async move {
if let (Some(config), Some(storage)) = (config, storage) {
if let Err(e) = run_cleanup(&config, storage.as_ref()).await {
tracing::error!("Audit retention cleanup failed: {}", e);
}
}
});
Reply::ready()
});
agent.after_start(move |agent| {
let storage = storage_for_start.clone();
let service_name = service_name_for_start.clone();
let self_handle = agent.handle().clone();
tokio::spawn(async move {
let (previous_hash, sequence) = if let Some(ref store) = storage {
match store.latest().await {
Ok(Some(event)) => {
tracing::info!(
"Resuming audit chain at sequence {} for {}",
event.sequence,
service_name
);
(event.hash, event.sequence)
}
Ok(None) => {
tracing::info!("Starting new audit chain for {}", service_name);
(None, 0)
}
Err(e) => {
tracing::error!(
"Failed to load audit chain state: {}. Starting fresh.",
e
);
(None, 0)
}
}
} else {
tracing::info!(
"No audit storage configured, starting in-memory chain for {}",
service_name
);
(None, 0)
};
self_handle
.send(ChainLoaded {
previous_hash,
sequence,
service_name,
})
.await;
});
Reply::ready()
});
let handle = agent.start().await;
if retention_days.is_some() {
let cleanup_handle = handle.clone();
let interval_hours = cleanup_interval_hours;
tokio::spawn(async move {
let period = std::time::Duration::from_secs(interval_hours as u64 * 3600);
let mut interval = tokio::time::interval(period);
interval.tick().await;
loop {
interval.tick().await;
tracing::debug!("Triggering audit retention cleanup");
cleanup_handle.send(CleanupTrigger).await;
}
});
}
Ok(handle)
}
}
/// Archives (when an archive path is configured) and purges audit events
/// older than the configured retention window.
///
/// Returns immediately with `Ok(())` when `config.retention_days` is `None`.
/// Otherwise events older than `now - retention_days` are fetched in batches
/// of up to `CLEANUP_BATCH_SIZE`, optionally written to the archive directory,
/// and then purged from storage. The loop ends when a batch is empty or
/// short, or when a purge reports that nothing was removed.
///
/// # Errors
///
/// Propagates the first error from querying, archiving or purging. An archive
/// failure aborts before the purge for that batch is issued.
///
/// NOTE(review): only the queried batch is archived, but `purge_before` is
/// called with the full cutoff. If the storage backend removes *every* event
/// older than the cutoff in one call, events beyond the first batch would be
/// deleted without having been archived — confirm `purge_before` semantics.
async fn run_cleanup(
    config: &AuditConfig,
    storage: &dyn AuditStorage,
) -> Result<(), crate::error::Error> {
    let retention_days = match config.retention_days {
        Some(days) => days,
        None => return Ok(()),
    };

    let cutoff = chrono::Utc::now() - chrono::Duration::days(retention_days as i64);
    let archive_dir = config.archive_path.as_ref().map(PathBuf::from);
    let mut total_purged: u64 = 0;

    loop {
        let events = storage.query_before(cutoff, CLEANUP_BATCH_SIZE).await?;
        if events.is_empty() {
            break;
        }
        let batch_count = events.len();

        // Archive first so a failed archive never leads to a purge.
        if let Some(ref dir) = archive_dir {
            super::archive::archive_events(&events, dir).await?;
        }

        let purged = storage.purge_before(cutoff).await?;
        total_purged += purged;
        tracing::info!(
            "Audit cleanup: purged {} events (batch had {})",
            purged,
            batch_count
        );

        if purged == 0 {
            // No progress: looping again would return the same batch and
            // re-archive it indefinitely.
            tracing::warn!(
                "Audit cleanup: storage reported 0 purged events for a non-empty batch; stopping"
            );
            break;
        }
        if batch_count < CLEANUP_BATCH_SIZE {
            break;
        }
    }

    if total_purged > 0 {
        tracing::info!(
            "Audit retention cleanup complete: purged {} total events older than {} days",
            total_purged,
            retention_days
        );
    }
    Ok(())
}