1use hexeract_outbox::OutboxError;
2
3use crate::validate::validate_table_name;
4
/// PostgreSQL DDL template for the main outbox table.
///
/// Every `{{table}}` marker is replaced verbatim with the table name by
/// `Dialect::schema_ddl`. The template creates the table plus two partial
/// indexes: one on `created_at` restricted to undelivered rows
/// (`delivered_at IS NULL`) and one on `(subject_id, id)` restricted to rows
/// that carry a subject. All statements use `IF NOT EXISTS`.
const POSTGRES_SCHEMA_SQL: &str = r"
CREATE TABLE IF NOT EXISTS {{table}} (
 id BIGSERIAL PRIMARY KEY,
 event_id UUID NOT NULL UNIQUE,
 event_type VARCHAR(64) NOT NULL,
 payload JSONB NOT NULL,
 subject_id UUID NULL,
 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
 attempts INTEGER NOT NULL DEFAULT 0,
 last_error TEXT NULL,
 next_retry_at TIMESTAMPTZ NULL,
 delivered_at TIMESTAMPTZ NULL
);
CREATE INDEX IF NOT EXISTS idx_{{table}}_pending
 ON {{table}} (created_at)
 WHERE delivered_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_{{table}}_subject
 ON {{table}} (subject_id, id)
 WHERE subject_id IS NOT NULL;
";
29
/// MySQL DDL template for the main outbox table.
///
/// Every `{{table}}` marker is replaced verbatim with the table name by
/// `Dialect::schema_ddl`. Compared with the PostgreSQL template, the id
/// columns are `BINARY(16)` instead of `UUID`, timestamps are `DATETIME(6)`
/// defaulting to `UTC_TIMESTAMP(6)`, and both indexes are declared inline as
/// plain (non-partial) indexes; the pending index is the composite
/// `(delivered_at, created_at)`.
const MYSQL_SCHEMA_SQL: &str = r"
CREATE TABLE IF NOT EXISTS {{table}} (
 id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
 event_id BINARY(16) NOT NULL UNIQUE,
 event_type VARCHAR(64) NOT NULL,
 payload JSON NOT NULL,
 subject_id BINARY(16) NULL,
 created_at DATETIME(6) NOT NULL DEFAULT (UTC_TIMESTAMP(6)),
 attempts INT NOT NULL DEFAULT 0,
 last_error TEXT NULL,
 next_retry_at DATETIME(6) NULL,
 delivered_at DATETIME(6) NULL,
 INDEX idx_{{table}}_pending (delivered_at, created_at),
 INDEX idx_{{table}}_subject (subject_id, id)
);
";
53
/// PostgreSQL DDL template for the dead-letter table `{{table}}_dead_letter`.
///
/// Every `{{table}}` marker is replaced verbatim with the main table name by
/// `Dialect::dead_letter_schema_ddl`. `created_at`, `attempts` and
/// `last_error` carry no defaults and are `NOT NULL`; only `exhausted_at`
/// defaults to `NOW()`. A single secondary index on `exhausted_at` is
/// created; `event_id` is covered by its `UNIQUE` constraint.
const POSTGRES_DLQ_SCHEMA_SQL: &str = r"
CREATE TABLE IF NOT EXISTS {{table}}_dead_letter (
 id BIGSERIAL PRIMARY KEY,
 event_id UUID NOT NULL UNIQUE,
 event_type VARCHAR(64) NOT NULL,
 payload JSONB NOT NULL,
 subject_id UUID NULL,
 created_at TIMESTAMPTZ NOT NULL,
 attempts INTEGER NOT NULL,
 last_error TEXT NOT NULL,
 exhausted_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_{{table}}_dead_letter_exhausted_at
 ON {{table}}_dead_letter (exhausted_at);
";
78
/// MySQL DDL template for the dead-letter table `{{table}}_dead_letter`.
///
/// Every `{{table}}` marker is replaced verbatim with the main table name by
/// `Dialect::dead_letter_schema_ddl`. Column set mirrors the PostgreSQL
/// dead-letter template using `BINARY(16)`, `JSON` and `DATETIME(6)`;
/// `exhausted_at` defaults to `UTC_TIMESTAMP(6)` and is indexed inline.
const MYSQL_DLQ_SCHEMA_SQL: &str = r"
CREATE TABLE IF NOT EXISTS {{table}}_dead_letter (
 id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
 event_id BINARY(16) NOT NULL UNIQUE,
 event_type VARCHAR(64) NOT NULL,
 payload JSON NOT NULL,
 subject_id BINARY(16) NULL,
 created_at DATETIME(6) NOT NULL,
 attempts INT NOT NULL,
 last_error TEXT NOT NULL,
 exhausted_at DATETIME(6) NOT NULL DEFAULT (UTC_TIMESTAMP(6)),
 INDEX idx_{{table}}_dead_letter_exhausted_at (exhausted_at)
);
";
101
/// SQLite DDL template for the dead-letter table `{{table}}_dead_letter`.
///
/// Every `{{table}}` marker is replaced verbatim with the main table name by
/// `Dialect::dead_letter_schema_ddl`. Ids are `BLOB`, payload and timestamps
/// are `TEXT`; `exhausted_at` defaults to the current time rendered by
/// `strftime('%Y-%m-%dT%H:%M:%fZ', 'now')`. A single secondary index on
/// `exhausted_at` is created.
const SQLITE_DLQ_SCHEMA_SQL: &str = r"
CREATE TABLE IF NOT EXISTS {{table}}_dead_letter (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 event_id BLOB NOT NULL UNIQUE,
 event_type TEXT NOT NULL,
 payload TEXT NOT NULL,
 subject_id BLOB,
 created_at TEXT NOT NULL,
 attempts INTEGER NOT NULL,
 last_error TEXT NOT NULL,
 exhausted_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
);
CREATE INDEX IF NOT EXISTS idx_{{table}}_dead_letter_exhausted_at
 ON {{table}}_dead_letter (exhausted_at);
";
125
/// SQLite DDL template for the main outbox table.
///
/// Every `{{table}}` marker is replaced verbatim with the table name by
/// `Dialect::schema_ddl`. Ids are `BLOB`, payload and timestamps are `TEXT`;
/// `created_at` defaults to `strftime('%Y-%m-%dT%H:%M:%fZ', 'now')`, the same
/// expression `Dialect::now_expr` emits for SQLite. The two partial indexes
/// match the PostgreSQL template.
const SQLITE_SCHEMA_SQL: &str = r"
CREATE TABLE IF NOT EXISTS {{table}} (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 event_id BLOB NOT NULL UNIQUE,
 event_type TEXT NOT NULL,
 payload TEXT NOT NULL,
 subject_id BLOB,
 created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
 attempts INTEGER NOT NULL DEFAULT 0,
 last_error TEXT,
 next_retry_at TEXT,
 delivered_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_{{table}}_pending
 ON {{table}} (created_at)
 WHERE delivered_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_{{table}}_subject
 ON {{table}} (subject_id, id)
 WHERE subject_id IS NOT NULL;
";
151
/// SQL dialect that outbox statements and schema DDL are generated for.
///
/// The dialect decides placeholder syntax, identifier quoting, the
/// "current time" expression and whether row locking with `SKIP LOCKED` is
/// appended to the polling query.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum Dialect {
    /// PostgreSQL: `$n` placeholders, double-quoted identifiers, `NOW()`.
    Postgres,
    /// MySQL: `?` placeholders, backtick-quoted identifiers,
    /// `UTC_TIMESTAMP(6)`.
    MySql,
    /// SQLite: `?` placeholders, double-quoted identifiers, timestamps
    /// rendered as text via `strftime`.
    Sqlite,
}
171
172impl Dialect {
173 #[must_use]
178 pub fn supports_skip_locked(self) -> bool {
179 matches!(self, Self::Postgres | Self::MySql)
180 }
181
182 pub(crate) fn placeholder(self, index: usize) -> String {
186 match self {
187 Self::Postgres => format!("${index}"),
188 Self::MySql | Self::Sqlite => "?".to_owned(),
189 }
190 }
191
192 pub(crate) fn quote_identifier(self, name: &str) -> String {
201 match self {
202 Self::Postgres | Self::Sqlite => format!("\"{name}\""),
203 Self::MySql => format!("`{name}`"),
204 }
205 }
206
207 pub(crate) fn now_expr(self) -> &'static str {
210 match self {
211 Self::Postgres => "NOW()",
212 Self::MySql => "UTC_TIMESTAMP(6)",
216 Self::Sqlite => "strftime('%Y-%m-%dT%H:%M:%fZ', 'now')",
217 }
218 }
219
220 pub(crate) fn now_plus_interval_expr(self, index: usize) -> String {
234 let ph = self.placeholder(index);
235 match self {
236 Self::Postgres => {
237 format!("(NOW() + (CAST({ph} AS DOUBLE PRECISION) * INTERVAL '1 second'))")
238 }
239 Self::MySql => format!("(UTC_TIMESTAMP(6) + INTERVAL {ph} MICROSECOND)"),
240 Self::Sqlite => format!("strftime('%Y-%m-%dT%H:%M:%fZ', 'now', {ph})"),
241 }
242 }
243
244 pub(crate) fn poll_sql(self, table: &str) -> String {
246 let qtable = self.quote_identifier(table);
247 let max_attempts = self.placeholder(1);
248 let limit = self.placeholder(2);
249 let now = self.now_expr();
250 let lock = if self.supports_skip_locked() {
251 " FOR UPDATE SKIP LOCKED"
252 } else {
253 ""
254 };
255 format!(
256 "SELECT event_id, event_type, payload, subject_id, created_at, \
257 attempts, last_error, next_retry_at \
258 FROM {qtable} \
259 WHERE delivered_at IS NULL \
260 AND attempts < {max_attempts} \
261 AND (next_retry_at IS NULL OR next_retry_at <= {now}) \
262 ORDER BY id \
263 LIMIT {limit}{lock}"
264 )
265 }
266
267 pub(crate) fn mark_delivered_sql(self, table: &str) -> String {
269 let qtable = self.quote_identifier(table);
270 let event_id = self.placeholder(1);
271 let now = self.now_expr();
272 format!("UPDATE {qtable} SET delivered_at = {now} WHERE event_id = {event_id}")
273 }
274
275 pub(crate) fn mark_failed_sql(self, table: &str) -> String {
286 let qtable = self.quote_identifier(table);
287 let last_error = self.placeholder(1);
288 let next_retry_at = self.now_plus_interval_expr(2);
289 let event_id = self.placeholder(3);
290 format!(
291 "UPDATE {qtable} \
292 SET last_error = {last_error}, next_retry_at = {next_retry_at} \
293 WHERE event_id = {event_id}"
294 )
295 }
296
297 pub(crate) fn insert_sql(self, table: &str) -> String {
299 let qtable = self.quote_identifier(table);
300 let p1 = self.placeholder(1);
301 let p2 = self.placeholder(2);
302 let p3 = self.placeholder(3);
303 let p4 = self.placeholder(4);
304 format!(
305 "INSERT INTO {qtable} (event_id, event_type, payload, subject_id) \
306 VALUES ({p1}, {p2}, {p3}, {p4})"
307 )
308 }
309
310 pub fn schema_ddl(self, table: &str) -> Result<String, OutboxError> {
318 validate_table_name(table)?;
319 let template = match self {
320 Self::Postgres => POSTGRES_SCHEMA_SQL,
321 Self::MySql => MYSQL_SCHEMA_SQL,
322 Self::Sqlite => SQLITE_SCHEMA_SQL,
323 };
324 Ok(template.replace("{{table}}", table))
325 }
326
327 pub fn dead_letter_schema_ddl(self, table: &str) -> Result<String, OutboxError> {
338 validate_table_name(table)?;
339 let template = match self {
340 Self::Postgres => POSTGRES_DLQ_SCHEMA_SQL,
341 Self::MySql => MYSQL_DLQ_SCHEMA_SQL,
342 Self::Sqlite => SQLITE_DLQ_SCHEMA_SQL,
343 };
344 Ok(template.replace("{{table}}", table))
345 }
346
347 pub(crate) fn insert_dead_letter_sql(self, main: &str, dlq: &str) -> String {
353 let qmain = self.quote_identifier(main);
354 let qdlq = self.quote_identifier(dlq);
355 let event_id = self.placeholder(1);
356 format!(
357 "INSERT INTO {qdlq} \
358 (event_id, event_type, payload, subject_id, created_at, attempts, last_error) \
359 SELECT event_id, event_type, payload, subject_id, created_at, attempts, last_error \
360 FROM {qmain} \
361 WHERE event_id = {event_id}"
362 )
363 }
364
365 pub(crate) fn delete_from_main_sql(self, table: &str) -> String {
371 let qtable = self.quote_identifier(table);
372 let event_id = self.placeholder(1);
373 format!("DELETE FROM {qtable} WHERE event_id = {event_id}")
374 }
375
376 #[cfg_attr(
399 not(any(feature = "postgres", feature = "mysql", feature = "sqlite")),
400 allow(dead_code)
401 )]
402 pub(crate) fn claim_sql(self, table: &str, n: usize) -> String {
403 let qtable = self.quote_identifier(table);
404 let lease = self.now_plus_interval_expr(1);
405 match self {
406 Self::Postgres => {
407 format!(
410 "UPDATE {qtable} SET next_retry_at = {lease}, attempts = attempts + 1 \
411 WHERE event_id = ANY($2)"
412 )
413 }
414 Self::MySql | Self::Sqlite => {
415 let placeholders = (2..=n + 1)
416 .map(|i| self.placeholder(i))
417 .collect::<Vec<_>>()
418 .join(", ");
419 format!(
420 "UPDATE {qtable} SET next_retry_at = {lease}, attempts = attempts + 1 \
421 WHERE event_id IN ({placeholders})"
422 )
423 }
424 }
425 }
426}
427
#[cfg(test)]
mod tests {
    //! Unit tests for the per-dialect SQL builders and DDL templates.
    //!
    //! Every test inspects generated strings only; no database is contacted.

    use super::*;

    /// All dialects, for tests that must hold across the board.
    const ALL_DIALECTS: [Dialect; 3] = [Dialect::Postgres, Dialect::MySql, Dialect::Sqlite];

    #[test]
    fn skip_locked_support_matches_engine_capabilities() {
        assert!(Dialect::Postgres.supports_skip_locked());
        assert!(Dialect::MySql.supports_skip_locked());
        assert!(!Dialect::Sqlite.supports_skip_locked());
    }

    #[test]
    fn postgres_uses_positional_placeholders() {
        assert_eq!(Dialect::Postgres.placeholder(1), "$1");
        assert_eq!(Dialect::Postgres.placeholder(4), "$4");
    }

    #[test]
    fn mysql_and_sqlite_use_question_mark_placeholders() {
        assert_eq!(Dialect::MySql.placeholder(1), "?");
        assert_eq!(Dialect::Sqlite.placeholder(3), "?");
    }

    #[test]
    fn postgres_poll_sql_locks_rows_and_binds_positionally() {
        let sql = Dialect::Postgres.poll_sql("audit_outbox");
        assert!(sql.contains("FROM \"audit_outbox\""));
        assert!(sql.contains("$1"));
        assert!(sql.contains("$2"));
        assert!(sql.contains("ORDER BY id"));
        assert!(sql.contains("FOR UPDATE SKIP LOCKED"));
        assert!(sql.contains("NOW()"));
    }

    #[test]
    fn poll_sql_quotes_reserved_word_table_name() {
        let sql = Dialect::Postgres.poll_sql("user");
        assert!(sql.contains("FROM \"user\""));
    }

    #[test]
    fn mysql_poll_sql_locks_rows_with_question_marks() {
        let sql = Dialect::MySql.poll_sql("audit_outbox");
        assert!(sql.contains("FROM `audit_outbox`"));
        // MySQL output must never carry the double-quote identifier style.
        assert!(!sql.contains('"'));
        assert!(sql.contains('?'));
        assert!(sql.contains("FOR UPDATE SKIP LOCKED"));
    }

    #[test]
    fn sqlite_poll_sql_omits_skip_locked() {
        let sql = Dialect::Sqlite.poll_sql("audit_outbox");
        assert!(sql.contains("FROM \"audit_outbox\""));
        assert!(sql.contains('?'));
        assert!(!sql.contains("FOR UPDATE SKIP LOCKED"));
        assert!(sql.contains("strftime"));
    }

    #[test]
    fn postgres_mark_delivered_sets_timestamp_by_event_id() {
        let sql = Dialect::Postgres.mark_delivered_sql("audit_outbox");
        assert!(sql.contains("UPDATE \"audit_outbox\""));
        assert!(sql.contains("delivered_at"));
        assert!(sql.contains("$1"));
    }

    #[test]
    fn postgres_mark_failed_does_not_increment_attempts_with_three_binds() {
        let sql = Dialect::Postgres.mark_failed_sql("audit_outbox");
        // Attempts are bumped when a row is claimed, not when it fails.
        assert!(!sql.contains("attempts = attempts + 1"));
        assert!(sql.contains("last_error = $1"));
        assert!(sql.contains("next_retry_at = (NOW() +"));
        assert!(sql.contains("$2"));
        assert!(sql.contains("WHERE event_id = $3"));
    }

    #[test]
    fn postgres_insert_sql_binds_four_columns() {
        let sql = Dialect::Postgres.insert_sql("audit_outbox");
        assert!(sql.contains("INSERT INTO \"audit_outbox\""));
        assert!(sql.contains("event_id, event_type, payload, subject_id"));
        assert!(sql.contains("$1, $2, $3, $4"));
    }

    #[test]
    fn sqlite_insert_sql_uses_question_marks() {
        let sql = Dialect::Sqlite.insert_sql("audit_outbox");
        assert!(sql.contains("INSERT INTO \"audit_outbox\""));
        assert!(sql.contains("?, ?, ?, ?"));
    }

    #[test]
    fn mysql_insert_sql_quotes_table_with_backticks() {
        let sql = Dialect::MySql.insert_sql("audit_outbox");
        assert!(sql.contains("INSERT INTO `audit_outbox`"));
        assert!(!sql.contains('"'));
        assert!(sql.contains("?, ?, ?, ?"));
    }

    #[test]
    fn quote_identifier_is_dialect_specific() {
        assert_eq!(Dialect::Postgres.quote_identifier("t"), "\"t\"");
        assert_eq!(Dialect::Sqlite.quote_identifier("t"), "\"t\"");
        assert_eq!(Dialect::MySql.quote_identifier("t"), "`t`");
    }

    #[test]
    fn postgres_schema_ddl_matches_current_canonical_schema() {
        let ddl = Dialect::Postgres.schema_ddl("audit_outbox").unwrap();
        assert!(ddl.contains("CREATE TABLE IF NOT EXISTS audit_outbox"));
        assert!(ddl.contains("BIGSERIAL"));
        assert!(ddl.contains("UUID"));
        assert!(ddl.contains("JSONB"));
        assert!(ddl.contains("TIMESTAMPTZ"));
        assert!(ddl.contains("idx_audit_outbox_pending"));
        assert!(ddl.contains("idx_audit_outbox_subject"));
        assert!(!ddl.contains("{{table}}"));
    }

    #[test]
    fn mysql_schema_ddl_uses_native_types() {
        let ddl = Dialect::MySql.schema_ddl("audit_outbox").unwrap();
        assert!(ddl.contains("CREATE TABLE IF NOT EXISTS audit_outbox"));
        assert!(ddl.contains("AUTO_INCREMENT"));
        assert!(ddl.contains("BINARY(16)"));
        assert!(ddl.contains("JSON"));
        assert!(!ddl.contains("{{table}}"));
    }

    #[test]
    fn sqlite_schema_ddl_uses_portable_text_types() {
        let ddl = Dialect::Sqlite.schema_ddl("audit_outbox").unwrap();
        assert!(ddl.contains("CREATE TABLE IF NOT EXISTS audit_outbox"));
        assert!(ddl.contains("AUTOINCREMENT"));
        assert!(ddl.contains("BLOB"));
        assert!(ddl.contains("strftime"));
        assert!(!ddl.contains("{{table}}"));
    }

    #[test]
    fn schema_ddl_rejects_invalid_table_name() {
        let err = Dialect::Postgres.schema_ddl("bad name; DROP").unwrap_err();
        assert!(matches!(err, OutboxError::Internal(_)));
    }

    #[test]
    fn mysql_poll_compares_against_microsecond_utc() {
        let sql = Dialect::MySql.poll_sql("audit_outbox");
        assert!(sql.contains("UTC_TIMESTAMP(6)"));
        assert!(!sql.contains("UTC_TIMESTAMP()"));
    }

    #[test]
    fn mysql_mark_delivered_uses_microsecond_utc() {
        let sql = Dialect::MySql.mark_delivered_sql("audit_outbox");
        assert!(sql.contains("delivered_at = UTC_TIMESTAMP(6)"));
    }

    #[test]
    fn mysql_schema_ddl_defaults_created_at_to_utc() {
        let ddl = Dialect::MySql.schema_ddl("audit_outbox").unwrap();
        assert!(ddl.contains("UTC_TIMESTAMP(6)"));
    }

    #[test]
    fn postgres_dead_letter_schema_ddl_substitutes_table_name() {
        let ddl = Dialect::Postgres
            .dead_letter_schema_ddl("audit_outbox")
            .unwrap();
        assert!(ddl.contains("CREATE TABLE IF NOT EXISTS audit_outbox_dead_letter"));
        assert!(ddl.contains("exhausted_at"));
        // event_id is already covered by its UNIQUE constraint.
        assert!(!ddl.contains("idx_audit_outbox_dead_letter_event_id"));
        assert!(ddl.contains("idx_audit_outbox_dead_letter_exhausted_at"));
        assert!(!ddl.contains("{{table}}"));
    }

    #[test]
    fn mysql_dead_letter_schema_ddl_uses_native_types() {
        let ddl = Dialect::MySql
            .dead_letter_schema_ddl("audit_outbox")
            .unwrap();
        assert!(ddl.contains("CREATE TABLE IF NOT EXISTS audit_outbox_dead_letter"));
        assert!(ddl.contains("BINARY(16)"));
        assert!(ddl.contains("UTC_TIMESTAMP(6)"));
        assert!(!ddl.contains("{{table}}"));
    }

    #[test]
    fn sqlite_dead_letter_schema_ddl_uses_portable_text_types() {
        let ddl = Dialect::Sqlite
            .dead_letter_schema_ddl("audit_outbox")
            .unwrap();
        assert!(ddl.contains("CREATE TABLE IF NOT EXISTS audit_outbox_dead_letter"));
        assert!(ddl.contains("strftime"));
        assert!(!ddl.contains("{{table}}"));
    }

    #[test]
    fn dead_letter_schema_ddl_rejects_invalid_table_name() {
        let err = Dialect::Postgres
            .dead_letter_schema_ddl("bad name; DROP")
            .unwrap_err();
        assert!(matches!(err, OutboxError::Internal(_)));
    }

    #[test]
    fn postgres_insert_dead_letter_sql_selects_from_main() {
        let sql =
            Dialect::Postgres.insert_dead_letter_sql("audit_outbox", "audit_outbox_dead_letter");
        assert!(sql.contains("INSERT INTO \"audit_outbox_dead_letter\""));
        assert!(sql.contains("SELECT"));
        assert!(sql.contains("FROM \"audit_outbox\""));
        assert!(sql.contains("$1"));
        // exhausted_at must come from the column default, not the copy.
        assert!(!sql.contains("exhausted_at"));
    }

    #[test]
    fn sqlite_insert_dead_letter_sql_uses_question_mark() {
        let sql = Dialect::Sqlite.insert_dead_letter_sql("audit_outbox", "audit_outbox_dlq");
        assert!(sql.contains("INSERT INTO \"audit_outbox_dlq\""));
        assert!(sql.contains("FROM \"audit_outbox\""));
        assert!(sql.contains('?'));
    }

    #[test]
    fn postgres_delete_from_main_sql_binds_positionally() {
        let sql = Dialect::Postgres.delete_from_main_sql("audit_outbox");
        assert!(sql.contains("DELETE FROM \"audit_outbox\""));
        assert!(sql.contains("$1"));
    }

    #[test]
    fn sqlite_delete_from_main_sql_uses_question_mark() {
        let sql = Dialect::Sqlite.delete_from_main_sql("audit_outbox");
        assert!(sql.contains("DELETE FROM \"audit_outbox\""));
        assert!(sql.contains('?'));
    }

    #[test]
    fn postgres_claim_sql_uses_any_array_instead_of_in_list() {
        let sql = Dialect::Postgres.claim_sql("audit_outbox", 3);
        assert!(sql.contains("UPDATE \"audit_outbox\""));
        assert!(sql.contains("SET next_retry_at = (NOW() +"));
        assert!(sql.contains("$1"));
        assert!(sql.contains("WHERE event_id = ANY($2)"));
        // Only two binds regardless of batch size: lease and id array.
        assert!(!sql.contains("$3"));
        assert!(!sql.contains("$4"));
        assert!(!sql.contains("WHERE event_id IN"));
    }

    #[test]
    fn postgres_claim_sql_any_bind_count_is_independent_of_batch_size() {
        for n in [1, 10, 1000] {
            let sql = Dialect::Postgres.claim_sql("audit_outbox", n);
            assert!(
                sql.contains("ANY($2)"),
                "n={n}: expected ANY($2), got: {sql}"
            );
            assert!(
                !sql.contains("$3"),
                "n={n}: unexpected $3 placeholder, got: {sql}"
            );
        }
    }

    #[test]
    fn mysql_claim_sql_uses_question_marks() {
        let sql = Dialect::MySql.claim_sql("audit_outbox", 2);
        assert!(sql.contains("UPDATE `audit_outbox`"));
        assert!(sql.contains("SET next_retry_at = (UTC_TIMESTAMP(6) + INTERVAL ? MICROSECOND)"));
        assert!(sql.contains("WHERE event_id IN (?, ?)"));
    }

    #[test]
    fn sqlite_claim_sql_uses_question_marks() {
        let sql = Dialect::Sqlite.claim_sql("audit_outbox", 1);
        assert!(sql.contains("UPDATE \"audit_outbox\""));
        assert!(sql.contains("SET next_retry_at = strftime("));
        assert!(sql.contains("'now', ?)"));
        assert!(sql.contains("WHERE event_id IN (?)"));
    }

    #[test]
    fn now_plus_interval_uses_db_clock_per_dialect() {
        assert!(
            Dialect::Postgres
                .now_plus_interval_expr(1)
                .contains("NOW()")
        );
        assert!(
            Dialect::MySql
                .now_plus_interval_expr(1)
                .contains("UTC_TIMESTAMP(6)")
        );
        assert!(Dialect::Sqlite.now_plus_interval_expr(1).contains("'now'"));
    }

    #[test]
    fn claim_sql_increments_attempts_for_every_dialect() {
        for dialect in ALL_DIALECTS {
            let sql = dialect.claim_sql("audit_outbox", 2);
            assert!(
                sql.contains("attempts = attempts + 1"),
                "{dialect:?} claim_sql must increment attempts, got: {sql}"
            );
        }
    }

    #[test]
    fn schema_ddl_rejects_overlength_table_name() {
        let long_name = "a".repeat(64);
        for dialect in ALL_DIALECTS {
            let err = dialect.schema_ddl(&long_name).unwrap_err();
            assert!(
                matches!(err, OutboxError::Internal(_)),
                "{dialect:?} must reject a 64-byte table name"
            );
        }
    }

    #[test]
    fn dlq_schema_ddl_does_not_create_redundant_event_id_index() {
        for dialect in ALL_DIALECTS {
            let ddl = dialect.dead_letter_schema_ddl("audit_outbox").unwrap();
            assert!(
                !ddl.contains("dead_letter_event_id"),
                "{dialect:?} DLQ DDL must not create a redundant event_id index, got:\n{ddl}"
            );
        }
    }

    #[test]
    fn sql_generation_quotes_identifiers() {
        for dialect in ALL_DIALECTS {
            // A reserved word makes unquoted output an actual SQL error.
            let table = "user";
            let quoted = dialect.quote_identifier(table);
            let statements = [
                ("poll_sql", dialect.poll_sql(table)),
                ("mark_delivered_sql", dialect.mark_delivered_sql(table)),
                ("mark_failed_sql", dialect.mark_failed_sql(table)),
                ("insert_sql", dialect.insert_sql(table)),
                ("claim_sql", dialect.claim_sql(table, 1)),
                ("delete_from_main_sql", dialect.delete_from_main_sql(table)),
            ];
            for (builder, sql) in &statements {
                assert!(
                    sql.contains(&quoted),
                    "{dialect:?} {builder} must quote the table name"
                );
            }
        }
    }
}