Skip to main content

hexeract_outbox_sql/
dialect.rs

1use hexeract_outbox::OutboxError;
2
3use crate::validate::validate_table_name;
4
5/// Canonical PostgreSQL schema for an outbox table.
6///
7/// Byte-for-byte identical to the schema shipped by `hexeract-outbox-postgres`,
8/// so existing deployments need no data migration. `{{table}}` is substituted
9/// by [`Dialect::schema_ddl`].
10const POSTGRES_SCHEMA_SQL: &str = r"
11CREATE TABLE IF NOT EXISTS {{table}} (
12    id            BIGSERIAL    PRIMARY KEY,
13    event_id      UUID         NOT NULL UNIQUE,
14    event_type    VARCHAR(64)  NOT NULL,
15    payload       JSONB        NOT NULL,
16    subject_id    UUID         NULL,
17    created_at    TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
18    attempts      INTEGER      NOT NULL DEFAULT 0,
19    last_error    TEXT         NULL,
20    next_retry_at TIMESTAMPTZ  NULL,
21    delivered_at  TIMESTAMPTZ  NULL
22);
23CREATE INDEX IF NOT EXISTS idx_{{table}}_pending
24    ON {{table}} (created_at)
25    WHERE delivered_at IS NULL;
26CREATE INDEX IF NOT EXISTS idx_{{table}}_subject
27    ON {{table}} (subject_id, id)
28    WHERE subject_id IS NOT NULL;
29";
30
31/// Canonical MySQL schema for an outbox table (requires MySQL 8.0.13+).
32///
33/// MySQL supports neither partial indexes nor `CREATE INDEX IF NOT EXISTS`,
34/// so the indexes are declared inline in the `CREATE TABLE` statement. UUIDs
35/// are stored as `BINARY(16)` and the payload as native `JSON`. Timestamps use
36/// `DATETIME(6)` holding UTC, with an expression default `(UTC_TIMESTAMP(6))`
37/// that requires MySQL 8.0.13 or later.
38const MYSQL_SCHEMA_SQL: &str = r"
39CREATE TABLE IF NOT EXISTS {{table}} (
40    id            BIGINT       NOT NULL AUTO_INCREMENT PRIMARY KEY,
41    event_id      BINARY(16)   NOT NULL UNIQUE,
42    event_type    VARCHAR(64)  NOT NULL,
43    payload       JSON         NOT NULL,
44    subject_id    BINARY(16)   NULL,
45    created_at    DATETIME(6)  NOT NULL DEFAULT (UTC_TIMESTAMP(6)),
46    attempts      INT          NOT NULL DEFAULT 0,
47    last_error    TEXT         NULL,
48    next_retry_at DATETIME(6)  NULL,
49    delivered_at  DATETIME(6)  NULL,
50    INDEX idx_{{table}}_pending (delivered_at, created_at),
51    INDEX idx_{{table}}_subject (subject_id, id)
52);
53";
54
55/// Canonical SQLite schema for an outbox table.
56///
57/// SQLite has dynamic typing, so UUIDs are stored as `BLOB` and the payload
58/// and timestamps as `TEXT`. The `created_at` default is rendered as RFC 3339
59/// (`...T...Z`) so it sorts lexicographically against the bound timestamps.
60const SQLITE_SCHEMA_SQL: &str = r"
61CREATE TABLE IF NOT EXISTS {{table}} (
62    id            INTEGER  PRIMARY KEY AUTOINCREMENT,
63    event_id      BLOB     NOT NULL UNIQUE,
64    event_type    TEXT     NOT NULL,
65    payload       TEXT     NOT NULL,
66    subject_id    BLOB,
67    created_at    TEXT     NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
68    attempts      INTEGER  NOT NULL DEFAULT 0,
69    last_error    TEXT,
70    next_retry_at TEXT,
71    delivered_at  TEXT
72);
73CREATE INDEX IF NOT EXISTS idx_{{table}}_pending
74    ON {{table}} (created_at)
75    WHERE delivered_at IS NULL;
76CREATE INDEX IF NOT EXISTS idx_{{table}}_subject
77    ON {{table}} (subject_id, id)
78    WHERE subject_id IS NOT NULL;
79";
80
81/// SQL dialect differences absorbed by the backend stores.
82///
83/// A [`Dialect`] knows how to render the four statements the outbox needs
84/// (poll, mark-delivered, mark-failed, insert) and the canonical schema DDL
85/// for its engine, accounting for placeholder style, row locking, the
86/// "current instant" expression and per-engine column types.
87#[derive(Debug, Clone, Copy, PartialEq, Eq)]
88pub enum Dialect {
89    /// PostgreSQL (`sqlx::Postgres`).
90    Postgres,
91    /// MySQL 8.0+ (`sqlx::MySql`).
92    MySql,
93    /// SQLite (`sqlx::Sqlite`).
94    Sqlite,
95}
96
97impl Dialect {
98    /// Whether competing-consumers row skip-locking is available.
99    ///
100    /// `true` for PostgreSQL and MySQL 8.0+, `false` for SQLite (which
101    /// serializes writes through a single writer instead).
102    #[must_use]
103    pub fn supports_skip_locked(self) -> bool {
104        matches!(self, Self::Postgres | Self::MySql)
105    }
106
107    /// Render the bind placeholder for the 1-based parameter `index`.
108    ///
109    /// PostgreSQL uses positional `$1`, `$2`; MySQL and SQLite use `?`.
110    pub(crate) fn placeholder(self, index: usize) -> String {
111        match self {
112            Self::Postgres => format!("${index}"),
113            Self::MySql | Self::Sqlite => "?".to_owned(),
114        }
115    }
116
117    /// SQL expression evaluating to the current instant, in a form
118    /// comparable to the stored timestamps.
119    pub(crate) fn now_expr(self) -> &'static str {
120        match self {
121            Self::Postgres => "NOW()",
122            // UTC_TIMESTAMP() is independent of the server session time zone,
123            // keeping comparisons consistent with the UTC values the MySQL
124            // store binds into DATETIME columns.
125            Self::MySql => "UTC_TIMESTAMP()",
126            Self::Sqlite => "strftime('%Y-%m-%dT%H:%M:%fZ', 'now')",
127        }
128    }
129
130    /// `SELECT ... WHERE delivered_at IS NULL ... [FOR UPDATE SKIP LOCKED]`.
131    pub(crate) fn poll_sql(self, table: &str) -> String {
132        let max_attempts = self.placeholder(1);
133        let limit = self.placeholder(2);
134        let now = self.now_expr();
135        let lock = if self.supports_skip_locked() {
136            " FOR UPDATE SKIP LOCKED"
137        } else {
138            ""
139        };
140        format!(
141            "SELECT event_id, event_type, payload, subject_id, created_at, \
142                    attempts, last_error, next_retry_at \
143             FROM {table} \
144             WHERE delivered_at IS NULL \
145               AND attempts < {max_attempts} \
146               AND (next_retry_at IS NULL OR next_retry_at <= {now}) \
147             ORDER BY id \
148             LIMIT {limit}{lock}"
149        )
150    }
151
152    /// `UPDATE {table} SET delivered_at = {now} WHERE event_id = {ph}`.
153    pub(crate) fn mark_delivered_sql(self, table: &str) -> String {
154        let event_id = self.placeholder(1);
155        let now = self.now_expr();
156        format!("UPDATE {table} SET delivered_at = {now} WHERE event_id = {event_id}")
157    }
158
159    /// `UPDATE {table} SET attempts = attempts + 1, last_error, next_retry_at ...`.
160    pub(crate) fn mark_failed_sql(self, table: &str) -> String {
161        let last_error = self.placeholder(1);
162        let next_retry_at = self.placeholder(2);
163        let event_id = self.placeholder(3);
164        format!(
165            "UPDATE {table} \
166             SET attempts = attempts + 1, last_error = {last_error}, next_retry_at = {next_retry_at} \
167             WHERE event_id = {event_id}"
168        )
169    }
170
171    /// `INSERT INTO {table} (event_id, event_type, payload, subject_id) VALUES (...)`.
172    pub(crate) fn insert_sql(self, table: &str) -> String {
173        let p1 = self.placeholder(1);
174        let p2 = self.placeholder(2);
175        let p3 = self.placeholder(3);
176        let p4 = self.placeholder(4);
177        format!(
178            "INSERT INTO {table} (event_id, event_type, payload, subject_id) \
179             VALUES ({p1}, {p2}, {p3}, {p4})"
180        )
181    }
182
183    /// Canonical schema DDL (table + indexes) rendered for this dialect.
184    ///
185    /// # Errors
186    ///
187    /// Returns [`OutboxError::Internal`] if `table` is not a valid
188    /// identifier matching `^[a-zA-Z_][a-zA-Z0-9_]*$`.
189    pub fn schema_ddl(self, table: &str) -> Result<String, OutboxError> {
190        validate_table_name(table)?;
191        let template = match self {
192            Self::Postgres => POSTGRES_SCHEMA_SQL,
193            Self::MySql => MYSQL_SCHEMA_SQL,
194            Self::Sqlite => SQLITE_SCHEMA_SQL,
195        };
196        Ok(template.replace("{{table}}", table))
197    }
198}
199
200#[cfg(test)]
201mod tests {
202    use super::*;
203
204    #[test]
205    fn skip_locked_support_matches_engine_capabilities() {
206        assert!(Dialect::Postgres.supports_skip_locked());
207        assert!(Dialect::MySql.supports_skip_locked());
208        assert!(!Dialect::Sqlite.supports_skip_locked());
209    }
210
211    #[test]
212    fn postgres_uses_positional_placeholders() {
213        assert_eq!(Dialect::Postgres.placeholder(1), "$1");
214        assert_eq!(Dialect::Postgres.placeholder(4), "$4");
215    }
216
217    #[test]
218    fn mysql_and_sqlite_use_question_mark_placeholders() {
219        assert_eq!(Dialect::MySql.placeholder(1), "?");
220        assert_eq!(Dialect::Sqlite.placeholder(3), "?");
221    }
222
223    #[test]
224    fn postgres_poll_sql_locks_rows_and_binds_positionally() {
225        let sql = Dialect::Postgres.poll_sql("audit_outbox");
226        assert!(sql.contains("FROM audit_outbox"));
227        assert!(sql.contains("$1"));
228        assert!(sql.contains("$2"));
229        assert!(sql.contains("ORDER BY id"));
230        assert!(sql.contains("FOR UPDATE SKIP LOCKED"));
231        assert!(sql.contains("NOW()"));
232    }
233
234    #[test]
235    fn mysql_poll_sql_locks_rows_with_question_marks() {
236        let sql = Dialect::MySql.poll_sql("audit_outbox");
237        assert!(sql.contains("FROM audit_outbox"));
238        assert!(sql.contains('?'));
239        assert!(sql.contains("FOR UPDATE SKIP LOCKED"));
240    }
241
242    #[test]
243    fn sqlite_poll_sql_omits_skip_locked() {
244        let sql = Dialect::Sqlite.poll_sql("audit_outbox");
245        assert!(sql.contains("FROM audit_outbox"));
246        assert!(sql.contains('?'));
247        assert!(!sql.contains("FOR UPDATE SKIP LOCKED"));
248        assert!(sql.contains("strftime"));
249    }
250
251    #[test]
252    fn postgres_mark_delivered_sets_timestamp_by_event_id() {
253        let sql = Dialect::Postgres.mark_delivered_sql("audit_outbox");
254        assert!(sql.contains("UPDATE audit_outbox"));
255        assert!(sql.contains("delivered_at"));
256        assert!(sql.contains("$1"));
257    }
258
259    #[test]
260    fn postgres_mark_failed_increments_attempts_with_three_binds() {
261        let sql = Dialect::Postgres.mark_failed_sql("audit_outbox");
262        assert!(sql.contains("attempts = attempts + 1"));
263        assert!(sql.contains("$1"));
264        assert!(sql.contains("$2"));
265        assert!(sql.contains("$3"));
266    }
267
268    #[test]
269    fn postgres_insert_sql_binds_four_columns() {
270        let sql = Dialect::Postgres.insert_sql("audit_outbox");
271        assert!(sql.contains("INSERT INTO audit_outbox"));
272        assert!(sql.contains("event_id, event_type, payload, subject_id"));
273        assert!(sql.contains("$1, $2, $3, $4"));
274    }
275
276    #[test]
277    fn sqlite_insert_sql_uses_question_marks() {
278        let sql = Dialect::Sqlite.insert_sql("audit_outbox");
279        assert!(sql.contains("INSERT INTO audit_outbox"));
280        assert!(sql.contains("?, ?, ?, ?"));
281    }
282
283    #[test]
284    fn postgres_schema_ddl_matches_current_canonical_schema() {
285        let ddl = Dialect::Postgres.schema_ddl("audit_outbox").unwrap();
286        assert!(ddl.contains("CREATE TABLE IF NOT EXISTS audit_outbox"));
287        assert!(ddl.contains("BIGSERIAL"));
288        assert!(ddl.contains("UUID"));
289        assert!(ddl.contains("JSONB"));
290        assert!(ddl.contains("TIMESTAMPTZ"));
291        assert!(ddl.contains("idx_audit_outbox_pending"));
292        assert!(ddl.contains("idx_audit_outbox_subject"));
293        assert!(!ddl.contains("{{table}}"));
294    }
295
296    #[test]
297    fn mysql_schema_ddl_uses_native_types() {
298        let ddl = Dialect::MySql.schema_ddl("audit_outbox").unwrap();
299        assert!(ddl.contains("CREATE TABLE IF NOT EXISTS audit_outbox"));
300        assert!(ddl.contains("AUTO_INCREMENT"));
301        assert!(ddl.contains("BINARY(16)"));
302        assert!(ddl.contains("JSON"));
303        assert!(!ddl.contains("{{table}}"));
304    }
305
306    #[test]
307    fn sqlite_schema_ddl_uses_portable_text_types() {
308        let ddl = Dialect::Sqlite.schema_ddl("audit_outbox").unwrap();
309        assert!(ddl.contains("CREATE TABLE IF NOT EXISTS audit_outbox"));
310        assert!(ddl.contains("AUTOINCREMENT"));
311        assert!(ddl.contains("BLOB"));
312        assert!(ddl.contains("strftime"));
313        assert!(!ddl.contains("{{table}}"));
314    }
315
316    #[test]
317    fn schema_ddl_rejects_invalid_table_name() {
318        let err = Dialect::Postgres.schema_ddl("bad name; DROP").unwrap_err();
319        assert!(matches!(err, OutboxError::Internal(_)));
320    }
321
322    #[test]
323    fn mysql_compares_against_utc_not_session_time() {
324        let sql = Dialect::MySql.poll_sql("audit_outbox");
325        assert!(sql.contains("UTC_TIMESTAMP()"));
326    }
327
328    #[test]
329    fn mysql_schema_ddl_defaults_created_at_to_utc() {
330        let ddl = Dialect::MySql.schema_ddl("audit_outbox").unwrap();
331        assert!(ddl.contains("UTC_TIMESTAMP(6)"));
332    }
333}