1use std::path::PathBuf;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use bytes::Bytes;
6use futures::stream::BoxStream;
7use rusqlite::{params, Connection, OpenFlags, OptionalExtension};
8
9use crate::runtime_sqlite::{configure_runtime_sqlite, DEFAULT_BUSY_TIMEOUT};
10
11use super::util::{
12 event_id_to_sqlite_i64, now_ms, prepare_event_after, sqlite_i64_to_event_id,
13 sqlite_i64_to_event_id_for_row, sqlite_i64_to_usize, sqlite_json_bytes_for_row,
14 sqlite_size_bytes, stream_from_broadcast, BroadcastMap,
15};
16use super::{
17 AppendOutcome, CompactReport, ConsumerId, EventId, EventLog, EventLogBackendKind,
18 EventLogDescription, LogError, LogEvent, LogEventBytes, Topic,
19};
20
21pub struct SqliteEventLog {
22 path: PathBuf,
23 pub(super) connection: Mutex<Connection>,
24 pub(super) broadcasts: BroadcastMap,
25 pub(super) queue_depth: usize,
26}
27
28impl SqliteEventLog {
29 pub fn open(path: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
30 Self::open_inner(path, queue_depth, DEFAULT_BUSY_TIMEOUT)
31 }
32
33 #[cfg(test)]
34 pub(crate) fn open_with_timeout(
35 path: PathBuf,
36 queue_depth: usize,
37 busy_timeout: Duration,
38 ) -> Result<Self, LogError> {
39 Self::open_inner(path, queue_depth, busy_timeout)
40 }
41
42 fn open_inner(
43 path: PathBuf,
44 queue_depth: usize,
45 busy_timeout: Duration,
46 ) -> Result<Self, LogError> {
47 if let Some(parent) = path.parent() {
48 std::fs::create_dir_all(parent)
49 .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
50 }
51 let connection = Connection::open(&path)
52 .map_err(|error| LogError::Sqlite(format!("event log open error: {error}")))?;
53 configure_runtime_sqlite(&connection, busy_timeout)
54 .map_err(|error| LogError::Sqlite(format!("event log sqlite setup error: {error}")))?;
55 connection
56 .execute_batch(
57 "CREATE TABLE IF NOT EXISTS topic_heads (
58 topic TEXT PRIMARY KEY,
59 last_id INTEGER NOT NULL
60 );
61 CREATE TABLE IF NOT EXISTS events (
62 topic TEXT NOT NULL,
63 event_id INTEGER NOT NULL,
64 kind TEXT NOT NULL,
65 payload BLOB NOT NULL,
66 headers TEXT NOT NULL,
67 occurred_at_ms INTEGER NOT NULL,
68 PRIMARY KEY (topic, event_id)
69 );
70 CREATE TABLE IF NOT EXISTS consumers (
71 topic TEXT NOT NULL,
72 consumer_id TEXT NOT NULL,
73 cursor INTEGER NOT NULL,
74 updated_at_ms INTEGER NOT NULL,
75 PRIMARY KEY (topic, consumer_id)
76 );
77 CREATE TABLE IF NOT EXISTS event_idempotency_keys (
78 topic TEXT NOT NULL,
79 key TEXT NOT NULL,
80 value TEXT NOT NULL,
81 event_id INTEGER NOT NULL,
82 PRIMARY KEY (topic, key, value),
83 FOREIGN KEY (topic, event_id) REFERENCES events(topic, event_id)
84 );",
85 )
86 .map_err(|error| LogError::Sqlite(format!("event log schema error: {error}")))?;
87 Ok(Self {
88 path,
89 connection: Mutex::new(connection),
90 broadcasts: BroadcastMap::default(),
91 queue_depth: queue_depth.max(1),
92 })
93 }
94
95 pub fn open_read_only(path: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
96 let connection = Connection::open_with_flags(&path, OpenFlags::SQLITE_OPEN_READ_ONLY)
97 .map_err(|error| {
98 LogError::Sqlite(format!("event log read-only open error: {error}"))
99 })?;
100 connection
101 .busy_timeout(std::time::Duration::from_secs(5))
102 .map_err(|error| LogError::Sqlite(format!("event log busy-timeout error: {error}")))?;
103 Ok(Self {
104 path,
105 connection: Mutex::new(connection),
106 broadcasts: BroadcastMap::default(),
107 queue_depth: queue_depth.max(1),
108 })
109 }
110
111 pub(super) fn topics(&self) -> Result<Vec<Topic>, LogError> {
112 let connection = self
113 .connection
114 .lock()
115 .expect("sqlite event log connection poisoned");
116 let mut statement = connection
117 .prepare("SELECT DISTINCT topic FROM events ORDER BY topic ASC")
118 .map_err(|error| {
119 LogError::Sqlite(format!("event log topics prepare error: {error}"))
120 })?;
121 let rows = statement
122 .query_map([], |row| row.get::<_, String>(0))
123 .map_err(|error| LogError::Sqlite(format!("event log topics query error: {error}")))?;
124 let mut topics = Vec::new();
125 for row in rows {
126 topics.push(Topic::new(row.map_err(|error| {
127 LogError::Sqlite(format!("event log topic row error: {error}"))
128 })?)?);
129 }
130 Ok(topics)
131 }
132
133 pub(super) fn append_idempotent_by_header(
134 &self,
135 topic: &Topic,
136 header: &str,
137 value: &str,
138 event: LogEvent,
139 ) -> Result<AppendOutcome, LogError> {
140 let mut connection = self
141 .connection
142 .lock()
143 .expect("sqlite event log connection poisoned");
144 let tx = connection
145 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
146 .map_err(|error| LogError::Sqlite(format!("event log transaction error: {error}")))?;
147
148 let existing = tx
149 .query_row(
150 IDEMPOTENT_LOOKUP_SQL,
151 params![topic.as_str(), header, value],
152 decode_id_event_row,
153 )
154 .optional()
155 .map_err(|error| {
156 LogError::Sqlite(format!("event log idempotency read error: {error}"))
157 })?;
158
159 if let Some((event_id, event)) = existing {
160 return Ok(AppendOutcome {
161 event_id,
162 event,
163 inserted: false,
164 });
165 }
166
167 tx.execute(
168 "INSERT OR IGNORE INTO topic_heads(topic, last_id) VALUES (?1, 0)",
169 params![topic.as_str()],
170 )
171 .map_err(|error| LogError::Sqlite(format!("event log head init error: {error}")))?;
172 tx.execute(
173 "UPDATE topic_heads SET last_id = last_id + 1 WHERE topic = ?1",
174 params![topic.as_str()],
175 )
176 .map_err(|error| LogError::Sqlite(format!("event log head update error: {error}")))?;
177 let event_id = tx
178 .query_row(
179 "SELECT last_id FROM topic_heads WHERE topic = ?1",
180 params![topic.as_str()],
181 |row| row.get::<_, i64>(0),
182 )
183 .map_err(|error| LogError::Sqlite(format!("event log head read error: {error}")))
184 .and_then(sqlite_i64_to_event_id)?;
185 let event_id_sql = event_id_to_sqlite_i64(event_id)?;
186 let previous = tx
187 .query_row(
188 "SELECT event_id, kind, payload, headers, occurred_at_ms
189 FROM events
190 WHERE topic = ?1 AND event_id < ?2
191 ORDER BY event_id DESC
192 LIMIT 1",
193 params![topic.as_str(), event_id_sql],
194 decode_id_event_row,
195 )
196 .optional()
197 .map_err(|error| LogError::Sqlite(format!("event log previous read error: {error}")))?;
198 let event = prepare_event_after(
199 topic,
200 event_id,
201 previous
202 .as_ref()
203 .map(|(previous_id, previous_event)| (*previous_id, previous_event)),
204 event,
205 )?;
206 tx.execute(
207 "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
208 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
209 params![
210 topic.as_str(),
211 event_id_sql,
212 event.kind,
213 serde_json::to_vec(&event.payload).map_err(|error| LogError::Serde(format!(
214 "event log payload encode error: {error}"
215 )))?,
216 serde_json::to_string(&event.headers).map_err(|error| LogError::Serde(format!(
217 "event log headers encode error: {error}"
218 )))?,
219 event.occurred_at_ms
220 ],
221 )
222 .map_err(|error| LogError::Sqlite(format!("event log insert error: {error}")))?;
223 tx.execute(
224 "INSERT INTO event_idempotency_keys(topic, key, value, event_id)
225 VALUES (?1, ?2, ?3, ?4)",
226 params![topic.as_str(), header, value, event_id_sql],
227 )
228 .map_err(|error| {
229 LogError::Sqlite(format!("event log idempotency insert error: {error}"))
230 })?;
231 tx.commit()
232 .map_err(|error| LogError::Sqlite(format!("event log commit error: {error}")))?;
233 self.broadcasts
234 .publish(topic, self.queue_depth, (event_id, event.clone()));
235 Ok(AppendOutcome {
236 event_id,
237 event,
238 inserted: true,
239 })
240 }
241
242 pub(super) fn read_idempotent_by_header(
247 &self,
248 topic: &Topic,
249 header: &str,
250 value: &str,
251 ) -> Result<Option<(EventId, LogEvent)>, LogError> {
252 let connection = self
253 .connection
254 .lock()
255 .expect("sqlite event log connection poisoned");
256 connection
257 .query_row(
258 IDEMPOTENT_LOOKUP_SQL,
259 params![topic.as_str(), header, value],
260 decode_id_event_row,
261 )
262 .optional()
263 .map_err(|error| LogError::Sqlite(format!("event log idempotency read error: {error}")))
264 }
265}
266
267const IDEMPOTENT_LOOKUP_SQL: &str =
270 "SELECT e.event_id, e.kind, e.payload, e.headers, e.occurred_at_ms
271 FROM event_idempotency_keys k
272 JOIN events e ON e.topic = k.topic AND e.event_id = k.event_id
273 WHERE k.topic = ?1 AND k.key = ?2 AND k.value = ?3";
274
275fn decode_id_event_row(row: &rusqlite::Row) -> rusqlite::Result<(EventId, LogEvent)> {
280 let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
281 let headers: String = row.get(3)?;
282 Ok((
283 sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?,
284 LogEvent {
285 kind: row.get(1)?,
286 payload: serde_json::from_slice(&payload).map_err(|error| {
287 rusqlite::Error::FromSqlConversionFailure(
288 payload.len(),
289 rusqlite::types::Type::Blob,
290 Box::new(error),
291 )
292 })?,
293 headers: serde_json::from_str(&headers).map_err(|error| {
294 rusqlite::Error::FromSqlConversionFailure(
295 headers.len(),
296 rusqlite::types::Type::Text,
297 Box::new(error),
298 )
299 })?,
300 occurred_at_ms: row.get(4)?,
301 },
302 ))
303}
304
305impl EventLog for SqliteEventLog {
306 fn describe(&self) -> EventLogDescription {
307 EventLogDescription {
308 backend: EventLogBackendKind::Sqlite,
309 location: Some(self.path.clone()),
310 size_bytes: Some(sqlite_size_bytes(&self.path)),
311 queue_depth: self.queue_depth,
312 }
313 }
314
315 async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
316 let mut connection = self
317 .connection
318 .lock()
319 .expect("sqlite event log connection poisoned");
320 let tx = connection
321 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
322 .map_err(|error| LogError::Sqlite(format!("event log transaction error: {error}")))?;
323 tx.execute(
324 "INSERT OR IGNORE INTO topic_heads(topic, last_id) VALUES (?1, 0)",
325 params![topic.as_str()],
326 )
327 .map_err(|error| LogError::Sqlite(format!("event log head init error: {error}")))?;
328 tx.execute(
329 "UPDATE topic_heads SET last_id = last_id + 1 WHERE topic = ?1",
330 params![topic.as_str()],
331 )
332 .map_err(|error| LogError::Sqlite(format!("event log head update error: {error}")))?;
333 let event_id = tx
334 .query_row(
335 "SELECT last_id FROM topic_heads WHERE topic = ?1",
336 params![topic.as_str()],
337 |row| row.get::<_, i64>(0),
338 )
339 .map_err(|error| LogError::Sqlite(format!("event log head read error: {error}")))
340 .and_then(sqlite_i64_to_event_id)?;
341 let event_id_sql = event_id_to_sqlite_i64(event_id)?;
342 let previous = tx
343 .query_row(
344 "SELECT event_id, kind, payload, headers, occurred_at_ms
345 FROM events
346 WHERE topic = ?1 AND event_id < ?2
347 ORDER BY event_id DESC
348 LIMIT 1",
349 params![topic.as_str(), event_id_sql],
350 decode_id_event_row,
351 )
352 .optional()
353 .map_err(|error| LogError::Sqlite(format!("event log previous read error: {error}")))?;
354 let event = prepare_event_after(
355 topic,
356 event_id,
357 previous
358 .as_ref()
359 .map(|(previous_id, previous_event)| (*previous_id, previous_event)),
360 event,
361 )?;
362 tx.execute(
363 "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
364 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
365 params![
366 topic.as_str(),
367 event_id_sql,
368 event.kind,
369 serde_json::to_vec(&event.payload).map_err(|error| LogError::Serde(format!(
370 "event log payload encode error: {error}"
371 )))?,
372 serde_json::to_string(&event.headers).map_err(|error| LogError::Serde(format!(
373 "event log headers encode error: {error}"
374 )))?,
375 event.occurred_at_ms
376 ],
377 )
378 .map_err(|error| LogError::Sqlite(format!("event log insert error: {error}")))?;
379 tx.commit()
380 .map_err(|error| LogError::Sqlite(format!("event log commit error: {error}")))?;
381 self.broadcasts
382 .publish(topic, self.queue_depth, (event_id, event));
383 Ok(event_id)
384 }
385
386 async fn flush(&self) -> Result<(), LogError> {
387 let connection = self
388 .connection
389 .lock()
390 .expect("sqlite event log connection poisoned");
391 connection
392 .execute_batch("PRAGMA wal_checkpoint(FULL);")
393 .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
394 Ok(())
395 }
396
397 async fn read_range(
398 &self,
399 topic: &Topic,
400 from: Option<EventId>,
401 limit: usize,
402 ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
403 let connection = self
404 .connection
405 .lock()
406 .expect("sqlite event log connection poisoned");
407 let mut statement = connection
408 .prepare(
409 "SELECT event_id, kind, payload, headers, occurred_at_ms
410 FROM events
411 WHERE topic = ?1 AND event_id > ?2
412 ORDER BY event_id ASC
413 LIMIT ?3",
414 )
415 .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
416 let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
417 let rows = statement
418 .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
419 let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
420 let headers: String = row.get(3)?;
421 let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
422 Ok((
423 event_id,
424 LogEvent {
425 kind: row.get(1)?,
426 payload: serde_json::from_slice(&payload).map_err(|error| {
427 rusqlite::Error::FromSqlConversionFailure(
428 payload.len(),
429 rusqlite::types::Type::Blob,
430 Box::new(error),
431 )
432 })?,
433 headers: serde_json::from_str(&headers).map_err(|error| {
434 rusqlite::Error::FromSqlConversionFailure(
435 headers.len(),
436 rusqlite::types::Type::Text,
437 Box::new(error),
438 )
439 })?,
440 occurred_at_ms: row.get(4)?,
441 },
442 ))
443 })
444 .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
445 let mut events = Vec::new();
446 for row in rows {
447 events.push(
448 row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
449 );
450 }
451 Ok(events)
452 }
453
454 async fn read_range_bytes(
455 &self,
456 topic: &Topic,
457 from: Option<EventId>,
458 limit: usize,
459 ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
460 let connection = self
461 .connection
462 .lock()
463 .expect("sqlite event log connection poisoned");
464 let mut statement = connection
465 .prepare(
466 "SELECT event_id, kind, payload, headers, occurred_at_ms
467 FROM events
468 WHERE topic = ?1 AND event_id > ?2
469 ORDER BY event_id ASC
470 LIMIT ?3",
471 )
472 .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
473 let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
474 let rows = statement
475 .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
476 let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
477 let headers: String = row.get(3)?;
478 let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
479 Ok((
480 event_id,
481 LogEventBytes {
482 kind: row.get(1)?,
483 payload: Bytes::from(payload),
484 headers: serde_json::from_str(&headers).map_err(|error| {
485 rusqlite::Error::FromSqlConversionFailure(
486 headers.len(),
487 rusqlite::types::Type::Text,
488 Box::new(error),
489 )
490 })?,
491 occurred_at_ms: row.get(4)?,
492 },
493 ))
494 })
495 .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
496 let mut events = Vec::new();
497 for row in rows {
498 events.push(
499 row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
500 );
501 }
502 Ok(events)
503 }
504
505 async fn subscribe(
506 self: Arc<Self>,
507 topic: &Topic,
508 from: Option<EventId>,
509 ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
510 let rx = self.broadcasts.subscribe(topic, self.queue_depth);
511 let history = self.read_range(topic, from, usize::MAX).await?;
512 Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
513 }
514
515 async fn ack(
516 &self,
517 topic: &Topic,
518 consumer: &ConsumerId,
519 up_to: EventId,
520 ) -> Result<(), LogError> {
521 let connection = self
522 .connection
523 .lock()
524 .expect("sqlite event log connection poisoned");
525 let up_to_sql = event_id_to_sqlite_i64(up_to)?;
526 connection
527 .execute(
528 "INSERT INTO consumers(topic, consumer_id, cursor, updated_at_ms)
529 VALUES (?1, ?2, ?3, ?4)
530 ON CONFLICT(topic, consumer_id)
531 DO UPDATE SET cursor = excluded.cursor, updated_at_ms = excluded.updated_at_ms",
532 params![topic.as_str(), consumer.as_str(), up_to_sql, now_ms()],
533 )
534 .map_err(|error| LogError::Sqlite(format!("event log ack error: {error}")))?;
535 Ok(())
536 }
537
538 async fn consumer_cursor(
539 &self,
540 topic: &Topic,
541 consumer: &ConsumerId,
542 ) -> Result<Option<EventId>, LogError> {
543 let connection = self
544 .connection
545 .lock()
546 .expect("sqlite event log connection poisoned");
547 connection
548 .query_row(
549 "SELECT cursor FROM consumers WHERE topic = ?1 AND consumer_id = ?2",
550 params![topic.as_str(), consumer.as_str()],
551 |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
552 )
553 .optional()
554 .map_err(|error| LogError::Sqlite(format!("event log consumer cursor error: {error}")))
555 }
556
557 async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
558 let connection = self
559 .connection
560 .lock()
561 .expect("sqlite event log connection poisoned");
562 connection
563 .query_row(
564 "SELECT last_id FROM topic_heads WHERE topic = ?1",
565 params![topic.as_str()],
566 |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
567 )
568 .optional()
569 .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))
570 }
571
572 async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
573 let connection = self
574 .connection
575 .lock()
576 .expect("sqlite event log connection poisoned");
577 let before_sql = event_id_to_sqlite_i64(before)?;
578 connection
579 .execute(
580 "DELETE FROM event_idempotency_keys WHERE topic = ?1 AND event_id <= ?2",
581 params![topic.as_str(), before_sql],
582 )
583 .map_err(|error| {
584 LogError::Sqlite(format!("event log idempotency compact error: {error}"))
585 })?;
586 let removed = connection
587 .execute(
588 "DELETE FROM events WHERE topic = ?1 AND event_id <= ?2",
589 params![topic.as_str(), before_sql],
590 )
591 .map_err(|error| {
592 LogError::Sqlite(format!("event log compact delete error: {error}"))
593 })?;
594 let remaining = connection
595 .query_row(
596 "SELECT COUNT(*) FROM events WHERE topic = ?1",
597 params![topic.as_str()],
598 |row| row.get::<_, i64>(0),
599 )
600 .map_err(|error| LogError::Sqlite(format!("event log compact count error: {error}")))
601 .and_then(sqlite_i64_to_usize)?;
602 let latest = connection
603 .query_row(
604 "SELECT last_id FROM topic_heads WHERE topic = ?1",
605 params![topic.as_str()],
606 |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
607 )
608 .optional()
609 .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))?;
610 connection
611 .execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
612 .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
613 Ok(CompactReport {
614 removed,
615 remaining,
616 latest,
617 checkpointed: true,
618 })
619 }
620}