Skip to main content

hexeract_outbox_sql/
dialect.rs

1use hexeract_outbox::OutboxError;
2
3use crate::validate::validate_table_name;
4
/// Canonical PostgreSQL schema for an outbox table.
///
/// Declares the outbox table plus two partial indexes:
///
/// - `idx_{{table}}_pending` on `created_at`, restricted to rows whose
///   `delivered_at IS NULL`;
/// - `idx_{{table}}_subject` on `(subject_id, id)`, restricted to rows whose
///   `subject_id IS NOT NULL`.
///
/// `{{table}}` is a literal marker (this is a raw string, not a format string)
/// that is substituted by [`Dialect::schema_ddl`]. It appears both as the table
/// name and as part of the derived index names.
const POSTGRES_SCHEMA_SQL: &str = r"
CREATE TABLE IF NOT EXISTS {{table}} (
    id            BIGSERIAL    PRIMARY KEY,
    event_id      UUID         NOT NULL UNIQUE,
    event_type    VARCHAR(64)  NOT NULL,
    payload       JSONB        NOT NULL,
    subject_id    UUID         NULL,
    created_at    TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    attempts      INTEGER      NOT NULL DEFAULT 0,
    last_error    TEXT         NULL,
    next_retry_at TIMESTAMPTZ  NULL,
    delivered_at  TIMESTAMPTZ  NULL
);
CREATE INDEX IF NOT EXISTS idx_{{table}}_pending
    ON {{table}} (created_at)
    WHERE delivered_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_{{table}}_subject
    ON {{table}} (subject_id, id)
    WHERE subject_id IS NOT NULL;
";
29
/// Canonical MySQL schema for an outbox table (requires MySQL 8.0.13+).
///
/// MySQL supports neither partial indexes nor `CREATE INDEX IF NOT EXISTS`,
/// so the indexes are declared inline in the `CREATE TABLE` statement; the
/// pending index therefore leads with `delivered_at` instead of filtering on
/// it. UUIDs are stored as `BINARY(16)` and the payload as native `JSON`.
/// Timestamps use `DATETIME(6)` holding UTC, with an expression default
/// `(UTC_TIMESTAMP(6))` that requires MySQL 8.0.13 or later.
///
/// `{{table}}` is a literal marker (raw string, not a format string) that is
/// substituted by [`Dialect::schema_ddl`].
const MYSQL_SCHEMA_SQL: &str = r"
CREATE TABLE IF NOT EXISTS {{table}} (
    id            BIGINT       NOT NULL AUTO_INCREMENT PRIMARY KEY,
    event_id      BINARY(16)   NOT NULL UNIQUE,
    event_type    VARCHAR(64)  NOT NULL,
    payload       JSON         NOT NULL,
    subject_id    BINARY(16)   NULL,
    created_at    DATETIME(6)  NOT NULL DEFAULT (UTC_TIMESTAMP(6)),
    attempts      INT          NOT NULL DEFAULT 0,
    last_error    TEXT         NULL,
    next_retry_at DATETIME(6)  NULL,
    delivered_at  DATETIME(6)  NULL,
    INDEX idx_{{table}}_pending (delivered_at, created_at),
    INDEX idx_{{table}}_subject (subject_id, id)
);
";
53
/// Canonical PostgreSQL dead-letter schema.
///
/// Rows are moved here when `attempts >= max_attempts`. `exhausted_at`
/// defaults to `NOW()` and records when the envelope was declared poison.
/// Unlike the main outbox table, `created_at`, `attempts` and `last_error`
/// carry no defaults and are `NOT NULL`: they are copied from the source row.
///
/// `{{table}}` is a literal marker (raw string, not a format string) that is
/// substituted by [`Dialect::dead_letter_schema_ddl`]; the created table is
/// named `{{table}}_dead_letter`.
///
/// Note: `event_id` is declared `UNIQUE`, which already creates an implicit
/// B-tree index. A separate `idx_{{table}}_dead_letter_event_id` index would
/// be a duplicate and is therefore omitted.
const POSTGRES_DLQ_SCHEMA_SQL: &str = r"
CREATE TABLE IF NOT EXISTS {{table}}_dead_letter (
    id            BIGSERIAL    PRIMARY KEY,
    event_id      UUID         NOT NULL UNIQUE,
    event_type    VARCHAR(64)  NOT NULL,
    payload       JSONB        NOT NULL,
    subject_id    UUID         NULL,
    created_at    TIMESTAMPTZ  NOT NULL,
    attempts      INTEGER      NOT NULL,
    last_error    TEXT         NOT NULL,
    exhausted_at  TIMESTAMPTZ  NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_{{table}}_dead_letter_exhausted_at
    ON {{table}}_dead_letter (exhausted_at);
";
78
/// Canonical MySQL dead-letter schema (requires MySQL 8.0.13+).
///
/// Mirrors the MySQL outbox schema: UUIDs as `BINARY(16)`, payload as
/// `JSON`, timestamps as `DATETIME(6)` UTC. `exhausted_at` defaults to
/// `(UTC_TIMESTAMP(6))`, and its index is declared inline in the
/// `CREATE TABLE` statement.
///
/// `{{table}}` is a literal marker (raw string, not a format string) that is
/// substituted by [`Dialect::dead_letter_schema_ddl`]; the created table is
/// named `{{table}}_dead_letter`.
///
/// Note: `event_id` is declared `UNIQUE`, which already creates an implicit
/// index. A separate `idx_{{table}}_dead_letter_event_id` index is omitted.
const MYSQL_DLQ_SCHEMA_SQL: &str = r"
CREATE TABLE IF NOT EXISTS {{table}}_dead_letter (
    id            BIGINT       NOT NULL AUTO_INCREMENT PRIMARY KEY,
    event_id      BINARY(16)   NOT NULL UNIQUE,
    event_type    VARCHAR(64)  NOT NULL,
    payload       JSON         NOT NULL,
    subject_id    BINARY(16)   NULL,
    created_at    DATETIME(6)  NOT NULL,
    attempts      INT          NOT NULL,
    last_error    TEXT         NOT NULL,
    exhausted_at  DATETIME(6)  NOT NULL DEFAULT (UTC_TIMESTAMP(6)),
    INDEX idx_{{table}}_dead_letter_exhausted_at (exhausted_at)
);
";
101
/// Canonical SQLite dead-letter schema.
///
/// Mirrors the SQLite outbox schema: UUIDs as `BLOB`, timestamps as
/// `TEXT` in RFC 3339 form. `exhausted_at` defaults to the current instant
/// rendered with `strftime('%Y-%m-%dT%H:%M:%fZ', 'now')`, the same expression
/// the outbox schema uses for its `created_at` default.
///
/// `{{table}}` is a literal marker (raw string, not a format string) that is
/// substituted by [`Dialect::dead_letter_schema_ddl`]; the created table is
/// named `{{table}}_dead_letter`.
///
/// Note: `event_id` is declared `UNIQUE`, which already creates an implicit
/// index. A separate `idx_{{table}}_dead_letter_event_id` index is omitted.
const SQLITE_DLQ_SCHEMA_SQL: &str = r"
CREATE TABLE IF NOT EXISTS {{table}}_dead_letter (
    id            INTEGER  PRIMARY KEY AUTOINCREMENT,
    event_id      BLOB     NOT NULL UNIQUE,
    event_type    TEXT     NOT NULL,
    payload       TEXT     NOT NULL,
    subject_id    BLOB,
    created_at    TEXT     NOT NULL,
    attempts      INTEGER  NOT NULL,
    last_error    TEXT     NOT NULL,
    exhausted_at  TEXT     NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
);
CREATE INDEX IF NOT EXISTS idx_{{table}}_dead_letter_exhausted_at
    ON {{table}}_dead_letter (exhausted_at);
";
125
/// Canonical SQLite schema for an outbox table.
///
/// SQLite has dynamic typing, so UUIDs are stored as `BLOB` and the payload
/// and timestamps as `TEXT`. The `created_at` default is rendered as RFC 3339
/// (`...T...Z`) so it sorts lexicographically against the bound timestamps.
///
/// The two indexes are partial, matching the PostgreSQL schema:
/// `idx_{{table}}_pending` covers rows with `delivered_at IS NULL` and
/// `idx_{{table}}_subject` covers rows with `subject_id IS NOT NULL`.
///
/// `{{table}}` is a literal marker (raw string, not a format string) that is
/// substituted by [`Dialect::schema_ddl`].
const SQLITE_SCHEMA_SQL: &str = r"
CREATE TABLE IF NOT EXISTS {{table}} (
    id            INTEGER  PRIMARY KEY AUTOINCREMENT,
    event_id      BLOB     NOT NULL UNIQUE,
    event_type    TEXT     NOT NULL,
    payload       TEXT     NOT NULL,
    subject_id    BLOB,
    created_at    TEXT     NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
    attempts      INTEGER  NOT NULL DEFAULT 0,
    last_error    TEXT,
    next_retry_at TEXT,
    delivered_at  TEXT
);
CREATE INDEX IF NOT EXISTS idx_{{table}}_pending
    ON {{table}} (created_at)
    WHERE delivered_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_{{table}}_subject
    ON {{table}} (subject_id, id)
    WHERE subject_id IS NOT NULL;
";
151
/// SQL dialect differences absorbed by the backend stores.
///
/// A [`Dialect`] knows how to render the statements the outbox needs (poll,
/// claim, mark-delivered, mark-failed, insert, dead-letter move) and the
/// canonical schema DDL for its engine, accounting for placeholder style,
/// identifier quoting, row locking, the "current instant" expression and
/// per-engine column types.
///
/// The type is a fieldless `Copy` enum, so every method takes `self` by value.
///
/// Marked `#[non_exhaustive]` so a future SQL backend can be added in a minor
/// version: downstream `match` arms must include a wildcard `_` arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum Dialect {
    /// PostgreSQL (`sqlx::Postgres`).
    Postgres,
    /// MySQL 8.0+ (`sqlx::MySql`).
    MySql,
    /// SQLite (`sqlx::Sqlite`).
    Sqlite,
}
171
172impl Dialect {
173    /// Whether competing-consumers row skip-locking is available.
174    ///
175    /// `true` for PostgreSQL and MySQL 8.0+, `false` for SQLite (which
176    /// serializes writes through a single writer instead).
177    #[must_use]
178    pub fn supports_skip_locked(self) -> bool {
179        matches!(self, Self::Postgres | Self::MySql)
180    }
181
182    /// Render the bind placeholder for the 1-based parameter `index`.
183    ///
184    /// PostgreSQL uses positional `$1`, `$2`; MySQL and SQLite use `?`.
185    pub(crate) fn placeholder(self, index: usize) -> String {
186        match self {
187            Self::Postgres => format!("${index}"),
188            Self::MySql | Self::Sqlite => "?".to_owned(),
189        }
190    }
191
192    /// Wraps a validated identifier in this dialect's native quoting.
193    ///
194    /// PostgreSQL and SQLite use the SQL-standard double quote; MySQL uses
195    /// backticks because it does not enable `ANSI_QUOTES` by default, so a
196    /// double-quoted identifier is parsed as a string literal and rejected.
197    /// The name is assumed to have passed [`validate_table_name`], so it cannot
198    /// contain a quote character and quoting only restores reserved-word and
199    /// case handling.
200    pub(crate) fn quote_identifier(self, name: &str) -> String {
201        match self {
202            Self::Postgres | Self::Sqlite => format!("\"{name}\""),
203            Self::MySql => format!("`{name}`"),
204        }
205    }
206
207    /// SQL expression evaluating to the current instant, in a form
208    /// comparable to the stored timestamps.
209    pub(crate) fn now_expr(self) -> &'static str {
210        match self {
211            Self::Postgres => "NOW()",
212            // UTC_TIMESTAMP(6) is independent of the server session time zone
213            // and matches the DATETIME(6) microsecond precision the MySQL store
214            // binds, so the poll predicate never skips a sub-second retry.
215            Self::MySql => "UTC_TIMESTAMP(6)",
216            Self::Sqlite => "strftime('%Y-%m-%dT%H:%M:%fZ', 'now')",
217        }
218    }
219
220    /// SQL expression evaluating to the database current instant offset by a
221    /// bound interval, in a form comparable to the stored timestamps.
222    ///
223    /// The offset is taken from the bind parameter at `index` and is always
224    /// anchored to the **database** clock (`NOW()` / `UTC_TIMESTAMP(6)` /
225    /// `strftime('now')`), never the application clock. This keeps lease and
226    /// retry comparisons consistent even when the worker host and the database
227    /// host disagree on wall-clock time (#230).
228    ///
229    /// The bound value's unit differs per engine, so each store binds the
230    /// matching scalar: PostgreSQL binds seconds as `f64`, MySQL binds whole
231    /// microseconds as `i64`, and SQLite binds a `strftime` modifier string
232    /// such as `"+1.500 seconds"`.
233    pub(crate) fn now_plus_interval_expr(self, index: usize) -> String {
234        let ph = self.placeholder(index);
235        match self {
236            Self::Postgres => {
237                format!("(NOW() + (CAST({ph} AS DOUBLE PRECISION) * INTERVAL '1 second'))")
238            }
239            Self::MySql => format!("(UTC_TIMESTAMP(6) + INTERVAL {ph} MICROSECOND)"),
240            Self::Sqlite => format!("strftime('%Y-%m-%dT%H:%M:%fZ', 'now', {ph})"),
241        }
242    }
243
244    /// `SELECT ... WHERE delivered_at IS NULL ... [FOR UPDATE SKIP LOCKED]`.
245    pub(crate) fn poll_sql(self, table: &str) -> String {
246        let qtable = self.quote_identifier(table);
247        let max_attempts = self.placeholder(1);
248        let limit = self.placeholder(2);
249        let now = self.now_expr();
250        let lock = if self.supports_skip_locked() {
251            " FOR UPDATE SKIP LOCKED"
252        } else {
253            ""
254        };
255        format!(
256            "SELECT event_id, event_type, payload, subject_id, created_at, \
257                    attempts, last_error, next_retry_at \
258             FROM {qtable} \
259             WHERE delivered_at IS NULL \
260               AND attempts < {max_attempts} \
261               AND (next_retry_at IS NULL OR next_retry_at <= {now}) \
262             ORDER BY id \
263             LIMIT {limit}{lock}"
264        )
265    }
266
267    /// `UPDATE {qtable} SET delivered_at = {now} WHERE event_id = {ph}`.
268    pub(crate) fn mark_delivered_sql(self, table: &str) -> String {
269        let qtable = self.quote_identifier(table);
270        let event_id = self.placeholder(1);
271        let now = self.now_expr();
272        format!("UPDATE {qtable} SET delivered_at = {now} WHERE event_id = {event_id}")
273    }
274
275    /// `UPDATE {qtable} SET last_error, next_retry_at = {now + interval} ...`.
276    ///
277    /// `next_retry_at` is derived from the **database** clock plus the bound
278    /// backoff interval (parameter 2), not from an application-supplied
279    /// timestamp, so retry scheduling is immune to app/DB clock skew (#230).
280    ///
281    /// The attempt counter is **not** incremented here: it is consumed once
282    /// per dispatch attempt by [`Self::claim_sql`] at claim time, so that a
283    /// worker that crashes between claim and this call still burns one retry
284    /// slot. Incrementing again here would double-count every clean failure.
285    pub(crate) fn mark_failed_sql(self, table: &str) -> String {
286        let qtable = self.quote_identifier(table);
287        let last_error = self.placeholder(1);
288        let next_retry_at = self.now_plus_interval_expr(2);
289        let event_id = self.placeholder(3);
290        format!(
291            "UPDATE {qtable} \
292             SET last_error = {last_error}, next_retry_at = {next_retry_at} \
293             WHERE event_id = {event_id}"
294        )
295    }
296
297    /// `INSERT INTO {qtable} (event_id, event_type, payload, subject_id) VALUES (...)`.
298    pub(crate) fn insert_sql(self, table: &str) -> String {
299        let qtable = self.quote_identifier(table);
300        let p1 = self.placeholder(1);
301        let p2 = self.placeholder(2);
302        let p3 = self.placeholder(3);
303        let p4 = self.placeholder(4);
304        format!(
305            "INSERT INTO {qtable} (event_id, event_type, payload, subject_id) \
306             VALUES ({p1}, {p2}, {p3}, {p4})"
307        )
308    }
309
310    /// Canonical schema DDL (table + indexes) rendered for this dialect.
311    ///
312    /// # Errors
313    ///
314    /// Returns [`OutboxError::Internal`] if `table` is not a valid
315    /// identifier matching `^[a-zA-Z_][a-zA-Z0-9_]*$` or exceeds
316    /// [`crate::validate::MAX_IDENTIFIER_LEN`] bytes.
317    pub fn schema_ddl(self, table: &str) -> Result<String, OutboxError> {
318        validate_table_name(table)?;
319        let template = match self {
320            Self::Postgres => POSTGRES_SCHEMA_SQL,
321            Self::MySql => MYSQL_SCHEMA_SQL,
322            Self::Sqlite => SQLITE_SCHEMA_SQL,
323        };
324        Ok(template.replace("{{table}}", table))
325    }
326
327    /// Dead-letter schema DDL (table + indexes) rendered for this dialect.
328    ///
329    /// Creates a table named `{table}_dead_letter`. Envelopes are moved here
330    /// when they exhaust `max_attempts`.
331    ///
332    /// # Errors
333    ///
334    /// Returns [`OutboxError::Internal`] if `table` is not a valid
335    /// identifier matching `^[a-zA-Z_][a-zA-Z0-9_]*$` or exceeds
336    /// [`crate::validate::MAX_IDENTIFIER_LEN`] bytes.
337    pub fn dead_letter_schema_ddl(self, table: &str) -> Result<String, OutboxError> {
338        validate_table_name(table)?;
339        let template = match self {
340            Self::Postgres => POSTGRES_DLQ_SCHEMA_SQL,
341            Self::MySql => MYSQL_DLQ_SCHEMA_SQL,
342            Self::Sqlite => SQLITE_DLQ_SCHEMA_SQL,
343        };
344        Ok(template.replace("{{table}}", table))
345    }
346
347    /// `INSERT INTO {dlq} (...) SELECT ... FROM {main} WHERE event_id = {p1}`.
348    ///
349    /// Copies a row from the main outbox table into the dead-letter table.
350    /// `exhausted_at` is not listed and gets its `DEFAULT` value (`NOW()` or
351    /// equivalent). Called inside the same transaction as `mark_failed`.
352    pub(crate) fn insert_dead_letter_sql(self, main: &str, dlq: &str) -> String {
353        let qmain = self.quote_identifier(main);
354        let qdlq = self.quote_identifier(dlq);
355        let event_id = self.placeholder(1);
356        format!(
357            "INSERT INTO {qdlq} \
358             (event_id, event_type, payload, subject_id, created_at, attempts, last_error) \
359             SELECT event_id, event_type, payload, subject_id, created_at, attempts, last_error \
360             FROM {qmain} \
361             WHERE event_id = {event_id}"
362        )
363    }
364
365    /// `DELETE FROM {qtable} WHERE event_id = {p1}`.
366    ///
367    /// Removes the row from the main outbox table after it has been copied to
368    /// the dead-letter table. Called in the same transaction as
369    /// [`Self::insert_dead_letter_sql`].
370    pub(crate) fn delete_from_main_sql(self, table: &str) -> String {
371        let qtable = self.quote_identifier(table);
372        let event_id = self.placeholder(1);
373        format!("DELETE FROM {qtable} WHERE event_id = {event_id}")
374    }
375
376    /// Claim SQL for Postgres: uses `= ANY($2)` with a single UUID-array bind
377    /// so the number of bind parameters is fixed regardless of batch size,
378    /// avoiding the 65,535 bind-parameter limit inherent to an `IN`-list.
379    ///
380    /// For MySQL and SQLite a per-row `IN`-list is still generated because
381    /// neither supports the `= ANY($n)` array-bind syntax.
382    ///
383    /// Sets a soft lease on the given envelopes so competing workers skip
384    /// them until the lease expires, and consumes one retry slot by
385    /// incrementing `attempts`. The lease expiry is computed from the
386    /// **database** clock plus the bound lease interval (parameter 1), not an
387    /// application timestamp, so the lease window is immune to app/DB clock
388    /// skew (#230).
389    ///
390    /// Incrementing `attempts` at claim time (rather than only on failure in
391    /// [`Self::mark_failed_sql`]) is what makes a worker crash between claim
392    /// and acknowledgement safe: the attempt is already counted, so the
393    /// envelope cannot be redelivered forever without ever reaching the
394    /// dead-letter threshold.
395    // Every store issues a claim now (postgres/mysql for the competing-consumer
396    // lease, sqlite to increment attempts); only a feature-less build leaves it
397    // unused.
398    #[cfg_attr(
399        not(any(feature = "postgres", feature = "mysql", feature = "sqlite")),
400        allow(dead_code)
401    )]
402    pub(crate) fn claim_sql(self, table: &str, n: usize) -> String {
403        let qtable = self.quote_identifier(table);
404        let lease = self.now_plus_interval_expr(1);
405        match self {
406            Self::Postgres => {
407                // $2 is bound as a UUID array, so the bind count is always 2
408                // regardless of batch size, sidestepping the 65,535-parameter limit.
409                format!(
410                    "UPDATE {qtable} SET next_retry_at = {lease}, attempts = attempts + 1 \
411                     WHERE event_id = ANY($2)"
412                )
413            }
414            Self::MySql | Self::Sqlite => {
415                let placeholders = (2..=n + 1)
416                    .map(|i| self.placeholder(i))
417                    .collect::<Vec<_>>()
418                    .join(", ");
419                format!(
420                    "UPDATE {qtable} SET next_retry_at = {lease}, attempts = attempts + 1 \
421                     WHERE event_id IN ({placeholders})"
422                )
423            }
424        }
425    }
426}
427
428#[cfg(test)]
429mod tests {
430    use super::*;
431
432    #[test]
433    fn skip_locked_support_matches_engine_capabilities() {
434        assert!(Dialect::Postgres.supports_skip_locked());
435        assert!(Dialect::MySql.supports_skip_locked());
436        assert!(!Dialect::Sqlite.supports_skip_locked());
437    }
438
439    #[test]
440    fn postgres_uses_positional_placeholders() {
441        assert_eq!(Dialect::Postgres.placeholder(1), "$1");
442        assert_eq!(Dialect::Postgres.placeholder(4), "$4");
443    }
444
445    #[test]
446    fn mysql_and_sqlite_use_question_mark_placeholders() {
447        assert_eq!(Dialect::MySql.placeholder(1), "?");
448        assert_eq!(Dialect::Sqlite.placeholder(3), "?");
449    }
450
451    #[test]
452    fn postgres_poll_sql_locks_rows_and_binds_positionally() {
453        let sql = Dialect::Postgres.poll_sql("audit_outbox");
454        assert!(sql.contains("FROM \"audit_outbox\""));
455        assert!(sql.contains("$1"));
456        assert!(sql.contains("$2"));
457        assert!(sql.contains("ORDER BY id"));
458        assert!(sql.contains("FOR UPDATE SKIP LOCKED"));
459        assert!(sql.contains("NOW()"));
460    }
461
462    #[test]
463    fn poll_sql_quotes_reserved_word_table_name() {
464        // A table named "user" is a reserved word in SQL; quoting prevents
465        // a runtime syntax error when the table name is embedded in statements.
466        let sql = Dialect::Postgres.poll_sql("user");
467        assert!(sql.contains("FROM \"user\""));
468    }
469
470    #[test]
471    fn mysql_poll_sql_locks_rows_with_question_marks() {
472        let sql = Dialect::MySql.poll_sql("audit_outbox");
473        // MySQL quotes identifiers with backticks, not the SQL-standard double
474        // quote (which it reads as a string literal unless ANSI_QUOTES is set).
475        assert!(sql.contains("FROM `audit_outbox`"));
476        assert!(!sql.contains('"'));
477        assert!(sql.contains('?'));
478        assert!(sql.contains("FOR UPDATE SKIP LOCKED"));
479    }
480
481    #[test]
482    fn sqlite_poll_sql_omits_skip_locked() {
483        let sql = Dialect::Sqlite.poll_sql("audit_outbox");
484        assert!(sql.contains("FROM \"audit_outbox\""));
485        assert!(sql.contains('?'));
486        assert!(!sql.contains("FOR UPDATE SKIP LOCKED"));
487        assert!(sql.contains("strftime"));
488    }
489
490    #[test]
491    fn postgres_mark_delivered_sets_timestamp_by_event_id() {
492        let sql = Dialect::Postgres.mark_delivered_sql("audit_outbox");
493        assert!(sql.contains("UPDATE \"audit_outbox\""));
494        assert!(sql.contains("delivered_at"));
495        assert!(sql.contains("$1"));
496    }
497
498    #[test]
499    fn postgres_mark_failed_does_not_increment_attempts_with_three_binds() {
500        let sql = Dialect::Postgres.mark_failed_sql("audit_outbox");
501        // The increment moved to claim_sql so a crash between claim and
502        // mark_failed still consumes a retry slot; mark_failed must not
503        // double-count.
504        assert!(!sql.contains("attempts = attempts + 1"));
505        assert!(sql.contains("last_error = $1"));
506        // next_retry_at is computed from the DB clock plus the bound interval
507        // ($2), never an app timestamp (#230).
508        assert!(sql.contains("next_retry_at = (NOW() +"));
509        assert!(sql.contains("$2"));
510        assert!(sql.contains("WHERE event_id = $3"));
511    }
512
513    #[test]
514    fn postgres_insert_sql_binds_four_columns() {
515        let sql = Dialect::Postgres.insert_sql("audit_outbox");
516        assert!(sql.contains("INSERT INTO \"audit_outbox\""));
517        assert!(sql.contains("event_id, event_type, payload, subject_id"));
518        assert!(sql.contains("$1, $2, $3, $4"));
519    }
520
521    #[test]
522    fn sqlite_insert_sql_uses_question_marks() {
523        let sql = Dialect::Sqlite.insert_sql("audit_outbox");
524        assert!(sql.contains("INSERT INTO \"audit_outbox\""));
525        assert!(sql.contains("?, ?, ?, ?"));
526    }
527
528    #[test]
529    fn mysql_insert_sql_quotes_table_with_backticks() {
530        // Regression for the cross-dialect quoting bug: MySQL rejects a
531        // double-quoted identifier in DML (it reads it as a string literal),
532        // so the INSERT must use backticks.
533        let sql = Dialect::MySql.insert_sql("audit_outbox");
534        assert!(sql.contains("INSERT INTO `audit_outbox`"));
535        assert!(!sql.contains('"'));
536        assert!(sql.contains("?, ?, ?, ?"));
537    }
538
539    #[test]
540    fn quote_identifier_is_dialect_specific() {
541        assert_eq!(Dialect::Postgres.quote_identifier("t"), "\"t\"");
542        assert_eq!(Dialect::Sqlite.quote_identifier("t"), "\"t\"");
543        assert_eq!(Dialect::MySql.quote_identifier("t"), "`t`");
544    }
545
546    #[test]
547    fn postgres_schema_ddl_matches_current_canonical_schema() {
548        let ddl = Dialect::Postgres.schema_ddl("audit_outbox").unwrap();
549        assert!(ddl.contains("CREATE TABLE IF NOT EXISTS audit_outbox"));
550        assert!(ddl.contains("BIGSERIAL"));
551        assert!(ddl.contains("UUID"));
552        assert!(ddl.contains("JSONB"));
553        assert!(ddl.contains("TIMESTAMPTZ"));
554        assert!(ddl.contains("idx_audit_outbox_pending"));
555        assert!(ddl.contains("idx_audit_outbox_subject"));
556        assert!(!ddl.contains("{{table}}"));
557    }
558
559    #[test]
560    fn mysql_schema_ddl_uses_native_types() {
561        let ddl = Dialect::MySql.schema_ddl("audit_outbox").unwrap();
562        assert!(ddl.contains("CREATE TABLE IF NOT EXISTS audit_outbox"));
563        assert!(ddl.contains("AUTO_INCREMENT"));
564        assert!(ddl.contains("BINARY(16)"));
565        assert!(ddl.contains("JSON"));
566        assert!(!ddl.contains("{{table}}"));
567    }
568
569    #[test]
570    fn sqlite_schema_ddl_uses_portable_text_types() {
571        let ddl = Dialect::Sqlite.schema_ddl("audit_outbox").unwrap();
572        assert!(ddl.contains("CREATE TABLE IF NOT EXISTS audit_outbox"));
573        assert!(ddl.contains("AUTOINCREMENT"));
574        assert!(ddl.contains("BLOB"));
575        assert!(ddl.contains("strftime"));
576        assert!(!ddl.contains("{{table}}"));
577    }
578
579    #[test]
580    fn schema_ddl_rejects_invalid_table_name() {
581        let err = Dialect::Postgres.schema_ddl("bad name; DROP").unwrap_err();
582        assert!(matches!(err, OutboxError::Internal(_)));
583    }
584
585    #[test]
586    fn mysql_poll_compares_against_microsecond_utc() {
587        let sql = Dialect::MySql.poll_sql("audit_outbox");
588        assert!(sql.contains("UTC_TIMESTAMP(6)"));
589        assert!(!sql.contains("UTC_TIMESTAMP()"));
590    }
591
592    #[test]
593    fn mysql_mark_delivered_uses_microsecond_utc() {
594        let sql = Dialect::MySql.mark_delivered_sql("audit_outbox");
595        assert!(sql.contains("delivered_at = UTC_TIMESTAMP(6)"));
596    }
597
598    #[test]
599    fn mysql_schema_ddl_defaults_created_at_to_utc() {
600        let ddl = Dialect::MySql.schema_ddl("audit_outbox").unwrap();
601        assert!(ddl.contains("UTC_TIMESTAMP(6)"));
602    }
603
604    #[test]
605    fn postgres_dead_letter_schema_ddl_substitutes_table_name() {
606        let ddl = Dialect::Postgres
607            .dead_letter_schema_ddl("audit_outbox")
608            .unwrap();
609        assert!(ddl.contains("CREATE TABLE IF NOT EXISTS audit_outbox_dead_letter"));
610        assert!(ddl.contains("exhausted_at"));
611        // The redundant event_id index is dropped: event_id is already UNIQUE,
612        // which creates an implicit index. Only the exhausted_at index remains.
613        assert!(!ddl.contains("idx_audit_outbox_dead_letter_event_id"));
614        assert!(ddl.contains("idx_audit_outbox_dead_letter_exhausted_at"));
615        assert!(!ddl.contains("{{table}}"));
616    }
617
618    #[test]
619    fn mysql_dead_letter_schema_ddl_uses_native_types() {
620        let ddl = Dialect::MySql
621            .dead_letter_schema_ddl("audit_outbox")
622            .unwrap();
623        assert!(ddl.contains("CREATE TABLE IF NOT EXISTS audit_outbox_dead_letter"));
624        assert!(ddl.contains("BINARY(16)"));
625        assert!(ddl.contains("UTC_TIMESTAMP(6)"));
626        assert!(!ddl.contains("{{table}}"));
627    }
628
629    #[test]
630    fn sqlite_dead_letter_schema_ddl_uses_portable_text_types() {
631        let ddl = Dialect::Sqlite
632            .dead_letter_schema_ddl("audit_outbox")
633            .unwrap();
634        assert!(ddl.contains("CREATE TABLE IF NOT EXISTS audit_outbox_dead_letter"));
635        assert!(ddl.contains("strftime"));
636        assert!(!ddl.contains("{{table}}"));
637    }
638
639    #[test]
640    fn dead_letter_schema_ddl_rejects_invalid_table_name() {
641        let err = Dialect::Postgres
642            .dead_letter_schema_ddl("bad name; DROP")
643            .unwrap_err();
644        assert!(matches!(err, OutboxError::Internal(_)));
645    }
646
647    #[test]
648    fn postgres_insert_dead_letter_sql_selects_from_main() {
649        let sql =
650            Dialect::Postgres.insert_dead_letter_sql("audit_outbox", "audit_outbox_dead_letter");
651        assert!(sql.contains("INSERT INTO \"audit_outbox_dead_letter\""));
652        assert!(sql.contains("SELECT"));
653        assert!(sql.contains("FROM \"audit_outbox\""));
654        assert!(sql.contains("$1"));
655        assert!(!sql.contains("exhausted_at"));
656    }
657
658    #[test]
659    fn sqlite_insert_dead_letter_sql_uses_question_mark() {
660        let sql = Dialect::Sqlite.insert_dead_letter_sql("audit_outbox", "audit_outbox_dlq");
661        assert!(sql.contains("INSERT INTO \"audit_outbox_dlq\""));
662        assert!(sql.contains("FROM \"audit_outbox\""));
663        assert!(sql.contains('?'));
664    }
665
666    #[test]
667    fn postgres_delete_from_main_sql_binds_positionally() {
668        let sql = Dialect::Postgres.delete_from_main_sql("audit_outbox");
669        assert!(sql.contains("DELETE FROM \"audit_outbox\""));
670        assert!(sql.contains("$1"));
671    }
672
673    #[test]
674    fn sqlite_delete_from_main_sql_uses_question_mark() {
675        let sql = Dialect::Sqlite.delete_from_main_sql("audit_outbox");
676        assert!(sql.contains("DELETE FROM \"audit_outbox\""));
677        assert!(sql.contains('?'));
678    }
679
680    #[test]
681    fn postgres_claim_sql_uses_any_array_instead_of_in_list() {
682        // ANY($2) avoids the 65,535 bind-parameter limit that an IN-list of
683        // UUIDs would hit at large batch sizes (#240).
684        let sql = Dialect::Postgres.claim_sql("audit_outbox", 3);
685        assert!(sql.contains("UPDATE \"audit_outbox\""));
686        // Lease anchored to the DB clock plus the bound interval ($1), #230.
687        assert!(sql.contains("SET next_retry_at = (NOW() +"));
688        assert!(sql.contains("$1"));
689        // Single array bind; no per-row placeholders ($2, $3, $4).
690        assert!(sql.contains("WHERE event_id = ANY($2)"));
691        assert!(!sql.contains("$3"));
692        assert!(!sql.contains("$4"));
693        assert!(!sql.contains("WHERE event_id IN"));
694    }
695
696    #[test]
697    fn postgres_claim_sql_any_bind_count_is_independent_of_batch_size() {
698        // Regardless of n the Postgres claim SQL has exactly two bind
699        // parameters: $1 for the lease interval and $2 for the UUID array.
700        for n in [1, 10, 1000] {
701            let sql = Dialect::Postgres.claim_sql("audit_outbox", n);
702            assert!(
703                sql.contains("ANY($2)"),
704                "n={n}: expected ANY($2), got: {sql}"
705            );
706            assert!(
707                !sql.contains("$3"),
708                "n={n}: unexpected $3 placeholder, got: {sql}"
709            );
710        }
711    }
712
713    #[test]
714    fn mysql_claim_sql_uses_question_marks() {
715        let sql = Dialect::MySql.claim_sql("audit_outbox", 2);
716        assert!(sql.contains("UPDATE `audit_outbox`"));
717        assert!(sql.contains("SET next_retry_at = (UTC_TIMESTAMP(6) + INTERVAL ? MICROSECOND)"));
718        assert!(sql.contains("WHERE event_id IN (?, ?)"));
719    }
720
#[test]
fn sqlite_claim_sql_uses_question_marks() {
    // For a batch of one, the SQLite claim statement is expected to target
    // the double-quoted table, build the lease with strftime anchored on
    // 'now' plus a bound `?` modifier, and filter with a single `?` marker.
    let sql = Dialect::Sqlite.claim_sql("audit_outbox", 1);

    let updates_quoted_table = sql.contains("UPDATE \"audit_outbox\"");
    assert!(updates_quoted_table);

    let lease_starts_with_strftime = sql.contains("SET next_retry_at = strftime(");
    assert!(lease_starts_with_strftime);

    let lease_binds_modifier = sql.contains("'now', ?)");
    assert!(lease_binds_modifier);

    let filters_with_one_marker = sql.contains("WHERE event_id IN (?)");
    assert!(filters_with_one_marker);
}
729
#[test]
fn now_plus_interval_uses_db_clock_per_dialect() {
    // Regression guard for #230: the lease/retry anchor must come from the
    // database clock of each dialect, never from an application timestamp
    // bound by the worker.
    let postgres_expr = Dialect::Postgres.now_plus_interval_expr(1);
    assert!(postgres_expr.contains("NOW()"));

    let mysql_expr = Dialect::MySql.now_plus_interval_expr(1);
    assert!(mysql_expr.contains("UTC_TIMESTAMP(6)"));

    let sqlite_expr = Dialect::Sqlite.now_plus_interval_expr(1);
    assert!(sqlite_expr.contains("'now'"));
}
746
#[test]
fn claim_sql_increments_attempts_for_every_dialect() {
    // Regression guard for #213: a claim has to consume a retry slot, so
    // that a crash between claim and mark_failed cannot redeliver a poison
    // row forever.
    let dialects = [Dialect::Postgres, Dialect::MySql, Dialect::Sqlite];
    for dialect in dialects {
        let sql = dialect.claim_sql("audit_outbox", 2);
        let bumps_attempts = sql.contains("attempts = attempts + 1");
        assert!(
            bumps_attempts,
            "{dialect:?} claim_sql must increment attempts, got: {sql}"
        );
    }
}
760
#[test]
fn schema_ddl_rejects_overlength_table_name() {
    // 64 bytes is one past MAX_IDENTIFIER_LEN (63); such a name must be
    // rejected so that derived index names cannot collide after server-side
    // truncation.
    let oversized_name = "a".repeat(64);
    let dialects = [Dialect::Postgres, Dialect::MySql, Dialect::Sqlite];
    for dialect in dialects {
        let outcome = dialect.schema_ddl(&oversized_name);
        let err = outcome.unwrap_err();
        let is_internal = matches!(err, OutboxError::Internal(_));
        assert!(is_internal, "{dialect:?} must reject a 64-byte table name");
    }
}
774
#[test]
fn dlq_schema_ddl_does_not_create_redundant_event_id_index() {
    // event_id is declared UNIQUE in every DLQ DDL, which already creates an
    // implicit index; an extra named index would only add write overhead
    // with no read benefit.
    let dialects = [Dialect::Postgres, Dialect::MySql, Dialect::Sqlite];
    for dialect in dialects {
        let ddl = dialect.dead_letter_schema_ddl("audit_outbox").unwrap();
        let names_event_id_index = ddl.contains("dead_letter_event_id");
        assert!(
            !names_event_id_index,
            "{dialect:?} DLQ DDL must not create a redundant event_id index, got:\n{ddl}"
        );
    }
}
787
#[test]
fn sql_generation_quotes_identifiers() {
    // Every DML helper has to embed the table name quoted in the dialect's
    // native style, so that reserved words (e.g. `user`, `order`) do not
    // cause runtime errors. MySQL uses backticks; the other dialects use
    // the SQL-standard double quote.
    let table = "user";
    let dialects = [Dialect::Postgres, Dialect::MySql, Dialect::Sqlite];
    for dialect in dialects {
        let quoted = dialect.quote_identifier(table);

        let poll = dialect.poll_sql(table);
        assert!(
            poll.contains(&quoted),
            "{dialect:?} poll_sql must quote the table name"
        );

        let mark_delivered = dialect.mark_delivered_sql(table);
        assert!(
            mark_delivered.contains(&quoted),
            "{dialect:?} mark_delivered_sql must quote the table name"
        );

        let mark_failed = dialect.mark_failed_sql(table);
        assert!(
            mark_failed.contains(&quoted),
            "{dialect:?} mark_failed_sql must quote the table name"
        );

        let insert = dialect.insert_sql(table);
        assert!(
            insert.contains(&quoted),
            "{dialect:?} insert_sql must quote the table name"
        );

        let claim = dialect.claim_sql(table, 1);
        assert!(
            claim.contains(&quoted),
            "{dialect:?} claim_sql must quote the table name"
        );

        let delete_from_main = dialect.delete_from_main_sql(table);
        assert!(
            delete_from_main.contains(&quoted),
            "{dialect:?} delete_from_main_sql must quote the table name"
        );
    }
}
823}