use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;

use super::client::OrgApi;
/// Upper bound on the number of audit events buffered in memory at once.
const QUEUE_CAP: usize = 1_000;

/// Largest number of events taken from the queue for a single ship attempt.
const BATCH_SIZE: usize = 200;

/// Pause between flush attempts of the background flusher; `drain` also
/// derives its wait time from this value.
const FLUSH_INTERVAL: Duration = Duration::from_millis(5_000);
/// A single audit record that `AuditSink` buffers and later converts to JSON
/// (via `serde_json::to_value`) before posting it to the org API.
///
/// No serde rename attributes are applied, so the field names below are also
/// the keys of the serialized object.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuditEvent {
/// Identifier of this event.
/// NOTE(review): format and uniqueness guarantees are not visible in this file.
pub id: String,
/// Timestamp of the event, in UTC. Included in the warning that is logged
/// when the event is evicted from a full queue.
pub ts: DateTime<Utc>,
/// Identifier of a rule -- presumably the rule that produced `decision`;
/// confirm with the producer. Also logged when the event is evicted.
pub rule_id: String,
/// Decision recorded for the event, as free-form text.
/// NOTE(review): the set of valid values is not defined in this file.
pub decision: String,
/// Severity label, as free-form text.
/// NOTE(review): the set of valid values is not defined in this file.
pub severity: String,
/// Name of a tool -- presumably the tool the event concerns; TODO confirm.
pub tool: String,
/// Fingerprint string; how it is computed is not visible in this file.
pub fingerprint: String,
/// Arbitrary JSON attached to the event. `#[serde(default)]` makes the key
/// optional when deserializing; a missing key yields `serde_json::Value::Null`.
#[serde(default)]
pub context: serde_json::Value,
}
pub struct AuditSink {
queue: Mutex<Vec<AuditEvent>>,
api: Arc<OrgApi>,
}
impl AuditSink {
pub fn new(api: Arc<OrgApi>) -> Arc<Self> {
let sink = Arc::new(Self {
queue: Mutex::new(Vec::with_capacity(QUEUE_CAP)),
api,
});
let sink_for_task = sink.clone();
tokio::spawn(async move {
sink_for_task.run_flusher().await;
});
sink
}
pub async fn record(&self, ev: AuditEvent) {
let mut q = self.queue.lock().await;
if q.len() >= QUEUE_CAP {
let dropped = q.remove(0);
log::warn!(
"[shield] audit queue full ({} events buffered); dropped oldest rule_id={} ts={}",
QUEUE_CAP, dropped.rule_id, dropped.ts
);
}
q.push(ev);
}
async fn run_flusher(self: Arc<Self>) {
let mut backoff = Duration::from_secs(1);
let max_backoff = Duration::from_secs(60);
loop {
tokio::time::sleep(FLUSH_INTERVAL).await;
let batch = {
let mut q = self.queue.lock().await;
if q.is_empty() {
continue;
}
let n = q.len().min(BATCH_SIZE);
q.drain(..n).collect::<Vec<_>>()
};
let payload: Vec<serde_json::Value> = batch
.iter()
.filter_map(|e| serde_json::to_value(e).ok())
.collect();
match self.api.post_events(&payload).await {
Ok(ack) => {
log::debug!(
"[shield] audit shipped: received={} batch_size={}",
ack.received,
batch.len()
);
backoff = Duration::from_secs(1);
}
Err(e) => {
let mut q = self.queue.lock().await;
for ev in batch.into_iter().rev() {
if q.len() >= QUEUE_CAP {
let _ = q.pop();
}
q.insert(0, ev);
}
drop(q);
log::warn!(
"[shield] audit ship failed: {} -- retry in {:?}",
e,
backoff
);
tokio::time::sleep(backoff).await;
backoff = (backoff * 2).min(max_backoff);
}
}
}
}
pub async fn drain(&self) {
let q = self.queue.lock().await;
let n = q.len();
drop(q);
if n == 0 {
return;
}
log::info!(
"[shield] draining {} pending audit events before exit",
n
);
tokio::time::sleep(FLUSH_INTERVAL + Duration::from_millis(500)).await;
}
}