//! SQLite-backed event log backend (`harn_vm/event_log/sqlite.rs`).
1use std::path::PathBuf;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use bytes::Bytes;
6use futures::stream::BoxStream;
7use rusqlite::{params, Connection, OpenFlags, OptionalExtension};
8
9use crate::runtime_sqlite::{configure_runtime_sqlite, DEFAULT_BUSY_TIMEOUT};
10
11use super::util::{
12    event_id_to_sqlite_i64, now_ms, prepare_event_after, sqlite_i64_to_event_id,
13    sqlite_i64_to_event_id_for_row, sqlite_i64_to_usize, sqlite_json_bytes_for_row,
14    sqlite_size_bytes, stream_from_broadcast, BroadcastMap,
15};
16use super::{
17    AppendOutcome, CompactReport, ConsumerId, EventId, EventLog, EventLogBackendKind,
18    EventLogDescription, LogError, LogEvent, LogEventBytes, Topic,
19};
20
21pub struct SqliteEventLog {
22    path: PathBuf,
23    pub(super) connection: Mutex<Connection>,
24    pub(super) broadcasts: BroadcastMap,
25    pub(super) queue_depth: usize,
26}
27
28impl SqliteEventLog {
29    pub fn open(path: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
30        Self::open_inner(path, queue_depth, DEFAULT_BUSY_TIMEOUT)
31    }
32
33    #[cfg(test)]
34    pub(crate) fn open_with_timeout(
35        path: PathBuf,
36        queue_depth: usize,
37        busy_timeout: Duration,
38    ) -> Result<Self, LogError> {
39        Self::open_inner(path, queue_depth, busy_timeout)
40    }
41
42    fn open_inner(
43        path: PathBuf,
44        queue_depth: usize,
45        busy_timeout: Duration,
46    ) -> Result<Self, LogError> {
47        if let Some(parent) = path.parent() {
48            std::fs::create_dir_all(parent)
49                .map_err(|error| LogError::Io(format!("event log mkdir error: {error}")))?;
50        }
51        let connection = Connection::open(&path)
52            .map_err(|error| LogError::Sqlite(format!("event log open error: {error}")))?;
53        configure_runtime_sqlite(&connection, busy_timeout)
54            .map_err(|error| LogError::Sqlite(format!("event log sqlite setup error: {error}")))?;
55        connection
56            .execute_batch(
57                "CREATE TABLE IF NOT EXISTS topic_heads (
58                    topic TEXT PRIMARY KEY,
59                    last_id INTEGER NOT NULL
60                );
61                CREATE TABLE IF NOT EXISTS events (
62                    topic TEXT NOT NULL,
63                    event_id INTEGER NOT NULL,
64                    kind TEXT NOT NULL,
65                    payload BLOB NOT NULL,
66                    headers TEXT NOT NULL,
67                    occurred_at_ms INTEGER NOT NULL,
68                    PRIMARY KEY (topic, event_id)
69                );
70                CREATE TABLE IF NOT EXISTS consumers (
71                    topic TEXT NOT NULL,
72                    consumer_id TEXT NOT NULL,
73                    cursor INTEGER NOT NULL,
74                    updated_at_ms INTEGER NOT NULL,
75                    PRIMARY KEY (topic, consumer_id)
76                );
77                CREATE TABLE IF NOT EXISTS event_idempotency_keys (
78                    topic TEXT NOT NULL,
79                    key TEXT NOT NULL,
80                    value TEXT NOT NULL,
81                    event_id INTEGER NOT NULL,
82                    PRIMARY KEY (topic, key, value),
83                    FOREIGN KEY (topic, event_id) REFERENCES events(topic, event_id)
84                );",
85            )
86            .map_err(|error| LogError::Sqlite(format!("event log schema error: {error}")))?;
87        Ok(Self {
88            path,
89            connection: Mutex::new(connection),
90            broadcasts: BroadcastMap::default(),
91            queue_depth: queue_depth.max(1),
92        })
93    }
94
95    pub fn open_read_only(path: PathBuf, queue_depth: usize) -> Result<Self, LogError> {
96        let connection = Connection::open_with_flags(&path, OpenFlags::SQLITE_OPEN_READ_ONLY)
97            .map_err(|error| {
98                LogError::Sqlite(format!("event log read-only open error: {error}"))
99            })?;
100        connection
101            .busy_timeout(std::time::Duration::from_secs(5))
102            .map_err(|error| LogError::Sqlite(format!("event log busy-timeout error: {error}")))?;
103        Ok(Self {
104            path,
105            connection: Mutex::new(connection),
106            broadcasts: BroadcastMap::default(),
107            queue_depth: queue_depth.max(1),
108        })
109    }
110
111    pub(super) fn topics(&self) -> Result<Vec<Topic>, LogError> {
112        let connection = self
113            .connection
114            .lock()
115            .expect("sqlite event log connection poisoned");
116        let mut statement = connection
117            .prepare("SELECT DISTINCT topic FROM events ORDER BY topic ASC")
118            .map_err(|error| {
119                LogError::Sqlite(format!("event log topics prepare error: {error}"))
120            })?;
121        let rows = statement
122            .query_map([], |row| row.get::<_, String>(0))
123            .map_err(|error| LogError::Sqlite(format!("event log topics query error: {error}")))?;
124        let mut topics = Vec::new();
125        for row in rows {
126            topics.push(Topic::new(row.map_err(|error| {
127                LogError::Sqlite(format!("event log topic row error: {error}"))
128            })?)?);
129        }
130        Ok(topics)
131    }
132
133    pub(super) fn append_idempotent_by_header(
134        &self,
135        topic: &Topic,
136        header: &str,
137        value: &str,
138        event: LogEvent,
139    ) -> Result<AppendOutcome, LogError> {
140        let mut connection = self
141            .connection
142            .lock()
143            .expect("sqlite event log connection poisoned");
144        let tx = connection
145            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
146            .map_err(|error| LogError::Sqlite(format!("event log transaction error: {error}")))?;
147
148        let existing = tx
149            .query_row(
150                IDEMPOTENT_LOOKUP_SQL,
151                params![topic.as_str(), header, value],
152                decode_id_event_row,
153            )
154            .optional()
155            .map_err(|error| {
156                LogError::Sqlite(format!("event log idempotency read error: {error}"))
157            })?;
158
159        if let Some((event_id, event)) = existing {
160            return Ok(AppendOutcome {
161                event_id,
162                event,
163                inserted: false,
164            });
165        }
166
167        tx.execute(
168            "INSERT OR IGNORE INTO topic_heads(topic, last_id) VALUES (?1, 0)",
169            params![topic.as_str()],
170        )
171        .map_err(|error| LogError::Sqlite(format!("event log head init error: {error}")))?;
172        tx.execute(
173            "UPDATE topic_heads SET last_id = last_id + 1 WHERE topic = ?1",
174            params![topic.as_str()],
175        )
176        .map_err(|error| LogError::Sqlite(format!("event log head update error: {error}")))?;
177        let event_id = tx
178            .query_row(
179                "SELECT last_id FROM topic_heads WHERE topic = ?1",
180                params![topic.as_str()],
181                |row| row.get::<_, i64>(0),
182            )
183            .map_err(|error| LogError::Sqlite(format!("event log head read error: {error}")))
184            .and_then(sqlite_i64_to_event_id)?;
185        let event_id_sql = event_id_to_sqlite_i64(event_id)?;
186        let previous = tx
187            .query_row(
188                "SELECT event_id, kind, payload, headers, occurred_at_ms
189                 FROM events
190                 WHERE topic = ?1 AND event_id < ?2
191                 ORDER BY event_id DESC
192                 LIMIT 1",
193                params![topic.as_str(), event_id_sql],
194                decode_id_event_row,
195            )
196            .optional()
197            .map_err(|error| LogError::Sqlite(format!("event log previous read error: {error}")))?;
198        let event = prepare_event_after(
199            topic,
200            event_id,
201            previous
202                .as_ref()
203                .map(|(previous_id, previous_event)| (*previous_id, previous_event)),
204            event,
205        )?;
206        tx.execute(
207            "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
208             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
209            params![
210                topic.as_str(),
211                event_id_sql,
212                event.kind,
213                serde_json::to_vec(&event.payload).map_err(|error| LogError::Serde(format!(
214                    "event log payload encode error: {error}"
215                )))?,
216                serde_json::to_string(&event.headers).map_err(|error| LogError::Serde(format!(
217                    "event log headers encode error: {error}"
218                )))?,
219                event.occurred_at_ms
220            ],
221        )
222        .map_err(|error| LogError::Sqlite(format!("event log insert error: {error}")))?;
223        tx.execute(
224            "INSERT INTO event_idempotency_keys(topic, key, value, event_id)
225             VALUES (?1, ?2, ?3, ?4)",
226            params![topic.as_str(), header, value, event_id_sql],
227        )
228        .map_err(|error| {
229            LogError::Sqlite(format!("event log idempotency insert error: {error}"))
230        })?;
231        tx.commit()
232            .map_err(|error| LogError::Sqlite(format!("event log commit error: {error}")))?;
233        self.broadcasts
234            .publish(topic, self.queue_depth, (event_id, event.clone()));
235        Ok(AppendOutcome {
236            event_id,
237            event,
238            inserted: true,
239        })
240    }
241
242    /// Look up the event previously appended under `(header, value)` via the
243    /// indexed `event_idempotency_keys` JOIN — the read counterpart of
244    /// [`Self::append_idempotent_by_header`]. O(log N) on the index instead of
245    /// scanning the topic, which is what makes durable-step replay cheap.
246    pub(super) fn read_idempotent_by_header(
247        &self,
248        topic: &Topic,
249        header: &str,
250        value: &str,
251    ) -> Result<Option<(EventId, LogEvent)>, LogError> {
252        let connection = self
253            .connection
254            .lock()
255            .expect("sqlite event log connection poisoned");
256        connection
257            .query_row(
258                IDEMPOTENT_LOOKUP_SQL,
259                params![topic.as_str(), header, value],
260                decode_id_event_row,
261            )
262            .optional()
263            .map_err(|error| LogError::Sqlite(format!("event log idempotency read error: {error}")))
264    }
265}
266
/// Resolves an idempotency key `(?1 = topic, ?2 = header, ?3 = value)` to the
/// event it was recorded for, by joining the key index against `events`.
///
/// Columns come back as `(event_id, kind, payload, headers, occurred_at_ms)`,
/// the shape `decode_id_event_row` expects. Used both for the existence check
/// inside the idempotent append and for the standalone idempotent read.
const IDEMPOTENT_LOOKUP_SQL: &str = concat!(
    "SELECT e.event_id, e.kind, e.payload, e.headers, e.occurred_at_ms\n",
    "     FROM event_idempotency_keys k\n",
    "     JOIN events e ON e.topic = k.topic AND e.event_id = k.event_id\n",
    "     WHERE k.topic = ?1 AND k.key = ?2 AND k.value = ?3",
);
274
275/// Decode a `(event_id, kind, payload, headers, occurred_at_ms)` row (in that
276/// column order) into an `(EventId, LogEvent)`. Shared by the idempotency
277/// lookup, the previous-event read, and `read_idempotent_by_header` so the
278/// row-shape decode lives in exactly one place.
279fn decode_id_event_row(row: &rusqlite::Row) -> rusqlite::Result<(EventId, LogEvent)> {
280    let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
281    let headers: String = row.get(3)?;
282    Ok((
283        sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?,
284        LogEvent {
285            kind: row.get(1)?,
286            payload: serde_json::from_slice(&payload).map_err(|error| {
287                rusqlite::Error::FromSqlConversionFailure(
288                    payload.len(),
289                    rusqlite::types::Type::Blob,
290                    Box::new(error),
291                )
292            })?,
293            headers: serde_json::from_str(&headers).map_err(|error| {
294                rusqlite::Error::FromSqlConversionFailure(
295                    headers.len(),
296                    rusqlite::types::Type::Text,
297                    Box::new(error),
298                )
299            })?,
300            occurred_at_ms: row.get(4)?,
301        },
302    ))
303}
304
305impl EventLog for SqliteEventLog {
306    fn describe(&self) -> EventLogDescription {
307        EventLogDescription {
308            backend: EventLogBackendKind::Sqlite,
309            location: Some(self.path.clone()),
310            size_bytes: Some(sqlite_size_bytes(&self.path)),
311            queue_depth: self.queue_depth,
312        }
313    }
314
315    async fn append(&self, topic: &Topic, event: LogEvent) -> Result<EventId, LogError> {
316        let mut connection = self
317            .connection
318            .lock()
319            .expect("sqlite event log connection poisoned");
320        let tx = connection
321            .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
322            .map_err(|error| LogError::Sqlite(format!("event log transaction error: {error}")))?;
323        tx.execute(
324            "INSERT OR IGNORE INTO topic_heads(topic, last_id) VALUES (?1, 0)",
325            params![topic.as_str()],
326        )
327        .map_err(|error| LogError::Sqlite(format!("event log head init error: {error}")))?;
328        tx.execute(
329            "UPDATE topic_heads SET last_id = last_id + 1 WHERE topic = ?1",
330            params![topic.as_str()],
331        )
332        .map_err(|error| LogError::Sqlite(format!("event log head update error: {error}")))?;
333        let event_id = tx
334            .query_row(
335                "SELECT last_id FROM topic_heads WHERE topic = ?1",
336                params![topic.as_str()],
337                |row| row.get::<_, i64>(0),
338            )
339            .map_err(|error| LogError::Sqlite(format!("event log head read error: {error}")))
340            .and_then(sqlite_i64_to_event_id)?;
341        let event_id_sql = event_id_to_sqlite_i64(event_id)?;
342        let previous = tx
343            .query_row(
344                "SELECT event_id, kind, payload, headers, occurred_at_ms
345                 FROM events
346                 WHERE topic = ?1 AND event_id < ?2
347                 ORDER BY event_id DESC
348                 LIMIT 1",
349                params![topic.as_str(), event_id_sql],
350                decode_id_event_row,
351            )
352            .optional()
353            .map_err(|error| LogError::Sqlite(format!("event log previous read error: {error}")))?;
354        let event = prepare_event_after(
355            topic,
356            event_id,
357            previous
358                .as_ref()
359                .map(|(previous_id, previous_event)| (*previous_id, previous_event)),
360            event,
361        )?;
362        tx.execute(
363            "INSERT INTO events(topic, event_id, kind, payload, headers, occurred_at_ms)
364             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
365            params![
366                topic.as_str(),
367                event_id_sql,
368                event.kind,
369                serde_json::to_vec(&event.payload).map_err(|error| LogError::Serde(format!(
370                    "event log payload encode error: {error}"
371                )))?,
372                serde_json::to_string(&event.headers).map_err(|error| LogError::Serde(format!(
373                    "event log headers encode error: {error}"
374                )))?,
375                event.occurred_at_ms
376            ],
377        )
378        .map_err(|error| LogError::Sqlite(format!("event log insert error: {error}")))?;
379        tx.commit()
380            .map_err(|error| LogError::Sqlite(format!("event log commit error: {error}")))?;
381        self.broadcasts
382            .publish(topic, self.queue_depth, (event_id, event));
383        Ok(event_id)
384    }
385
386    async fn flush(&self) -> Result<(), LogError> {
387        let connection = self
388            .connection
389            .lock()
390            .expect("sqlite event log connection poisoned");
391        connection
392            .execute_batch("PRAGMA wal_checkpoint(FULL);")
393            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
394        Ok(())
395    }
396
397    async fn read_range(
398        &self,
399        topic: &Topic,
400        from: Option<EventId>,
401        limit: usize,
402    ) -> Result<Vec<(EventId, LogEvent)>, LogError> {
403        let connection = self
404            .connection
405            .lock()
406            .expect("sqlite event log connection poisoned");
407        let mut statement = connection
408            .prepare(
409                "SELECT event_id, kind, payload, headers, occurred_at_ms
410                 FROM events
411                 WHERE topic = ?1 AND event_id > ?2
412                 ORDER BY event_id ASC
413                 LIMIT ?3",
414            )
415            .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
416        let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
417        let rows = statement
418            .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
419                let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
420                let headers: String = row.get(3)?;
421                let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
422                Ok((
423                    event_id,
424                    LogEvent {
425                        kind: row.get(1)?,
426                        payload: serde_json::from_slice(&payload).map_err(|error| {
427                            rusqlite::Error::FromSqlConversionFailure(
428                                payload.len(),
429                                rusqlite::types::Type::Blob,
430                                Box::new(error),
431                            )
432                        })?,
433                        headers: serde_json::from_str(&headers).map_err(|error| {
434                            rusqlite::Error::FromSqlConversionFailure(
435                                headers.len(),
436                                rusqlite::types::Type::Text,
437                                Box::new(error),
438                            )
439                        })?,
440                        occurred_at_ms: row.get(4)?,
441                    },
442                ))
443            })
444            .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
445        let mut events = Vec::new();
446        for row in rows {
447            events.push(
448                row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
449            );
450        }
451        Ok(events)
452    }
453
454    async fn read_range_bytes(
455        &self,
456        topic: &Topic,
457        from: Option<EventId>,
458        limit: usize,
459    ) -> Result<Vec<(EventId, LogEventBytes)>, LogError> {
460        let connection = self
461            .connection
462            .lock()
463            .expect("sqlite event log connection poisoned");
464        let mut statement = connection
465            .prepare(
466                "SELECT event_id, kind, payload, headers, occurred_at_ms
467                 FROM events
468                 WHERE topic = ?1 AND event_id > ?2
469                 ORDER BY event_id ASC
470                 LIMIT ?3",
471            )
472            .map_err(|error| LogError::Sqlite(format!("event log prepare error: {error}")))?;
473        let from_sql = event_id_to_sqlite_i64(from.unwrap_or(0))?;
474        let rows = statement
475            .query_map(params![topic.as_str(), from_sql, limit as i64], |row| {
476                let payload = sqlite_json_bytes_for_row(row, 2, "payload")?;
477                let headers: String = row.get(3)?;
478                let event_id = sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?)?;
479                Ok((
480                    event_id,
481                    LogEventBytes {
482                        kind: row.get(1)?,
483                        payload: Bytes::from(payload),
484                        headers: serde_json::from_str(&headers).map_err(|error| {
485                            rusqlite::Error::FromSqlConversionFailure(
486                                headers.len(),
487                                rusqlite::types::Type::Text,
488                                Box::new(error),
489                            )
490                        })?,
491                        occurred_at_ms: row.get(4)?,
492                    },
493                ))
494            })
495            .map_err(|error| LogError::Sqlite(format!("event log query error: {error}")))?;
496        let mut events = Vec::new();
497        for row in rows {
498            events.push(
499                row.map_err(|error| LogError::Sqlite(format!("event log row error: {error}")))?,
500            );
501        }
502        Ok(events)
503    }
504
505    async fn subscribe(
506        self: Arc<Self>,
507        topic: &Topic,
508        from: Option<EventId>,
509    ) -> Result<BoxStream<'static, Result<(EventId, LogEvent), LogError>>, LogError> {
510        let rx = self.broadcasts.subscribe(topic, self.queue_depth);
511        let history = self.read_range(topic, from, usize::MAX).await?;
512        Ok(stream_from_broadcast(history, from, rx, self.queue_depth))
513    }
514
515    async fn ack(
516        &self,
517        topic: &Topic,
518        consumer: &ConsumerId,
519        up_to: EventId,
520    ) -> Result<(), LogError> {
521        let connection = self
522            .connection
523            .lock()
524            .expect("sqlite event log connection poisoned");
525        let up_to_sql = event_id_to_sqlite_i64(up_to)?;
526        connection
527            .execute(
528                "INSERT INTO consumers(topic, consumer_id, cursor, updated_at_ms)
529                 VALUES (?1, ?2, ?3, ?4)
530                 ON CONFLICT(topic, consumer_id)
531                 DO UPDATE SET cursor = excluded.cursor, updated_at_ms = excluded.updated_at_ms",
532                params![topic.as_str(), consumer.as_str(), up_to_sql, now_ms()],
533            )
534            .map_err(|error| LogError::Sqlite(format!("event log ack error: {error}")))?;
535        Ok(())
536    }
537
538    async fn consumer_cursor(
539        &self,
540        topic: &Topic,
541        consumer: &ConsumerId,
542    ) -> Result<Option<EventId>, LogError> {
543        let connection = self
544            .connection
545            .lock()
546            .expect("sqlite event log connection poisoned");
547        connection
548            .query_row(
549                "SELECT cursor FROM consumers WHERE topic = ?1 AND consumer_id = ?2",
550                params![topic.as_str(), consumer.as_str()],
551                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
552            )
553            .optional()
554            .map_err(|error| LogError::Sqlite(format!("event log consumer cursor error: {error}")))
555    }
556
557    async fn latest(&self, topic: &Topic) -> Result<Option<EventId>, LogError> {
558        let connection = self
559            .connection
560            .lock()
561            .expect("sqlite event log connection poisoned");
562        connection
563            .query_row(
564                "SELECT last_id FROM topic_heads WHERE topic = ?1",
565                params![topic.as_str()],
566                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
567            )
568            .optional()
569            .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))
570    }
571
572    async fn compact(&self, topic: &Topic, before: EventId) -> Result<CompactReport, LogError> {
573        let connection = self
574            .connection
575            .lock()
576            .expect("sqlite event log connection poisoned");
577        let before_sql = event_id_to_sqlite_i64(before)?;
578        connection
579            .execute(
580                "DELETE FROM event_idempotency_keys WHERE topic = ?1 AND event_id <= ?2",
581                params![topic.as_str(), before_sql],
582            )
583            .map_err(|error| {
584                LogError::Sqlite(format!("event log idempotency compact error: {error}"))
585            })?;
586        let removed = connection
587            .execute(
588                "DELETE FROM events WHERE topic = ?1 AND event_id <= ?2",
589                params![topic.as_str(), before_sql],
590            )
591            .map_err(|error| {
592                LogError::Sqlite(format!("event log compact delete error: {error}"))
593            })?;
594        let remaining = connection
595            .query_row(
596                "SELECT COUNT(*) FROM events WHERE topic = ?1",
597                params![topic.as_str()],
598                |row| row.get::<_, i64>(0),
599            )
600            .map_err(|error| LogError::Sqlite(format!("event log compact count error: {error}")))
601            .and_then(sqlite_i64_to_usize)?;
602        let latest = connection
603            .query_row(
604                "SELECT last_id FROM topic_heads WHERE topic = ?1",
605                params![topic.as_str()],
606                |row| sqlite_i64_to_event_id_for_row(row.get::<_, i64>(0)?),
607            )
608            .optional()
609            .map_err(|error| LogError::Sqlite(format!("event log latest error: {error}")))?;
610        connection
611            .execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")
612            .map_err(|error| LogError::Sqlite(format!("event log checkpoint error: {error}")))?;
613        Ok(CompactReport {
614            removed,
615            remaining,
616            latest,
617            checkpointed: true,
618        })
619    }
620}