1use std::path::PathBuf;
12use std::sync::Arc;
13
14use r2d2::PooledConnection;
15use r2d2_sqlite::SqliteConnectionManager;
16use rusqlite::{Connection, OptionalExtension, params};
17
18use awsim_core::AwsError;
19
/// Upper bound on the number of pooled SQLite connections.
const POOL_MAX: u32 = 4;
/// Minimum number of idle connections the pool tries to keep open.
const POOL_MIN_IDLE: u32 = 1;
/// Page-cache budget for `PRAGMA cache_size`. SQLite reads a negative value
/// as a size in KiB rather than a page count, so this is 2 MiB.
const CACHE_SIZE_KIB: i64 = -2_048;
/// Memory-mapped I/O window for `PRAGMA mmap_size`: 16 MiB.
const MMAP_SIZE_BYTES: i64 = 16 << 20;
/// WAL size, in pages, at which SQLite runs an automatic checkpoint.
const WAL_AUTOCHECKPOINT_PAGES: i64 = 256;
25
26type Pool = r2d2::Pool<SqliteConnectionManager>;
27type Conn = PooledConnection<SqliteConnectionManager>;
28
29#[derive(Clone, Debug)]
32pub struct SqliteStore {
33 inner: Arc<Inner>,
34}
35
36#[derive(Debug)]
37struct Inner {
38 db_path: PathBuf,
39 #[allow(dead_code)] pool: Pool,
41}
42
/// One stored log event, as written by `put_events` and read back by the
/// query methods.
#[derive(Clone, Debug)]
pub struct LogEventRow {
    /// Event timestamp (the `ts` column); drives ordering and range filters.
    /// Units are whatever the caller supplies; presumably epoch millis (confirm).
    pub timestamp: u64,
    /// The event's message text.
    pub message: String,
    /// Ingestion time (the `ingestion_ts` column), in the same units as `timestamp`.
    pub ingestion_time: u64,
}
49
50impl SqliteStore {
51 pub fn open(path: impl Into<PathBuf>) -> Result<Self, AwsError> {
52 let db_path = path.into();
53 let manager = SqliteConnectionManager::file(&db_path).with_init(apply_pragmas);
54 let pool = r2d2::Pool::builder()
55 .max_size(POOL_MAX)
56 .min_idle(Some(POOL_MIN_IDLE))
57 .build(manager)
58 .map_err(|e| AwsError::internal(format!("CWL pool init failed: {e}")))?;
59 {
61 let conn = pool
62 .get()
63 .map_err(|e| AwsError::internal(format!("CWL pool acquire failed: {e}")))?;
64 init_schema(&conn)?;
65 }
66 Ok(Self {
67 inner: Arc::new(Inner { db_path, pool }),
68 })
69 }
70
71 pub fn db_path(&self) -> &std::path::Path {
72 &self.inner.db_path
73 }
74
75 fn conn(&self) -> Result<Conn, AwsError> {
76 self.inner
77 .pool
78 .get()
79 .map_err(|e| AwsError::internal(format!("CWL pool acquire failed: {e}")))
80 }
81
82 pub fn put_events(
84 &self,
85 account: &str,
86 region: &str,
87 log_group: &str,
88 log_stream: &str,
89 events: &[LogEventRow],
90 ) -> Result<usize, AwsError> {
91 if events.is_empty() {
92 return Ok(0);
93 }
94 let mut conn = self.conn()?;
95 let tx = conn.transaction().map_err(sqlite_err)?;
96 {
97 let mut stmt = tx
98 .prepare(
99 "INSERT INTO log_events
100 (account, region, log_group, log_stream, ts, ingestion_ts, message)
101 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
102 )
103 .map_err(sqlite_err)?;
104 for e in events {
105 stmt.execute(params![
106 account,
107 region,
108 log_group,
109 log_stream,
110 e.timestamp as i64,
111 e.ingestion_time as i64,
112 &e.message,
113 ])
114 .map_err(sqlite_err)?;
115 }
116 }
117 tx.commit().map_err(sqlite_err)?;
118 Ok(events.len())
119 }
120
121 #[allow(clippy::too_many_arguments)]
125 pub fn get_events(
126 &self,
127 account: &str,
128 region: &str,
129 log_group: &str,
130 log_stream: &str,
131 start: Option<u64>,
132 end: Option<u64>,
133 offset: usize,
134 limit: usize,
135 ascending: bool,
136 ) -> Result<Vec<LogEventRow>, AwsError> {
137 let conn = self.conn()?;
138 let order = if ascending { "ASC" } else { "DESC" };
139 let sql = format!(
140 "SELECT ts, ingestion_ts, message FROM log_events
141 WHERE account = ?1 AND region = ?2 AND log_group = ?3 AND log_stream = ?4
142 AND (?5 IS NULL OR ts >= ?5)
143 AND (?6 IS NULL OR ts <= ?6)
144 ORDER BY ts {order}, rowid {order}
145 LIMIT ?7 OFFSET ?8"
146 );
147 let mut stmt = conn.prepare(&sql).map_err(sqlite_err)?;
148 let start_param = start.map(|v| v as i64);
149 let end_param = end.map(|v| v as i64);
150 let rows = stmt
151 .query_map(
152 params![
153 account,
154 region,
155 log_group,
156 log_stream,
157 start_param,
158 end_param,
159 limit as i64,
160 offset as i64,
161 ],
162 |row| {
163 Ok(LogEventRow {
164 timestamp: row.get::<_, i64>(0)? as u64,
165 ingestion_time: row.get::<_, i64>(1)? as u64,
166 message: row.get::<_, String>(2)?,
167 })
168 },
169 )
170 .map_err(sqlite_err)?;
171 let out: Result<Vec<_>, _> = rows.collect();
172 out.map_err(sqlite_err)
173 }
174
175 pub fn count_events(
178 &self,
179 account: &str,
180 region: &str,
181 log_group: &str,
182 log_stream: &str,
183 start: Option<u64>,
184 end: Option<u64>,
185 ) -> Result<usize, AwsError> {
186 let conn = self.conn()?;
187 let count: i64 = conn
188 .query_row(
189 "SELECT COUNT(*) FROM log_events
190 WHERE account = ?1 AND region = ?2 AND log_group = ?3 AND log_stream = ?4
191 AND (?5 IS NULL OR ts >= ?5)
192 AND (?6 IS NULL OR ts <= ?6)",
193 params![
194 account,
195 region,
196 log_group,
197 log_stream,
198 start.map(|v| v as i64),
199 end.map(|v| v as i64),
200 ],
201 |r| r.get(0),
202 )
203 .map_err(sqlite_err)?;
204 Ok(count as usize)
205 }
206
207 #[allow(clippy::too_many_arguments)]
210 pub fn filter_events(
211 &self,
212 account: &str,
213 region: &str,
214 log_group: &str,
215 stream_filter: Option<&[String]>,
216 substring: Option<&str>,
217 start: Option<u64>,
218 end: Option<u64>,
219 limit: usize,
220 ) -> Result<Vec<(String, LogEventRow)>, AwsError> {
221 let conn = self.conn()?;
222 let mut sql = String::from(
223 "SELECT log_stream, ts, ingestion_ts, message FROM log_events
224 WHERE account = ?1 AND region = ?2 AND log_group = ?3
225 AND (?4 IS NULL OR ts >= ?4)
226 AND (?5 IS NULL OR ts <= ?5)",
227 );
228 if let Some(s) = substring
229 && !s.is_empty()
230 {
231 sql.push_str(&format!(
232 " AND message LIKE '%' || {} || '%'",
233 escape_for_like(s)
234 ));
235 }
236 if let Some(streams) = stream_filter
237 && !streams.is_empty()
238 {
239 sql.push_str(" AND log_stream IN (");
240 for (i, s) in streams.iter().enumerate() {
241 if i > 0 {
242 sql.push(',');
243 }
244 sql.push('\'');
245 sql.push_str(&s.replace('\'', "''"));
246 sql.push('\'');
247 }
248 sql.push(')');
249 }
250 sql.push_str(" ORDER BY ts ASC, rowid ASC LIMIT ?6");
251 let mut stmt = conn.prepare(&sql).map_err(sqlite_err)?;
252 let start_param = start.map(|v| v as i64);
253 let end_param = end.map(|v| v as i64);
254 let rows = stmt
255 .query_map(
256 params![
257 account,
258 region,
259 log_group,
260 start_param,
261 end_param,
262 limit as i64,
263 ],
264 |row| {
265 Ok((
266 row.get::<_, String>(0)?,
267 LogEventRow {
268 timestamp: row.get::<_, i64>(1)? as u64,
269 ingestion_time: row.get::<_, i64>(2)? as u64,
270 message: row.get::<_, String>(3)?,
271 },
272 ))
273 },
274 )
275 .map_err(sqlite_err)?;
276 let out: Result<Vec<_>, _> = rows.collect();
277 out.map_err(sqlite_err)
278 }
279
280 pub fn stream_bounds(
285 &self,
286 account: &str,
287 region: &str,
288 log_group: &str,
289 log_stream: &str,
290 ) -> Result<(Option<u64>, Option<u64>), AwsError> {
291 let conn = self.conn()?;
292 let row: Option<(Option<i64>, Option<i64>)> = conn
293 .query_row(
294 "SELECT MIN(ts), MAX(ts) FROM log_events
295 WHERE account = ?1 AND region = ?2 AND log_group = ?3 AND log_stream = ?4",
296 params![account, region, log_group, log_stream],
297 |r| Ok((r.get::<_, Option<i64>>(0)?, r.get::<_, Option<i64>>(1)?)),
298 )
299 .optional()
300 .map_err(sqlite_err)?;
301 Ok(row
302 .map(|(a, b)| (a.map(|v| v as u64), b.map(|v| v as u64)))
303 .unwrap_or((None, None)))
304 }
305
306 pub fn trim_older_than(
309 &self,
310 account: &str,
311 region: &str,
312 log_group: &str,
313 cutoff_ts: u64,
314 ) -> Result<usize, AwsError> {
315 let conn = self.conn()?;
316 let n = conn
317 .execute(
318 "DELETE FROM log_events
319 WHERE account = ?1 AND region = ?2 AND log_group = ?3 AND ts < ?4",
320 params![account, region, log_group, cutoff_ts as i64],
321 )
322 .map_err(sqlite_err)?;
323 Ok(n)
324 }
325
326 pub fn delete_stream(
329 &self,
330 account: &str,
331 region: &str,
332 log_group: &str,
333 log_stream: &str,
334 ) -> Result<usize, AwsError> {
335 let conn = self.conn()?;
336 let n = conn
337 .execute(
338 "DELETE FROM log_events
339 WHERE account = ?1 AND region = ?2 AND log_group = ?3 AND log_stream = ?4",
340 params![account, region, log_group, log_stream],
341 )
342 .map_err(sqlite_err)?;
343 Ok(n)
344 }
345
346 pub fn delete_group(
348 &self,
349 account: &str,
350 region: &str,
351 log_group: &str,
352 ) -> Result<usize, AwsError> {
353 let conn = self.conn()?;
354 let n = conn
355 .execute(
356 "DELETE FROM log_events
357 WHERE account = ?1 AND region = ?2 AND log_group = ?3",
358 params![account, region, log_group],
359 )
360 .map_err(sqlite_err)?;
361 Ok(n)
362 }
363
364 pub fn total_rows(&self) -> Result<u64, AwsError> {
367 let conn = self.conn()?;
368 let n: i64 = conn
369 .query_row("SELECT COUNT(*) FROM log_events", [], |r| r.get(0))
370 .map_err(sqlite_err)?;
371 Ok(n as u64)
372 }
373}
374
375fn init_schema(conn: &Connection) -> Result<(), AwsError> {
376 conn.execute_batch(
377 "CREATE TABLE IF NOT EXISTS log_events (
378 account TEXT NOT NULL,
379 region TEXT NOT NULL,
380 log_group TEXT NOT NULL,
381 log_stream TEXT NOT NULL,
382 ts INTEGER NOT NULL,
383 ingestion_ts INTEGER NOT NULL,
384 message TEXT NOT NULL
385 );
386 CREATE INDEX IF NOT EXISTS log_events_lookup
387 ON log_events (account, region, log_group, log_stream, ts);
388 CREATE INDEX IF NOT EXISTS log_events_group_ts
389 ON log_events (account, region, log_group, ts);",
390 )
391 .map_err(sqlite_err)?;
392 Ok(())
393}
394
395fn apply_pragmas(conn: &mut rusqlite::Connection) -> Result<(), rusqlite::Error> {
396 conn.pragma_update(None, "journal_mode", "WAL")?;
397 conn.pragma_update(None, "synchronous", "NORMAL")?;
398 conn.execute_batch(&format!(
399 "PRAGMA temp_store = MEMORY;
400 PRAGMA mmap_size = {MMAP_SIZE_BYTES};
401 PRAGMA cache_size = {CACHE_SIZE_KIB};
402 PRAGMA wal_autocheckpoint = {WAL_AUTOCHECKPOINT_PAGES};"
403 ))?;
404 Ok(())
405}
406
/// Renders `s` as a single-quoted SQL string literal, doubling every embedded
/// `'` so the text cannot terminate the literal early.
///
/// Despite its name, this does not escape the `LIKE` wildcards `%` and `_`.
/// Those characters pass through unchanged.
fn escape_for_like(s: &str) -> String {
    let body = s.replace('\'', "''");
    format!("'{body}'")
}
424
425fn sqlite_err(e: rusqlite::Error) -> AwsError {
426 AwsError::internal(format!("CloudWatch Logs sqlite error: {e}"))
427}
428
#[cfg(test)]
mod tests {
    use super::*;

    /// Store backed by a uniquely named temp database.
    ///
    /// On drop it deletes the database file and its `-wal`/`-shm` side files,
    /// so test runs do not leave files behind in the temp directory.
    struct TempStore {
        store: Option<SqliteStore>,
        path: std::path::PathBuf,
    }

    impl std::ops::Deref for TempStore {
        type Target = SqliteStore;

        fn deref(&self) -> &SqliteStore {
            self.store.as_ref().expect("store is only cleared in Drop")
        }
    }

    impl Drop for TempStore {
        fn drop(&mut self) {
            // Release the store (and its pool) first so its connections can
            // close before the files are removed.
            self.store = None;
            for suffix in ["", "-wal", "-shm"] {
                let mut file = self.path.clone().into_os_string();
                file.push(suffix);
                // Best-effort cleanup: a side file may never have been created.
                let _ = std::fs::remove_file(file);
            }
        }
    }

    fn store() -> TempStore {
        let id = uuid::Uuid::new_v4();
        let path = std::env::temp_dir().join(format!("awsim-cwl-test-{id}.db"));
        let store = SqliteStore::open(path.clone()).unwrap();
        TempStore {
            store: Some(store),
            path,
        }
    }

    fn ev(ts: u64, msg: &str) -> LogEventRow {
        LogEventRow {
            timestamp: ts,
            message: msg.to_string(),
            ingestion_time: ts + 1,
        }
    }

    #[test]
    fn put_then_get_returns_in_ts_order() {
        let s = store();
        s.put_events("a", "r", "g", "stm", &[ev(3, "c"), ev(1, "a"), ev(2, "b")])
            .unwrap();
        let rows = s
            .get_events("a", "r", "g", "stm", None, None, 0, 100, true)
            .unwrap();
        assert_eq!(rows.len(), 3);
        assert_eq!(rows[0].timestamp, 1);
        assert_eq!(rows[2].timestamp, 3);
    }

    #[test]
    fn time_range_filter() {
        let s = store();
        s.put_events("a", "r", "g", "stm", &[ev(1, "a"), ev(5, "b"), ev(10, "c")])
            .unwrap();
        let rows = s
            .get_events("a", "r", "g", "stm", Some(2), Some(7), 0, 100, true)
            .unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].timestamp, 5);
    }

    #[test]
    fn filter_substring_across_streams() {
        let s = store();
        s.put_events("a", "r", "g", "s1", &[ev(1, "hello world")])
            .unwrap();
        s.put_events(
            "a",
            "r",
            "g",
            "s2",
            &[ev(2, "no match"), ev(3, "world cup")],
        )
        .unwrap();
        let rows = s
            .filter_events("a", "r", "g", None, Some("world"), None, None, 100)
            .unwrap();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].1.timestamp, 1);
        assert_eq!(rows[1].1.timestamp, 3);
    }

    #[test]
    fn trim_older_than_drops_events() {
        let s = store();
        s.put_events("a", "r", "g", "stm", &[ev(1, "a"), ev(5, "b"), ev(10, "c")])
            .unwrap();
        let removed = s.trim_older_than("a", "r", "g", 5).unwrap();
        assert_eq!(removed, 1);
        let remaining = s.count_events("a", "r", "g", "stm", None, None).unwrap();
        assert_eq!(remaining, 2);
    }

    #[test]
    fn stream_bounds_returns_min_max() {
        let s = store();
        s.put_events("a", "r", "g", "stm", &[ev(5, "x"), ev(10, "y"), ev(2, "z")])
            .unwrap();
        let (min, max) = s.stream_bounds("a", "r", "g", "stm").unwrap();
        assert_eq!(min, Some(2));
        assert_eq!(max, Some(10));
    }

    #[test]
    fn delete_stream_removes_only_that_stream() {
        let s = store();
        s.put_events("a", "r", "g", "s1", &[ev(1, "a")]).unwrap();
        s.put_events("a", "r", "g", "s2", &[ev(1, "b")]).unwrap();
        s.delete_stream("a", "r", "g", "s1").unwrap();
        assert_eq!(s.count_events("a", "r", "g", "s1", None, None).unwrap(), 0);
        assert_eq!(s.count_events("a", "r", "g", "s2", None, None).unwrap(), 1);
    }
}