Skip to main content

aa_storage_sqlite_buffer/
buffer.rs

1//! The on-disk SQLite event buffer.
2
3use std::path::Path;
4use std::sync::Mutex;
5use std::time::{SystemTime, UNIX_EPOCH};
6
7use aa_core::storage::{AuditEntry, AuditSink, Result, StorageError};
8use rusqlite::{params, Connection, OptionalExtension};
9
10/// Map a `rusqlite` error onto the storage contract's backend variant.
11fn backend_err(err: rusqlite::Error) -> StorageError {
12    StorageError::Backend(err.to_string())
13}
14
/// Nanoseconds since the Unix epoch.
///
/// Saturates to `0` if the system clock reports a time before the epoch, and
/// to `i64::MAX` if the value no longer fits in an `i64` (after roughly the
/// year 2262), instead of wrapping around to a negative timestamp.
fn now_unix_nanos() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // `as_nanos` is a `u128`; a plain `as i64` would silently truncate.
        .map(|d| i64::try_from(d.as_nanos()).unwrap_or(i64::MAX))
        .unwrap_or(0)
}
22
23/// Delete the oldest events until at most `cap` remain, returning the count
24/// removed.
25fn prune_to_cap(conn: &Connection, cap: usize) -> Result<usize> {
26    let count: i64 = conn
27        .query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))
28        .map_err(backend_err)?;
29    let cap = cap as i64;
30    if count <= cap {
31        return Ok(0);
32    }
33    let excess = count - cap;
34    let deleted = conn
35        .execute(
36            "DELETE FROM events WHERE id IN \
37             (SELECT id FROM events ORDER BY id ASC LIMIT ?1)",
38            params![excess],
39        )
40        .map_err(backend_err)?;
41    Ok(deleted)
42}
43
44/// A restart-safe, FIFO event buffer backed by a single SQLite file.
45///
46/// Events are appended with [`enqueue`](EventBuffer::enqueue) while the upstream
47/// sink is unreachable and replayed in insertion order with
48/// [`drain_and_send`](EventBuffer::drain_and_send) once it recovers. The file is
49/// opened in WAL mode so buffered events survive a process restart.
50pub struct EventBuffer {
51    conn: Mutex<Connection>,
52    cap: usize,
53}
54
55impl EventBuffer {
56    /// Open (creating if absent) a buffer at `path` retaining at most `cap`
57    /// events.
58    ///
59    /// Enables WAL journaling and `synchronous = NORMAL` for a durability/perf
60    /// balance, and creates the `events` table if it does not yet exist. Parent
61    /// directories are created as needed.
62    pub fn new(path: impl AsRef<Path>, cap: usize) -> Result<Self> {
63        let path = path.as_ref();
64        if let Some(parent) = path.parent() {
65            if !parent.as_os_str().is_empty() {
66                std::fs::create_dir_all(parent)
67                    .map_err(|e| StorageError::Backend(format!("create buffer directory {}: {e}", parent.display())))?;
68            }
69        }
70        let conn = Connection::open(path).map_err(backend_err)?;
71        conn.execute_batch(
72            "PRAGMA journal_mode = WAL;
73             PRAGMA synchronous = NORMAL;
74             CREATE TABLE IF NOT EXISTS events (
75                 id          INTEGER PRIMARY KEY AUTOINCREMENT,
76                 payload     BLOB    NOT NULL,
77                 enqueued_at INTEGER NOT NULL
78             );",
79        )
80        .map_err(backend_err)?;
81        Ok(Self {
82            conn: Mutex::new(conn),
83            cap,
84        })
85    }
86
87    /// Open a buffer from operator [`SqliteBufferConfig`](crate::SqliteBufferConfig).
88    pub fn from_config(config: &crate::SqliteBufferConfig) -> Result<Self> {
89        Self::new(&config.path, config.cap)
90    }
91
92    /// The maximum number of events this buffer retains before eviction.
93    #[must_use]
94    pub fn cap(&self) -> usize {
95        self.cap
96    }
97
98    /// The SQLite `journal_mode` in force on this buffer's connection
99    /// (`"wal"` once opened).
100    pub fn journal_mode(&self) -> Result<String> {
101        let conn = self.conn.lock().expect("event buffer mutex poisoned");
102        conn.query_row("PRAGMA journal_mode", [], |row| row.get(0))
103            .map_err(backend_err)
104    }
105
106    /// The SQLite `synchronous` level in force on this buffer's connection
107    /// (`1` = `NORMAL`).
108    pub fn synchronous(&self) -> Result<i64> {
109        let conn = self.conn.lock().expect("event buffer mutex poisoned");
110        conn.query_row("PRAGMA synchronous", [], |row| row.get(0))
111            .map_err(backend_err)
112    }
113
114    /// Number of events currently buffered.
115    pub fn len(&self) -> Result<usize> {
116        let conn = self.conn.lock().expect("event buffer mutex poisoned");
117        let count: i64 = conn
118            .query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))
119            .map_err(backend_err)?;
120        Ok(count as usize)
121    }
122
123    /// Whether the buffer currently holds no events.
124    pub fn is_empty(&self) -> Result<bool> {
125        Ok(self.len()? == 0)
126    }
127
128    /// Append `event` to the buffer.
129    ///
130    /// The entry is serialized to JSON and stored as a BLOB. When the buffer
131    /// would exceed its cap, the oldest events are evicted to make room and the
132    /// [`METRIC_EVENTS_DROPPED`](crate::METRIC_EVENTS_DROPPED) counter is bumped
133    /// by the number dropped — the loss is metered, never silent. Each accepted
134    /// event bumps [`METRIC_EVENTS_BUFFERED`](crate::METRIC_EVENTS_BUFFERED).
135    pub fn enqueue(&self, event: &AuditEntry) -> Result<()> {
136        let payload = serde_json::to_vec(event).map_err(|e| StorageError::Serialization(e.to_string()))?;
137        let enqueued_at = now_unix_nanos();
138        let conn = self.conn.lock().expect("event buffer mutex poisoned");
139        conn.execute(
140            "INSERT INTO events (payload, enqueued_at) VALUES (?1, ?2)",
141            params![payload, enqueued_at],
142        )
143        .map_err(backend_err)?;
144        metrics::counter!(crate::METRIC_EVENTS_BUFFERED).increment(1);
145
146        let dropped = prune_to_cap(&conn, self.cap)?;
147        if dropped > 0 {
148            metrics::counter!(crate::METRIC_EVENTS_DROPPED).increment(dropped as u64);
149        }
150        Ok(())
151    }
152
153    /// Replay buffered events to `sink` in insertion (FIFO) order.
154    ///
155    /// Each event is sent and, only after the sink acknowledges it, deleted from
156    /// the buffer — so a crash mid-flush replays at-least-once rather than
157    /// losing data. Draining stops at the first sink failure (the upstream is
158    /// treated as still-unreachable), leaving the remaining events buffered for
159    /// a later retry. Returns the number of events flushed; each one bumps
160    /// [`METRIC_EVENTS_FLUSHED`](crate::METRIC_EVENTS_FLUSHED).
161    pub async fn drain_and_send(&self, sink: &dyn AuditSink) -> Result<usize> {
162        let mut flushed = 0usize;
163        while let Some((id, payload)) = self.peek_oldest()? {
164            let entry: AuditEntry =
165                serde_json::from_slice(&payload).map_err(|e| StorageError::Serialization(e.to_string()))?;
166            if sink.emit(entry).await.is_err() {
167                break;
168            }
169            self.delete(id)?;
170            flushed += 1;
171            metrics::counter!(crate::METRIC_EVENTS_FLUSHED).increment(1);
172        }
173        Ok(flushed)
174    }
175
176    /// Fetch the oldest buffered event as `(id, payload)`, if any.
177    fn peek_oldest(&self) -> Result<Option<(i64, Vec<u8>)>> {
178        let conn = self.conn.lock().expect("event buffer mutex poisoned");
179        conn.query_row("SELECT id, payload FROM events ORDER BY id ASC LIMIT 1", [], |row| {
180            Ok((row.get(0)?, row.get(1)?))
181        })
182        .optional()
183        .map_err(backend_err)
184    }
185
186    /// Delete the buffered event with the given row id.
187    fn delete(&self, id: i64) -> Result<()> {
188        let conn = self.conn.lock().expect("event buffer mutex poisoned");
189        conn.execute("DELETE FROM events WHERE id = ?1", params![id])
190            .map_err(backend_err)?;
191        Ok(())
192    }
193}