1use hexeract_outbox::OutboxError;
2
3use crate::validate::validate_table_name;
4
5const POSTGRES_SCHEMA_SQL: &str = r"
11CREATE TABLE IF NOT EXISTS {{table}} (
12 id BIGSERIAL PRIMARY KEY,
13 event_id UUID NOT NULL UNIQUE,
14 event_type VARCHAR(64) NOT NULL,
15 payload JSONB NOT NULL,
16 subject_id UUID NULL,
17 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
18 attempts INTEGER NOT NULL DEFAULT 0,
19 last_error TEXT NULL,
20 next_retry_at TIMESTAMPTZ NULL,
21 delivered_at TIMESTAMPTZ NULL
22);
23CREATE INDEX IF NOT EXISTS idx_{{table}}_pending
24 ON {{table}} (created_at)
25 WHERE delivered_at IS NULL;
26CREATE INDEX IF NOT EXISTS idx_{{table}}_subject
27 ON {{table}} (subject_id, id)
28 WHERE subject_id IS NOT NULL;
29";
30
31const MYSQL_SCHEMA_SQL: &str = r"
39CREATE TABLE IF NOT EXISTS {{table}} (
40 id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
41 event_id BINARY(16) NOT NULL UNIQUE,
42 event_type VARCHAR(64) NOT NULL,
43 payload JSON NOT NULL,
44 subject_id BINARY(16) NULL,
45 created_at DATETIME(6) NOT NULL DEFAULT (UTC_TIMESTAMP(6)),
46 attempts INT NOT NULL DEFAULT 0,
47 last_error TEXT NULL,
48 next_retry_at DATETIME(6) NULL,
49 delivered_at DATETIME(6) NULL,
50 INDEX idx_{{table}}_pending (delivered_at, created_at),
51 INDEX idx_{{table}}_subject (subject_id, id)
52);
53";
54
55const SQLITE_SCHEMA_SQL: &str = r"
61CREATE TABLE IF NOT EXISTS {{table}} (
62 id INTEGER PRIMARY KEY AUTOINCREMENT,
63 event_id BLOB NOT NULL UNIQUE,
64 event_type TEXT NOT NULL,
65 payload TEXT NOT NULL,
66 subject_id BLOB,
67 created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
68 attempts INTEGER NOT NULL DEFAULT 0,
69 last_error TEXT,
70 next_retry_at TEXT,
71 delivered_at TEXT
72);
73CREATE INDEX IF NOT EXISTS idx_{{table}}_pending
74 ON {{table}} (created_at)
75 WHERE delivered_at IS NULL;
76CREATE INDEX IF NOT EXISTS idx_{{table}}_subject
77 ON {{table}} (subject_id, id)
78 WHERE subject_id IS NOT NULL;
79";
80
81#[derive(Debug, Clone, Copy, PartialEq, Eq)]
88pub enum Dialect {
89 Postgres,
91 MySql,
93 Sqlite,
95}
96
97impl Dialect {
98 #[must_use]
103 pub fn supports_skip_locked(self) -> bool {
104 matches!(self, Self::Postgres | Self::MySql)
105 }
106
107 pub(crate) fn placeholder(self, index: usize) -> String {
111 match self {
112 Self::Postgres => format!("${index}"),
113 Self::MySql | Self::Sqlite => "?".to_owned(),
114 }
115 }
116
117 pub(crate) fn now_expr(self) -> &'static str {
120 match self {
121 Self::Postgres => "NOW()",
122 Self::MySql => "UTC_TIMESTAMP()",
126 Self::Sqlite => "strftime('%Y-%m-%dT%H:%M:%fZ', 'now')",
127 }
128 }
129
130 pub(crate) fn poll_sql(self, table: &str) -> String {
132 let max_attempts = self.placeholder(1);
133 let limit = self.placeholder(2);
134 let now = self.now_expr();
135 let lock = if self.supports_skip_locked() {
136 " FOR UPDATE SKIP LOCKED"
137 } else {
138 ""
139 };
140 format!(
141 "SELECT event_id, event_type, payload, subject_id, created_at, \
142 attempts, last_error, next_retry_at \
143 FROM {table} \
144 WHERE delivered_at IS NULL \
145 AND attempts < {max_attempts} \
146 AND (next_retry_at IS NULL OR next_retry_at <= {now}) \
147 ORDER BY id \
148 LIMIT {limit}{lock}"
149 )
150 }
151
152 pub(crate) fn mark_delivered_sql(self, table: &str) -> String {
154 let event_id = self.placeholder(1);
155 let now = self.now_expr();
156 format!("UPDATE {table} SET delivered_at = {now} WHERE event_id = {event_id}")
157 }
158
159 pub(crate) fn mark_failed_sql(self, table: &str) -> String {
161 let last_error = self.placeholder(1);
162 let next_retry_at = self.placeholder(2);
163 let event_id = self.placeholder(3);
164 format!(
165 "UPDATE {table} \
166 SET attempts = attempts + 1, last_error = {last_error}, next_retry_at = {next_retry_at} \
167 WHERE event_id = {event_id}"
168 )
169 }
170
171 pub(crate) fn insert_sql(self, table: &str) -> String {
173 let p1 = self.placeholder(1);
174 let p2 = self.placeholder(2);
175 let p3 = self.placeholder(3);
176 let p4 = self.placeholder(4);
177 format!(
178 "INSERT INTO {table} (event_id, event_type, payload, subject_id) \
179 VALUES ({p1}, {p2}, {p3}, {p4})"
180 )
181 }
182
183 pub fn schema_ddl(self, table: &str) -> Result<String, OutboxError> {
190 validate_table_name(table)?;
191 let template = match self {
192 Self::Postgres => POSTGRES_SCHEMA_SQL,
193 Self::MySql => MYSQL_SCHEMA_SQL,
194 Self::Sqlite => SQLITE_SCHEMA_SQL,
195 };
196 Ok(template.replace("{{table}}", table))
197 }
198}
199
200#[cfg(test)]
201mod tests {
202 use super::*;
203
204 #[test]
205 fn skip_locked_support_matches_engine_capabilities() {
206 assert!(Dialect::Postgres.supports_skip_locked());
207 assert!(Dialect::MySql.supports_skip_locked());
208 assert!(!Dialect::Sqlite.supports_skip_locked());
209 }
210
211 #[test]
212 fn postgres_uses_positional_placeholders() {
213 assert_eq!(Dialect::Postgres.placeholder(1), "$1");
214 assert_eq!(Dialect::Postgres.placeholder(4), "$4");
215 }
216
217 #[test]
218 fn mysql_and_sqlite_use_question_mark_placeholders() {
219 assert_eq!(Dialect::MySql.placeholder(1), "?");
220 assert_eq!(Dialect::Sqlite.placeholder(3), "?");
221 }
222
223 #[test]
224 fn postgres_poll_sql_locks_rows_and_binds_positionally() {
225 let sql = Dialect::Postgres.poll_sql("audit_outbox");
226 assert!(sql.contains("FROM audit_outbox"));
227 assert!(sql.contains("$1"));
228 assert!(sql.contains("$2"));
229 assert!(sql.contains("ORDER BY id"));
230 assert!(sql.contains("FOR UPDATE SKIP LOCKED"));
231 assert!(sql.contains("NOW()"));
232 }
233
234 #[test]
235 fn mysql_poll_sql_locks_rows_with_question_marks() {
236 let sql = Dialect::MySql.poll_sql("audit_outbox");
237 assert!(sql.contains("FROM audit_outbox"));
238 assert!(sql.contains('?'));
239 assert!(sql.contains("FOR UPDATE SKIP LOCKED"));
240 }
241
242 #[test]
243 fn sqlite_poll_sql_omits_skip_locked() {
244 let sql = Dialect::Sqlite.poll_sql("audit_outbox");
245 assert!(sql.contains("FROM audit_outbox"));
246 assert!(sql.contains('?'));
247 assert!(!sql.contains("FOR UPDATE SKIP LOCKED"));
248 assert!(sql.contains("strftime"));
249 }
250
251 #[test]
252 fn postgres_mark_delivered_sets_timestamp_by_event_id() {
253 let sql = Dialect::Postgres.mark_delivered_sql("audit_outbox");
254 assert!(sql.contains("UPDATE audit_outbox"));
255 assert!(sql.contains("delivered_at"));
256 assert!(sql.contains("$1"));
257 }
258
259 #[test]
260 fn postgres_mark_failed_increments_attempts_with_three_binds() {
261 let sql = Dialect::Postgres.mark_failed_sql("audit_outbox");
262 assert!(sql.contains("attempts = attempts + 1"));
263 assert!(sql.contains("$1"));
264 assert!(sql.contains("$2"));
265 assert!(sql.contains("$3"));
266 }
267
268 #[test]
269 fn postgres_insert_sql_binds_four_columns() {
270 let sql = Dialect::Postgres.insert_sql("audit_outbox");
271 assert!(sql.contains("INSERT INTO audit_outbox"));
272 assert!(sql.contains("event_id, event_type, payload, subject_id"));
273 assert!(sql.contains("$1, $2, $3, $4"));
274 }
275
276 #[test]
277 fn sqlite_insert_sql_uses_question_marks() {
278 let sql = Dialect::Sqlite.insert_sql("audit_outbox");
279 assert!(sql.contains("INSERT INTO audit_outbox"));
280 assert!(sql.contains("?, ?, ?, ?"));
281 }
282
283 #[test]
284 fn postgres_schema_ddl_matches_current_canonical_schema() {
285 let ddl = Dialect::Postgres.schema_ddl("audit_outbox").unwrap();
286 assert!(ddl.contains("CREATE TABLE IF NOT EXISTS audit_outbox"));
287 assert!(ddl.contains("BIGSERIAL"));
288 assert!(ddl.contains("UUID"));
289 assert!(ddl.contains("JSONB"));
290 assert!(ddl.contains("TIMESTAMPTZ"));
291 assert!(ddl.contains("idx_audit_outbox_pending"));
292 assert!(ddl.contains("idx_audit_outbox_subject"));
293 assert!(!ddl.contains("{{table}}"));
294 }
295
296 #[test]
297 fn mysql_schema_ddl_uses_native_types() {
298 let ddl = Dialect::MySql.schema_ddl("audit_outbox").unwrap();
299 assert!(ddl.contains("CREATE TABLE IF NOT EXISTS audit_outbox"));
300 assert!(ddl.contains("AUTO_INCREMENT"));
301 assert!(ddl.contains("BINARY(16)"));
302 assert!(ddl.contains("JSON"));
303 assert!(!ddl.contains("{{table}}"));
304 }
305
306 #[test]
307 fn sqlite_schema_ddl_uses_portable_text_types() {
308 let ddl = Dialect::Sqlite.schema_ddl("audit_outbox").unwrap();
309 assert!(ddl.contains("CREATE TABLE IF NOT EXISTS audit_outbox"));
310 assert!(ddl.contains("AUTOINCREMENT"));
311 assert!(ddl.contains("BLOB"));
312 assert!(ddl.contains("strftime"));
313 assert!(!ddl.contains("{{table}}"));
314 }
315
316 #[test]
317 fn schema_ddl_rejects_invalid_table_name() {
318 let err = Dialect::Postgres.schema_ddl("bad name; DROP").unwrap_err();
319 assert!(matches!(err, OutboxError::Internal(_)));
320 }
321
322 #[test]
323 fn mysql_compares_against_utc_not_session_time() {
324 let sql = Dialect::MySql.poll_sql("audit_outbox");
325 assert!(sql.contains("UTC_TIMESTAMP()"));
326 }
327
328 #[test]
329 fn mysql_schema_ddl_defaults_created_at_to_utc() {
330 let ddl = Dialect::MySql.schema_ddl("audit_outbox").unwrap();
331 assert!(ddl.contains("UTC_TIMESTAMP(6)"));
332 }
333}