use hexeract_outbox::OutboxError;
use crate::validate::validate_table_name;
/// DDL template for the outbox table on Postgres.
///
/// The text `{{table}}` is a literal marker (this is a raw string, not a
/// `format!` template); [`Dialect::schema_ddl`] substitutes the validated
/// table name for every occurrence.
///
/// The script contains three statements: the table itself, a partial index
/// on `created_at` restricted to rows where `delivered_at IS NULL`, and a
/// partial index on `(subject_id, id)` restricted to rows that have a
/// `subject_id`. All statements use `IF NOT EXISTS`.
const POSTGRES_SCHEMA_SQL: &str = r"
CREATE TABLE IF NOT EXISTS {{table}} (
id BIGSERIAL PRIMARY KEY,
event_id UUID NOT NULL UNIQUE,
event_type VARCHAR(64) NOT NULL,
payload JSONB NOT NULL,
subject_id UUID NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
attempts INTEGER NOT NULL DEFAULT 0,
last_error TEXT NULL,
next_retry_at TIMESTAMPTZ NULL,
delivered_at TIMESTAMPTZ NULL
);
CREATE INDEX IF NOT EXISTS idx_{{table}}_pending
ON {{table}} (created_at)
WHERE delivered_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_{{table}}_subject
ON {{table}} (subject_id, id)
WHERE subject_id IS NOT NULL;
";
/// DDL template for the outbox table on MySQL.
///
/// The text `{{table}}` is a literal marker that [`Dialect::schema_ddl`]
/// replaces with the validated table name.
///
/// Unlike the Postgres and SQLite templates in this file, this is a single
/// `CREATE TABLE` statement: both indexes are declared inline and carry no
/// `WHERE` filter, so the pending index leads with `delivered_at` instead.
/// `event_id` and `subject_id` are `BINARY(16)` columns, and `created_at`
/// defaults to `UTC_TIMESTAMP(6)`.
const MYSQL_SCHEMA_SQL: &str = r"
CREATE TABLE IF NOT EXISTS {{table}} (
id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
event_id BINARY(16) NOT NULL UNIQUE,
event_type VARCHAR(64) NOT NULL,
payload JSON NOT NULL,
subject_id BINARY(16) NULL,
created_at DATETIME(6) NOT NULL DEFAULT (UTC_TIMESTAMP(6)),
attempts INT NOT NULL DEFAULT 0,
last_error TEXT NULL,
next_retry_at DATETIME(6) NULL,
delivered_at DATETIME(6) NULL,
INDEX idx_{{table}}_pending (delivered_at, created_at),
INDEX idx_{{table}}_subject (subject_id, id)
);
";
/// DDL template for the outbox table on SQLite.
///
/// The text `{{table}}` is a literal marker that [`Dialect::schema_ddl`]
/// replaces with the validated table name.
///
/// Identifiers are stored as `BLOB`, the payload and all timestamps as
/// `TEXT`. `created_at` defaults to
/// `strftime('%Y-%m-%dT%H:%M:%fZ', 'now')`, the same expression
/// [`Dialect::now_expr`] returns for SQLite. The script also creates the
/// same two partial indexes as the Postgres template.
const SQLITE_SCHEMA_SQL: &str = r"
CREATE TABLE IF NOT EXISTS {{table}} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event_id BLOB NOT NULL UNIQUE,
event_type TEXT NOT NULL,
payload TEXT NOT NULL,
subject_id BLOB,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
attempts INTEGER NOT NULL DEFAULT 0,
last_error TEXT,
next_retry_at TEXT,
delivered_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_{{table}}_pending
ON {{table}} (created_at)
WHERE delivered_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_{{table}}_subject
ON {{table}} (subject_id, id)
WHERE subject_id IS NOT NULL;
";
/// SQL dialect for which outbox statements and schema DDL are generated.
///
/// The variant selects the bind-placeholder syntax, the "current time"
/// expression, whether the poll query appends `FOR UPDATE SKIP LOCKED`, and
/// which schema template is rendered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Dialect {
/// Numbered `$n` placeholders, `NOW()`, row locking with `SKIP LOCKED`.
Postgres,
/// `?` placeholders, `UTC_TIMESTAMP()`, row locking with `SKIP LOCKED`.
MySql,
/// `?` placeholders, `strftime`-based timestamps, no locking clause.
Sqlite,
}
impl Dialect {
    /// Reports whether the poll query for this dialect locks rows with
    /// `FOR UPDATE SKIP LOCKED`.
    ///
    /// `true` for [`Dialect::Postgres`] and [`Dialect::MySql`], `false` for
    /// [`Dialect::Sqlite`].
    #[must_use]
    pub fn supports_skip_locked(self) -> bool {
        match self {
            Self::Postgres | Self::MySql => true,
            Self::Sqlite => false,
        }
    }

    /// Renders the bind placeholder for the parameter at position `index`.
    ///
    /// Postgres gets a numbered placeholder (`$1`, `$2`, ...); MySQL and
    /// SQLite always get `?`, so `index` is ignored for them. Callers in
    /// this file pass positions starting at 1.
    pub(crate) fn placeholder(self, index: usize) -> String {
        match self {
            Self::MySql | Self::Sqlite => String::from("?"),
            Self::Postgres => format!("${index}"),
        }
    }

    /// Returns the SQL expression that evaluates to the current time.
    ///
    /// NOTE(review): MySQL's `UTC_TIMESTAMP()` without a precision argument
    /// yields whole seconds, while the MySQL schema in this file declares
    /// `DATETIME(6)` columns — confirm the truncation is intended.
    pub(crate) fn now_expr(self) -> &'static str {
        match self {
            Self::Sqlite => "strftime('%Y-%m-%dT%H:%M:%fZ', 'now')",
            Self::MySql => "UTC_TIMESTAMP()",
            Self::Postgres => "NOW()",
        }
    }

    /// Builds the query that selects events due for delivery from `table`.
    ///
    /// A row qualifies when it is undelivered, has fewer attempts than the
    /// first bind parameter, and has no `next_retry_at` or one that is not
    /// in the future. Rows are ordered by `id` and capped by the second bind
    /// parameter. Dialects that support it get a trailing
    /// `FOR UPDATE SKIP LOCKED`.
    ///
    /// `table` is interpolated verbatim; this function does not validate it.
    pub(crate) fn poll_sql(self, table: &str) -> String {
        let mut query = format!(
            "SELECT event_id, event_type, payload, subject_id, created_at, \
             attempts, last_error, next_retry_at \
             FROM {table} \
             WHERE delivered_at IS NULL \
             AND attempts < {attempt_cap} \
             AND (next_retry_at IS NULL OR next_retry_at <= {current_time}) \
             ORDER BY id \
             LIMIT {row_cap}",
            attempt_cap = self.placeholder(1),
            current_time = self.now_expr(),
            row_cap = self.placeholder(2),
        );
        if self.supports_skip_locked() {
            query.push_str(" FOR UPDATE SKIP LOCKED");
        }
        query
    }

    /// Builds the statement that stamps `delivered_at` with the current time
    /// for the event whose `event_id` is the single bind parameter.
    ///
    /// `table` is interpolated verbatim; this function does not validate it.
    pub(crate) fn mark_delivered_sql(self, table: &str) -> String {
        format!(
            "UPDATE {table} SET delivered_at = {current_time} WHERE event_id = {id_bind}",
            current_time = self.now_expr(),
            id_bind = self.placeholder(1),
        )
    }

    /// Builds the statement that records a failed delivery attempt.
    ///
    /// Increments `attempts` and binds, in order: `last_error`,
    /// `next_retry_at`, and the `event_id` of the row to update.
    ///
    /// `table` is interpolated verbatim; this function does not validate it.
    pub(crate) fn mark_failed_sql(self, table: &str) -> String {
        let [error_bind, retry_bind, id_bind] = [1, 2, 3].map(|position| self.placeholder(position));
        format!(
            "UPDATE {table} \
             SET attempts = attempts + 1, last_error = {error_bind}, next_retry_at = {retry_bind} \
             WHERE event_id = {id_bind}"
        )
    }

    /// Builds the statement that inserts a new event.
    ///
    /// Binds, in order: `event_id`, `event_type`, `payload`, `subject_id`.
    /// The remaining columns are left to their schema defaults.
    ///
    /// `table` is interpolated verbatim; this function does not validate it.
    pub(crate) fn insert_sql(self, table: &str) -> String {
        let binds = (1..=4)
            .map(|position| self.placeholder(position))
            .collect::<Vec<_>>()
            .join(", ");
        format!("INSERT INTO {table} (event_id, event_type, payload, subject_id) VALUES ({binds})")
    }

    /// Renders the schema DDL for this dialect with `table` substituted for
    /// every `{{table}}` marker in the dialect's template.
    ///
    /// # Errors
    ///
    /// Propagates the error from `validate_table_name` when `table` is
    /// rejected; nothing is rendered in that case.
    pub fn schema_ddl(self, table: &str) -> Result<String, OutboxError> {
        // Validate first: the name is spliced into the DDL text unescaped.
        validate_table_name(table)?;
        let rendered = match self {
            Self::Postgres => POSTGRES_SCHEMA_SQL,
            Self::MySql => MYSQL_SCHEMA_SQL,
            Self::Sqlite => SQLITE_SCHEMA_SQL,
        }
        .replace("{{table}}", table);
        Ok(rendered)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Table name used by every generated-SQL assertion in this module.
    const TABLE: &str = "audit_outbox";

    /// Renders the schema DDL for `dialect` against [`TABLE`], panicking if
    /// the name is rejected.
    fn schema_for(dialect: Dialect) -> String {
        dialect.schema_ddl(TABLE).unwrap()
    }

    /// Postgres and MySQL report `SKIP LOCKED` support; SQLite does not.
    #[test]
    fn skip_locked_support_matches_engine_capabilities() {
        let expectations = [
            (Dialect::Postgres, true),
            (Dialect::MySql, true),
            (Dialect::Sqlite, false),
        ];
        for (dialect, expected) in expectations {
            assert_eq!(dialect.supports_skip_locked(), expected, "dialect: {dialect:?}");
        }
    }

    /// Postgres placeholders carry their position.
    #[test]
    fn postgres_uses_positional_placeholders() {
        let dialect = Dialect::Postgres;
        assert_eq!(dialect.placeholder(1), "$1");
        assert_eq!(dialect.placeholder(4), "$4");
    }

    /// MySQL and SQLite placeholders ignore the position.
    #[test]
    fn mysql_and_sqlite_use_question_mark_placeholders() {
        assert_eq!(Dialect::MySql.placeholder(1), "?");
        assert_eq!(Dialect::Sqlite.placeholder(3), "?");
    }

    /// The Postgres poll query names the table, binds both parameters,
    /// orders by id, locks rows, and compares against `NOW()`.
    #[test]
    fn postgres_poll_sql_locks_rows_and_binds_positionally() {
        let query = Dialect::Postgres.poll_sql(TABLE);
        assert!(query.contains("FROM audit_outbox"));
        assert!(query.contains("$1"));
        assert!(query.contains("$2"));
        assert!(query.contains("ORDER BY id"));
        assert!(query.contains("FOR UPDATE SKIP LOCKED"));
        assert!(query.contains("NOW()"));
    }

    /// The MySQL poll query uses `?` binds and locks rows.
    #[test]
    fn mysql_poll_sql_locks_rows_with_question_marks() {
        let query = Dialect::MySql.poll_sql(TABLE);
        assert!(query.contains("FROM audit_outbox"));
        assert!(query.contains('?'));
        assert!(query.contains("FOR UPDATE SKIP LOCKED"));
    }

    /// The SQLite poll query uses `?` binds, `strftime`, and no lock clause.
    #[test]
    fn sqlite_poll_sql_omits_skip_locked() {
        let query = Dialect::Sqlite.poll_sql(TABLE);
        assert!(query.contains("FROM audit_outbox"));
        assert!(query.contains('?'));
        assert!(!query.contains("FOR UPDATE SKIP LOCKED"));
        assert!(query.contains("strftime"));
    }

    /// Marking delivered updates `delivered_at` keyed by the first bind.
    #[test]
    fn postgres_mark_delivered_sets_timestamp_by_event_id() {
        let statement = Dialect::Postgres.mark_delivered_sql(TABLE);
        assert!(statement.contains("UPDATE audit_outbox"));
        assert!(statement.contains("delivered_at"));
        assert!(statement.contains("$1"));
    }

    /// Marking failed bumps `attempts` and uses three binds.
    #[test]
    fn postgres_mark_failed_increments_attempts_with_three_binds() {
        let statement = Dialect::Postgres.mark_failed_sql(TABLE);
        assert!(statement.contains("attempts = attempts + 1"));
        assert!(statement.contains("$1"));
        assert!(statement.contains("$2"));
        assert!(statement.contains("$3"));
    }

    /// The Postgres insert lists four columns with four numbered binds.
    #[test]
    fn postgres_insert_sql_binds_four_columns() {
        let statement = Dialect::Postgres.insert_sql(TABLE);
        assert!(statement.contains("INSERT INTO audit_outbox"));
        assert!(statement.contains("event_id, event_type, payload, subject_id"));
        assert!(statement.contains("$1, $2, $3, $4"));
    }

    /// The SQLite insert uses four `?` binds.
    #[test]
    fn sqlite_insert_sql_uses_question_marks() {
        let statement = Dialect::Sqlite.insert_sql(TABLE);
        assert!(statement.contains("INSERT INTO audit_outbox"));
        assert!(statement.contains("?, ?, ?, ?"));
    }

    /// The Postgres DDL uses native types, names both indexes after the
    /// table, and leaves no unreplaced marker behind.
    #[test]
    fn postgres_schema_ddl_matches_current_canonical_schema() {
        let schema = schema_for(Dialect::Postgres);
        assert!(schema.contains("CREATE TABLE IF NOT EXISTS audit_outbox"));
        assert!(schema.contains("BIGSERIAL"));
        assert!(schema.contains("UUID"));
        assert!(schema.contains("JSONB"));
        assert!(schema.contains("TIMESTAMPTZ"));
        assert!(schema.contains("idx_audit_outbox_pending"));
        assert!(schema.contains("idx_audit_outbox_subject"));
        assert!(!schema.contains("{{table}}"));
    }

    /// The MySQL DDL uses MySQL column types and leaves no marker behind.
    #[test]
    fn mysql_schema_ddl_uses_native_types() {
        let schema = schema_for(Dialect::MySql);
        assert!(schema.contains("CREATE TABLE IF NOT EXISTS audit_outbox"));
        assert!(schema.contains("AUTO_INCREMENT"));
        assert!(schema.contains("BINARY(16)"));
        assert!(schema.contains("JSON"));
        assert!(!schema.contains("{{table}}"));
    }

    /// The SQLite DDL uses BLOB/TEXT storage and leaves no marker behind.
    #[test]
    fn sqlite_schema_ddl_uses_portable_text_types() {
        let schema = schema_for(Dialect::Sqlite);
        assert!(schema.contains("CREATE TABLE IF NOT EXISTS audit_outbox"));
        assert!(schema.contains("AUTOINCREMENT"));
        assert!(schema.contains("BLOB"));
        assert!(schema.contains("strftime"));
        assert!(!schema.contains("{{table}}"));
    }

    /// A table name containing spaces and `;` is refused with
    /// `OutboxError::Internal`.
    #[test]
    fn schema_ddl_rejects_invalid_table_name() {
        let outcome = Dialect::Postgres.schema_ddl("bad name; DROP");
        assert!(matches!(outcome, Err(OutboxError::Internal(_))));
    }

    /// The MySQL poll query compares against `UTC_TIMESTAMP()`.
    #[test]
    fn mysql_compares_against_utc_not_session_time() {
        assert!(Dialect::MySql.poll_sql(TABLE).contains("UTC_TIMESTAMP()"));
    }

    /// The MySQL DDL defaults `created_at` via `UTC_TIMESTAMP(6)`.
    #[test]
    fn mysql_schema_ddl_defaults_created_at_to_utc() {
        assert!(schema_for(Dialect::MySql).contains("UTC_TIMESTAMP(6)"));
    }
}