aa_storage_sqlite_buffer/
buffer.rs1use std::path::Path;
4use std::sync::Mutex;
5use std::time::{SystemTime, UNIX_EPOCH};
6
7use aa_core::storage::{AuditEntry, AuditSink, Result, StorageError};
8use rusqlite::{params, Connection, OptionalExtension};
9
10fn backend_err(err: rusqlite::Error) -> StorageError {
12 StorageError::Backend(err.to_string())
13}
14
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch, and
/// saturates at `i64::MAX` if the nanosecond count does not fit in an `i64`.
fn now_unix_nanos() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // `as_nanos` is a `u128`; a plain `as i64` cast would wrap silently,
        // so convert with a check and clamp instead.
        .map(|elapsed| i64::try_from(elapsed.as_nanos()).unwrap_or(i64::MAX))
        .unwrap_or(0)
}
22
23fn prune_to_cap(conn: &Connection, cap: usize) -> Result<usize> {
26 let count: i64 = conn
27 .query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))
28 .map_err(backend_err)?;
29 let cap = cap as i64;
30 if count <= cap {
31 return Ok(0);
32 }
33 let excess = count - cap;
34 let deleted = conn
35 .execute(
36 "DELETE FROM events WHERE id IN \
37 (SELECT id FROM events ORDER BY id ASC LIMIT ?1)",
38 params![excess],
39 )
40 .map_err(backend_err)?;
41 Ok(deleted)
42}
43
44pub struct EventBuffer {
51 conn: Mutex<Connection>,
52 cap: usize,
53}
54
55impl EventBuffer {
56 pub fn new(path: impl AsRef<Path>, cap: usize) -> Result<Self> {
63 let path = path.as_ref();
64 if let Some(parent) = path.parent() {
65 if !parent.as_os_str().is_empty() {
66 std::fs::create_dir_all(parent)
67 .map_err(|e| StorageError::Backend(format!("create buffer directory {}: {e}", parent.display())))?;
68 }
69 }
70 let conn = Connection::open(path).map_err(backend_err)?;
71 conn.execute_batch(
72 "PRAGMA journal_mode = WAL;
73 PRAGMA synchronous = NORMAL;
74 CREATE TABLE IF NOT EXISTS events (
75 id INTEGER PRIMARY KEY AUTOINCREMENT,
76 payload BLOB NOT NULL,
77 enqueued_at INTEGER NOT NULL
78 );",
79 )
80 .map_err(backend_err)?;
81 Ok(Self {
82 conn: Mutex::new(conn),
83 cap,
84 })
85 }
86
87 pub fn from_config(config: &crate::SqliteBufferConfig) -> Result<Self> {
89 Self::new(&config.path, config.cap)
90 }
91
92 #[must_use]
94 pub fn cap(&self) -> usize {
95 self.cap
96 }
97
98 pub fn journal_mode(&self) -> Result<String> {
101 let conn = self.conn.lock().expect("event buffer mutex poisoned");
102 conn.query_row("PRAGMA journal_mode", [], |row| row.get(0))
103 .map_err(backend_err)
104 }
105
106 pub fn synchronous(&self) -> Result<i64> {
109 let conn = self.conn.lock().expect("event buffer mutex poisoned");
110 conn.query_row("PRAGMA synchronous", [], |row| row.get(0))
111 .map_err(backend_err)
112 }
113
114 pub fn len(&self) -> Result<usize> {
116 let conn = self.conn.lock().expect("event buffer mutex poisoned");
117 let count: i64 = conn
118 .query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))
119 .map_err(backend_err)?;
120 Ok(count as usize)
121 }
122
123 pub fn is_empty(&self) -> Result<bool> {
125 Ok(self.len()? == 0)
126 }
127
128 pub fn enqueue(&self, event: &AuditEntry) -> Result<()> {
136 let payload = serde_json::to_vec(event).map_err(|e| StorageError::Serialization(e.to_string()))?;
137 let enqueued_at = now_unix_nanos();
138 let conn = self.conn.lock().expect("event buffer mutex poisoned");
139 conn.execute(
140 "INSERT INTO events (payload, enqueued_at) VALUES (?1, ?2)",
141 params![payload, enqueued_at],
142 )
143 .map_err(backend_err)?;
144 metrics::counter!(crate::METRIC_EVENTS_BUFFERED).increment(1);
145
146 let dropped = prune_to_cap(&conn, self.cap)?;
147 if dropped > 0 {
148 metrics::counter!(crate::METRIC_EVENTS_DROPPED).increment(dropped as u64);
149 }
150 Ok(())
151 }
152
153 pub async fn drain_and_send(&self, sink: &dyn AuditSink) -> Result<usize> {
162 let mut flushed = 0usize;
163 while let Some((id, payload)) = self.peek_oldest()? {
164 let entry: AuditEntry =
165 serde_json::from_slice(&payload).map_err(|e| StorageError::Serialization(e.to_string()))?;
166 if sink.emit(entry).await.is_err() {
167 break;
168 }
169 self.delete(id)?;
170 flushed += 1;
171 metrics::counter!(crate::METRIC_EVENTS_FLUSHED).increment(1);
172 }
173 Ok(flushed)
174 }
175
176 fn peek_oldest(&self) -> Result<Option<(i64, Vec<u8>)>> {
178 let conn = self.conn.lock().expect("event buffer mutex poisoned");
179 conn.query_row("SELECT id, payload FROM events ORDER BY id ASC LIMIT 1", [], |row| {
180 Ok((row.get(0)?, row.get(1)?))
181 })
182 .optional()
183 .map_err(backend_err)
184 }
185
186 fn delete(&self, id: i64) -> Result<()> {
188 let conn = self.conn.lock().expect("event buffer mutex poisoned");
189 conn.execute("DELETE FROM events WHERE id = ?1", params![id])
190 .map_err(backend_err)?;
191 Ok(())
192 }
193}