Skip to main content

awsim_cloudwatch_logs/
sqlite_store.rs

1//! SQLite-backed log-event storage. The previous in-memory `Vec<LogEvent>`
2//! per stream had no retention enforcement and triggered an O(n log n)
3//! sort on every `PutLogEvents` — fine for a smoke test, deadly under
4//! any sustained logging workload.
5//!
//! Layout: a single `log_events` table; every row carries its (account,
//! region, log_group, log_stream) scope. Per-stream and per-group queries
//! are served by two composite indexes — (account, region, log_group,
//! log_stream, ts) and (account, region, log_group, ts) — so reads stay
//! cheap even at hundreds of thousands of events. Retention is enforced by
//! a single `DELETE WHERE ts < ?` per group.
10
11use std::path::PathBuf;
12use std::sync::Arc;
13
14use r2d2::PooledConnection;
15use r2d2_sqlite::SqliteConnectionManager;
16use rusqlite::{Connection, OptionalExtension, params};
17
18use awsim_core::AwsError;
19
/// Upper bound on simultaneously open pooled SQLite connections.
const POOL_MAX: u32 = 4;
/// Connections the pool keeps open even while idle.
const POOL_MIN_IDLE: u32 = 1;
/// Value for `PRAGMA cache_size`. Negative means "this many KiB" rather
/// than a page count, i.e. roughly 2 MiB of page cache per connection.
const CACHE_SIZE_KIB: i64 = -2048;
/// Value for `PRAGMA mmap_size`, in bytes (16 MiB).
const MMAP_SIZE_BYTES: i64 = 16 << 20;
/// Value for `PRAGMA wal_autocheckpoint`, in pages.
const WAL_AUTOCHECKPOINT_PAGES: i64 = 256;
25
26type Pool = r2d2::Pool<SqliteConnectionManager>;
27type Conn = PooledConnection<SqliteConnectionManager>;
28
29/// SQLite-backed store for CloudWatch Logs events. Cheap to clone —
30/// internals are an Arc'd r2d2 pool.
31#[derive(Clone, Debug)]
32pub struct SqliteStore {
33    inner: Arc<Inner>,
34}
35
36#[derive(Debug)]
37struct Inner {
38    db_path: PathBuf,
39    #[allow(dead_code)] // pool's Debug is what we care about
40    pool: Pool,
41}
42
/// One stored log event, as written by `SqliteStore::put_events` and
/// returned by the store's read methods.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LogEventRow {
    /// Event timestamp in milliseconds (presumably since the Unix epoch —
    /// confirm against callers).
    pub timestamp: u64,
    /// Event message text, stored verbatim.
    pub message: String,
    /// Ingestion timestamp; presumably also epoch milliseconds — confirm.
    pub ingestion_time: u64,
}
49
50impl SqliteStore {
51    pub fn open(path: impl Into<PathBuf>) -> Result<Self, AwsError> {
52        let db_path = path.into();
53        let manager = SqliteConnectionManager::file(&db_path).with_init(apply_pragmas);
54        let pool = r2d2::Pool::builder()
55            .max_size(POOL_MAX)
56            .min_idle(Some(POOL_MIN_IDLE))
57            .build(manager)
58            .map_err(|e| AwsError::internal(format!("CWL pool init failed: {e}")))?;
59        // Run migrations (just one for now).
60        {
61            let conn = pool
62                .get()
63                .map_err(|e| AwsError::internal(format!("CWL pool acquire failed: {e}")))?;
64            init_schema(&conn)?;
65        }
66        Ok(Self {
67            inner: Arc::new(Inner { db_path, pool }),
68        })
69    }
70
71    pub fn db_path(&self) -> &std::path::Path {
72        &self.inner.db_path
73    }
74
75    fn conn(&self) -> Result<Conn, AwsError> {
76        self.inner
77            .pool
78            .get()
79            .map_err(|e| AwsError::internal(format!("CWL pool acquire failed: {e}")))
80    }
81
82    /// Bulk-insert log events. Returns the number of events written.
83    pub fn put_events(
84        &self,
85        account: &str,
86        region: &str,
87        log_group: &str,
88        log_stream: &str,
89        events: &[LogEventRow],
90    ) -> Result<usize, AwsError> {
91        if events.is_empty() {
92            return Ok(0);
93        }
94        let mut conn = self.conn()?;
95        let tx = conn.transaction().map_err(sqlite_err)?;
96        {
97            let mut stmt = tx
98                .prepare(
99                    "INSERT INTO log_events
100                     (account, region, log_group, log_stream, ts, ingestion_ts, message)
101                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
102                )
103                .map_err(sqlite_err)?;
104            for e in events {
105                stmt.execute(params![
106                    account,
107                    region,
108                    log_group,
109                    log_stream,
110                    e.timestamp as i64,
111                    e.ingestion_time as i64,
112                    &e.message,
113                ])
114                .map_err(sqlite_err)?;
115            }
116        }
117        tx.commit().map_err(sqlite_err)?;
118        Ok(events.len())
119    }
120
121    /// Range-query log events for a single stream. `start` / `end` in ms.
122    /// Pagination via (timestamp, rowid) tuple → caller passes the
123    /// last-seen rowid as `after_rowid` to resume.
124    #[allow(clippy::too_many_arguments)]
125    pub fn get_events(
126        &self,
127        account: &str,
128        region: &str,
129        log_group: &str,
130        log_stream: &str,
131        start: Option<u64>,
132        end: Option<u64>,
133        offset: usize,
134        limit: usize,
135        ascending: bool,
136    ) -> Result<Vec<LogEventRow>, AwsError> {
137        let conn = self.conn()?;
138        let order = if ascending { "ASC" } else { "DESC" };
139        let sql = format!(
140            "SELECT ts, ingestion_ts, message FROM log_events
141             WHERE account = ?1 AND region = ?2 AND log_group = ?3 AND log_stream = ?4
142               AND (?5 IS NULL OR ts >= ?5)
143               AND (?6 IS NULL OR ts <= ?6)
144             ORDER BY ts {order}, rowid {order}
145             LIMIT ?7 OFFSET ?8"
146        );
147        let mut stmt = conn.prepare(&sql).map_err(sqlite_err)?;
148        let start_param = start.map(|v| v as i64);
149        let end_param = end.map(|v| v as i64);
150        let rows = stmt
151            .query_map(
152                params![
153                    account,
154                    region,
155                    log_group,
156                    log_stream,
157                    start_param,
158                    end_param,
159                    limit as i64,
160                    offset as i64,
161                ],
162                |row| {
163                    Ok(LogEventRow {
164                        timestamp: row.get::<_, i64>(0)? as u64,
165                        ingestion_time: row.get::<_, i64>(1)? as u64,
166                        message: row.get::<_, String>(2)?,
167                    })
168                },
169            )
170            .map_err(sqlite_err)?;
171        let out: Result<Vec<_>, _> = rows.collect();
172        out.map_err(sqlite_err)
173    }
174
175    /// Total event count for a single stream — used to compute
176    /// pagination tokens that mirror the legacy index-based ones.
177    pub fn count_events(
178        &self,
179        account: &str,
180        region: &str,
181        log_group: &str,
182        log_stream: &str,
183        start: Option<u64>,
184        end: Option<u64>,
185    ) -> Result<usize, AwsError> {
186        let conn = self.conn()?;
187        let count: i64 = conn
188            .query_row(
189                "SELECT COUNT(*) FROM log_events
190                 WHERE account = ?1 AND region = ?2 AND log_group = ?3 AND log_stream = ?4
191                   AND (?5 IS NULL OR ts >= ?5)
192                   AND (?6 IS NULL OR ts <= ?6)",
193                params![
194                    account,
195                    region,
196                    log_group,
197                    log_stream,
198                    start.map(|v| v as i64),
199                    end.map(|v| v as i64),
200                ],
201                |r| r.get(0),
202            )
203            .map_err(sqlite_err)?;
204        Ok(count as usize)
205    }
206
207    /// Filter events across one or more streams by substring match
208    /// on `message`, returned in `(stream, ts)` order.
209    #[allow(clippy::too_many_arguments)]
210    pub fn filter_events(
211        &self,
212        account: &str,
213        region: &str,
214        log_group: &str,
215        stream_filter: Option<&[String]>,
216        substring: Option<&str>,
217        start: Option<u64>,
218        end: Option<u64>,
219        limit: usize,
220    ) -> Result<Vec<(String, LogEventRow)>, AwsError> {
221        let conn = self.conn()?;
222        let mut sql = String::from(
223            "SELECT log_stream, ts, ingestion_ts, message FROM log_events
224             WHERE account = ?1 AND region = ?2 AND log_group = ?3
225               AND (?4 IS NULL OR ts >= ?4)
226               AND (?5 IS NULL OR ts <= ?5)",
227        );
228        if let Some(s) = substring
229            && !s.is_empty()
230        {
231            sql.push_str(&format!(
232                " AND message LIKE '%' || {} || '%'",
233                escape_for_like(s)
234            ));
235        }
236        if let Some(streams) = stream_filter
237            && !streams.is_empty()
238        {
239            sql.push_str(" AND log_stream IN (");
240            for (i, s) in streams.iter().enumerate() {
241                if i > 0 {
242                    sql.push(',');
243                }
244                sql.push('\'');
245                sql.push_str(&s.replace('\'', "''"));
246                sql.push('\'');
247            }
248            sql.push(')');
249        }
250        sql.push_str(" ORDER BY ts ASC, rowid ASC LIMIT ?6");
251        let mut stmt = conn.prepare(&sql).map_err(sqlite_err)?;
252        let start_param = start.map(|v| v as i64);
253        let end_param = end.map(|v| v as i64);
254        let rows = stmt
255            .query_map(
256                params![
257                    account,
258                    region,
259                    log_group,
260                    start_param,
261                    end_param,
262                    limit as i64,
263                ],
264                |row| {
265                    Ok((
266                        row.get::<_, String>(0)?,
267                        LogEventRow {
268                            timestamp: row.get::<_, i64>(1)? as u64,
269                            ingestion_time: row.get::<_, i64>(2)? as u64,
270                            message: row.get::<_, String>(3)?,
271                        },
272                    ))
273                },
274            )
275            .map_err(sqlite_err)?;
276        let out: Result<Vec<_>, _> = rows.collect();
277        out.map_err(sqlite_err)
278    }
279
280    /// First / last event timestamps for a stream — used to populate
281    /// the `firstEventTimestamp` / `lastEventTimestamp` fields on
282    /// DescribeLogStreams. Returns `(None, None)` when the stream is
283    /// empty.
284    pub fn stream_bounds(
285        &self,
286        account: &str,
287        region: &str,
288        log_group: &str,
289        log_stream: &str,
290    ) -> Result<(Option<u64>, Option<u64>), AwsError> {
291        let conn = self.conn()?;
292        let row: Option<(Option<i64>, Option<i64>)> = conn
293            .query_row(
294                "SELECT MIN(ts), MAX(ts) FROM log_events
295                 WHERE account = ?1 AND region = ?2 AND log_group = ?3 AND log_stream = ?4",
296                params![account, region, log_group, log_stream],
297                |r| Ok((r.get::<_, Option<i64>>(0)?, r.get::<_, Option<i64>>(1)?)),
298            )
299            .optional()
300            .map_err(sqlite_err)?;
301        Ok(row
302            .map(|(a, b)| (a.map(|v| v as u64), b.map(|v| v as u64)))
303            .unwrap_or((None, None)))
304    }
305
306    /// Delete events older than `cutoff_ts` (ms). Used by the
307    /// retention sweeper. Returns the number of rows deleted.
308    pub fn trim_older_than(
309        &self,
310        account: &str,
311        region: &str,
312        log_group: &str,
313        cutoff_ts: u64,
314    ) -> Result<usize, AwsError> {
315        let conn = self.conn()?;
316        let n = conn
317            .execute(
318                "DELETE FROM log_events
319                 WHERE account = ?1 AND region = ?2 AND log_group = ?3 AND ts < ?4",
320                params![account, region, log_group, cutoff_ts as i64],
321            )
322            .map_err(sqlite_err)?;
323        Ok(n)
324    }
325
326    /// Delete every event for a stream — used when DeleteLogStream
327    /// fires. Cheap: indexed lookup + bulk delete.
328    pub fn delete_stream(
329        &self,
330        account: &str,
331        region: &str,
332        log_group: &str,
333        log_stream: &str,
334    ) -> Result<usize, AwsError> {
335        let conn = self.conn()?;
336        let n = conn
337            .execute(
338                "DELETE FROM log_events
339                 WHERE account = ?1 AND region = ?2 AND log_group = ?3 AND log_stream = ?4",
340                params![account, region, log_group, log_stream],
341            )
342            .map_err(sqlite_err)?;
343        Ok(n)
344    }
345
346    /// Delete every event for a log group — used by DeleteLogGroup.
347    pub fn delete_group(
348        &self,
349        account: &str,
350        region: &str,
351        log_group: &str,
352    ) -> Result<usize, AwsError> {
353        let conn = self.conn()?;
354        let n = conn
355            .execute(
356                "DELETE FROM log_events
357                 WHERE account = ?1 AND region = ?2 AND log_group = ?3",
358                params![account, region, log_group],
359            )
360            .map_err(sqlite_err)?;
361        Ok(n)
362    }
363
364    /// Total row count across all groups + streams. Used by the
365    /// `/_awsim/storage` endpoint to surface in-memory growth.
366    pub fn total_rows(&self) -> Result<u64, AwsError> {
367        let conn = self.conn()?;
368        let n: i64 = conn
369            .query_row("SELECT COUNT(*) FROM log_events", [], |r| r.get(0))
370            .map_err(sqlite_err)?;
371        Ok(n as u64)
372    }
373}
374
375fn init_schema(conn: &Connection) -> Result<(), AwsError> {
376    conn.execute_batch(
377        "CREATE TABLE IF NOT EXISTS log_events (
378             account TEXT NOT NULL,
379             region TEXT NOT NULL,
380             log_group TEXT NOT NULL,
381             log_stream TEXT NOT NULL,
382             ts INTEGER NOT NULL,
383             ingestion_ts INTEGER NOT NULL,
384             message TEXT NOT NULL
385         );
386         CREATE INDEX IF NOT EXISTS log_events_lookup
387             ON log_events (account, region, log_group, log_stream, ts);
388         CREATE INDEX IF NOT EXISTS log_events_group_ts
389             ON log_events (account, region, log_group, ts);",
390    )
391    .map_err(sqlite_err)?;
392    Ok(())
393}
394
395fn apply_pragmas(conn: &mut rusqlite::Connection) -> Result<(), rusqlite::Error> {
396    conn.pragma_update(None, "journal_mode", "WAL")?;
397    conn.pragma_update(None, "synchronous", "NORMAL")?;
398    conn.execute_batch(&format!(
399        "PRAGMA temp_store = MEMORY;
400         PRAGMA mmap_size  = {MMAP_SIZE_BYTES};
401         PRAGMA cache_size = {CACHE_SIZE_KIB};
402         PRAGMA wal_autocheckpoint = {WAL_AUTOCHECKPOINT_PAGES};"
403    ))?;
404    Ok(())
405}
406
/// Renders `s` as a single-quoted SQLite string literal, doubling every
/// embedded `'` so the value cannot terminate the literal early. The
/// result is meant to be spliced into SQL text, not bound as a parameter.
///
/// Despite its name, this does NOT escape the LIKE wildcards `%` and `_`.
/// A caller that wants a literal substring match must neutralise those
/// itself, e.g. with an `ESCAPE` clause.
fn escape_for_like(s: &str) -> String {
    format!("'{}'", s.replace('\'', "''"))
}
424
425fn sqlite_err(e: rusqlite::Error) -> AwsError {
426    AwsError::internal(format!("CloudWatch Logs sqlite error: {e}"))
427}
428
#[cfg(test)]
mod tests {
    use super::*;

    /// A [`SqliteStore`] on a throwaway database file. On drop it closes the
    /// store, then removes the file and its `-wal` / `-shm` side files, so
    /// test runs no longer accumulate databases in the temp dir.
    struct TempStore {
        // `Option` so `Drop` can release the pool before deleting the files.
        store: Option<SqliteStore>,
        path: std::path::PathBuf,
    }

    impl std::ops::Deref for TempStore {
        type Target = SqliteStore;

        fn deref(&self) -> &SqliteStore {
            self.store.as_ref().expect("store is only taken in Drop")
        }
    }

    impl Drop for TempStore {
        fn drop(&mut self) {
            self.store = None;
            for suffix in ["", "-wal", "-shm"] {
                let mut file = self.path.clone().into_os_string();
                file.push(suffix);
                // Best-effort: a side file may legitimately not exist.
                let _ = std::fs::remove_file(file);
            }
        }
    }

    fn store() -> TempStore {
        let id = uuid::Uuid::new_v4();
        let path = std::env::temp_dir().join(format!("awsim-cwl-test-{id}.db"));
        TempStore {
            store: Some(SqliteStore::open(path.clone()).unwrap()),
            path,
        }
    }

    fn ev(ts: u64, msg: &str) -> LogEventRow {
        LogEventRow {
            timestamp: ts,
            message: msg.to_string(),
            ingestion_time: ts + 1,
        }
    }

    #[test]
    fn put_then_get_returns_in_ts_order() {
        let s = store();
        s.put_events("a", "r", "g", "stm", &[ev(3, "c"), ev(1, "a"), ev(2, "b")])
            .unwrap();
        let rows = s
            .get_events("a", "r", "g", "stm", None, None, 0, 100, true)
            .unwrap();
        assert_eq!(rows.len(), 3);
        assert_eq!(rows[0].timestamp, 1);
        assert_eq!(rows[2].timestamp, 3);
    }

    #[test]
    fn time_range_filter() {
        let s = store();
        s.put_events("a", "r", "g", "stm", &[ev(1, "a"), ev(5, "b"), ev(10, "c")])
            .unwrap();
        let rows = s
            .get_events("a", "r", "g", "stm", Some(2), Some(7), 0, 100, true)
            .unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].timestamp, 5);
    }

    #[test]
    fn get_events_pages_with_offset_and_limit() {
        let s = store();
        let events: Vec<_> = (1..=5).map(|ts| ev(ts, "m")).collect();
        s.put_events("a", "r", "g", "stm", &events).unwrap();
        let page = s
            .get_events("a", "r", "g", "stm", None, None, 2, 2, true)
            .unwrap();
        let ts: Vec<u64> = page.iter().map(|e| e.timestamp).collect();
        assert_eq!(ts, vec![3, 4]);
        let newest = s
            .get_events("a", "r", "g", "stm", None, None, 0, 1, false)
            .unwrap();
        assert_eq!(newest.len(), 1);
        assert_eq!(newest[0].timestamp, 5);
    }

    #[test]
    fn filter_substring_across_streams() {
        let s = store();
        s.put_events("a", "r", "g", "s1", &[ev(1, "hello world")])
            .unwrap();
        s.put_events(
            "a",
            "r",
            "g",
            "s2",
            &[ev(2, "no match"), ev(3, "world cup")],
        )
        .unwrap();
        let rows = s
            .filter_events("a", "r", "g", None, Some("world"), None, None, 100)
            .unwrap();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].1.timestamp, 1);
        assert_eq!(rows[1].1.timestamp, 3);
    }

    #[test]
    fn trim_older_than_drops_events() {
        let s = store();
        s.put_events("a", "r", "g", "stm", &[ev(1, "a"), ev(5, "b"), ev(10, "c")])
            .unwrap();
        let removed = s.trim_older_than("a", "r", "g", 5).unwrap();
        assert_eq!(removed, 1);
        let remaining = s.count_events("a", "r", "g", "stm", None, None).unwrap();
        assert_eq!(remaining, 2);
    }

    #[test]
    fn stream_bounds_returns_min_max() {
        let s = store();
        s.put_events("a", "r", "g", "stm", &[ev(5, "x"), ev(10, "y"), ev(2, "z")])
            .unwrap();
        let (min, max) = s.stream_bounds("a", "r", "g", "stm").unwrap();
        assert_eq!(min, Some(2));
        assert_eq!(max, Some(10));
    }

    #[test]
    fn stream_bounds_of_empty_stream_is_none() {
        let s = store();
        assert_eq!(
            s.stream_bounds("a", "r", "g", "missing").unwrap(),
            (None, None)
        );
    }

    #[test]
    fn delete_stream_removes_only_that_stream() {
        let s = store();
        s.put_events("a", "r", "g", "s1", &[ev(1, "a")]).unwrap();
        s.put_events("a", "r", "g", "s2", &[ev(1, "b")]).unwrap();
        s.delete_stream("a", "r", "g", "s1").unwrap();
        assert_eq!(s.count_events("a", "r", "g", "s1", None, None).unwrap(), 0);
        assert_eq!(s.count_events("a", "r", "g", "s2", None, None).unwrap(), 1);
    }
}