use std::path::Path;
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};
use aa_core::storage::{AuditEntry, AuditSink, Result, StorageError};
use rusqlite::{params, Connection, OptionalExtension};
/// Wraps a `rusqlite` error in [`StorageError::Backend`], keeping only its
/// rendered message.
fn backend_err(err: rusqlite::Error) -> StorageError {
    let message = err.to_string();
    StorageError::Backend(message)
}
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// Yields `0` when the system clock reads earlier than the epoch, and
/// saturates at `i64::MAX` instead of wrapping if the nanosecond count no
/// longer fits in an `i64`.
fn now_unix_nanos() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // `as_nanos` is a `u128`; a plain `as i64` cast would silently wrap
        // to a negative timestamp once the value exceeds `i64::MAX`.
        .map_or(0, |elapsed| i64::try_from(elapsed.as_nanos()).unwrap_or(i64::MAX))
}
/// Deletes the oldest rows of `events` (lowest `id` first) until at most
/// `cap` rows remain.
///
/// Returns the number of rows deleted, which is `0` when the table already
/// fits within `cap`.
///
/// # Errors
///
/// Returns [`StorageError::Backend`] if counting or deleting rows fails.
fn prune_to_cap(conn: &Connection, cap: usize) -> Result<usize> {
    let count: i64 = conn
        .query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))
        .map_err(backend_err)?;
    // Saturate instead of casting with `as`: a cap above `i64::MAX` (for
    // example `usize::MAX` used as "unbounded") would otherwise wrap to a
    // negative number, making `excess` larger than `count` and wiping the
    // whole table.
    let cap = i64::try_from(cap).unwrap_or(i64::MAX);
    if count <= cap {
        return Ok(0);
    }
    let excess = count - cap;
    let deleted = conn
        .execute(
            "DELETE FROM events WHERE id IN \
             (SELECT id FROM events ORDER BY id ASC LIMIT ?1)",
            params![excess],
        )
        .map_err(backend_err)?;
    Ok(deleted)
}
/// Buffer of audit events stored in the `events` table of a SQLite database
/// and limited to `cap` rows.
pub struct EventBuffer {
    /// Connection to the buffer database; every method locks this mutex
    /// before touching the connection.
    conn: Mutex<Connection>,
    /// Maximum number of rows kept in `events`; `enqueue` prunes the oldest
    /// rows beyond this limit.
    cap: usize,
}
impl EventBuffer {
pub fn new(path: impl AsRef<Path>, cap: usize) -> Result<Self> {
let path = path.as_ref();
if let Some(parent) = path.parent() {
if !parent.as_os_str().is_empty() {
std::fs::create_dir_all(parent)
.map_err(|e| StorageError::Backend(format!("create buffer directory {}: {e}", parent.display())))?;
}
}
let conn = Connection::open(path).map_err(backend_err)?;
conn.execute_batch(
"PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
payload BLOB NOT NULL,
enqueued_at INTEGER NOT NULL
);",
)
.map_err(backend_err)?;
Ok(Self {
conn: Mutex::new(conn),
cap,
})
}
pub fn from_config(config: &crate::SqliteBufferConfig) -> Result<Self> {
Self::new(&config.path, config.cap)
}
#[must_use]
pub fn cap(&self) -> usize {
self.cap
}
pub fn journal_mode(&self) -> Result<String> {
let conn = self.conn.lock().expect("event buffer mutex poisoned");
conn.query_row("PRAGMA journal_mode", [], |row| row.get(0))
.map_err(backend_err)
}
pub fn synchronous(&self) -> Result<i64> {
let conn = self.conn.lock().expect("event buffer mutex poisoned");
conn.query_row("PRAGMA synchronous", [], |row| row.get(0))
.map_err(backend_err)
}
pub fn len(&self) -> Result<usize> {
let conn = self.conn.lock().expect("event buffer mutex poisoned");
let count: i64 = conn
.query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))
.map_err(backend_err)?;
Ok(count as usize)
}
pub fn is_empty(&self) -> Result<bool> {
Ok(self.len()? == 0)
}
pub fn enqueue(&self, event: &AuditEntry) -> Result<()> {
let payload = serde_json::to_vec(event).map_err(|e| StorageError::Serialization(e.to_string()))?;
let enqueued_at = now_unix_nanos();
let conn = self.conn.lock().expect("event buffer mutex poisoned");
conn.execute(
"INSERT INTO events (payload, enqueued_at) VALUES (?1, ?2)",
params![payload, enqueued_at],
)
.map_err(backend_err)?;
metrics::counter!(crate::METRIC_EVENTS_BUFFERED).increment(1);
let dropped = prune_to_cap(&conn, self.cap)?;
if dropped > 0 {
metrics::counter!(crate::METRIC_EVENTS_DROPPED).increment(dropped as u64);
}
Ok(())
}
pub async fn drain_and_send(&self, sink: &dyn AuditSink) -> Result<usize> {
let mut flushed = 0usize;
while let Some((id, payload)) = self.peek_oldest()? {
let entry: AuditEntry =
serde_json::from_slice(&payload).map_err(|e| StorageError::Serialization(e.to_string()))?;
if sink.emit(entry).await.is_err() {
break;
}
self.delete(id)?;
flushed += 1;
metrics::counter!(crate::METRIC_EVENTS_FLUSHED).increment(1);
}
Ok(flushed)
}
fn peek_oldest(&self) -> Result<Option<(i64, Vec<u8>)>> {
let conn = self.conn.lock().expect("event buffer mutex poisoned");
conn.query_row("SELECT id, payload FROM events ORDER BY id ASC LIMIT 1", [], |row| {
Ok((row.get(0)?, row.get(1)?))
})
.optional()
.map_err(backend_err)
}
fn delete(&self, id: i64) -> Result<()> {
let conn = self.conn.lock().expect("event buffer mutex poisoned");
conn.execute("DELETE FROM events WHERE id = ?1", params![id])
.map_err(backend_err)?;
Ok(())
}
}