1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use async_trait::async_trait;
6use hexeract_outbox::ErasedHandler;
7use hexeract_outbox::Event;
8use hexeract_outbox::Handler;
9use hexeract_outbox::OutboxEnvelope;
10use hexeract_outbox::OutboxError;
11use hexeract_outbox::OutboxPublisher;
12use hexeract_outbox::OutboxStore;
13use hexeract_outbox::OutboxWorker;
14use hexeract_outbox::OutboxWorkerConfig;
15use hexeract_outbox::TypedHandler;
16use sqlx::Acquire;
17use sqlx::PgPool;
18use sqlx::Postgres;
19use sqlx::Row;
20use sqlx::Transaction;
21use sqlx::pool::PoolConnection;
22use time::OffsetDateTime;
23use uuid::Uuid;
24
25use crate::DEFAULT_TABLE_NAME;
26use crate::dialect::Dialect;
27use crate::envelope::assemble_envelope;
28use crate::envelope::to_system_time;
29use crate::validate::validate_event_type;
30use crate::validate::validate_table_name;
31
/// SQL dialect used to generate every statement built in this module.
const DIALECT: Dialect = Dialect::Postgres;
33
/// Largest number of seconds this module binds as an interval argument.
///
/// NOTE(review): the value equals `i64::MAX` nanoseconds expressed in whole
/// seconds; presumably chosen to keep the bound interval in range — confirm.
const MAX_PG_INTERVAL_SECS: f64 = 9_223_372_036.0;

/// Converts `d` to fractional seconds, clamped to [`MAX_PG_INTERVAL_SECS`].
fn duration_to_pg_secs(d: Duration) -> f64 {
    let seconds = d.as_secs_f64();
    f64::min(seconds, MAX_PG_INTERVAL_SECS)
}
52
53fn database_error(error: impl std::error::Error + Send + Sync + 'static) -> OutboxError {
54 OutboxError::Database(Box::new(error))
55}
56
57fn pool_error(error: sqlx::Error) -> OutboxError {
58 if matches!(error, sqlx::Error::PoolTimedOut) {
59 OutboxError::PoolTimeout
60 } else {
61 OutboxError::Database(Box::new(error))
62 }
63}
64
65fn decode_pg_row(row: &sqlx::postgres::PgRow) -> Result<OutboxEnvelope, OutboxError> {
70 let event_id: Uuid = row.try_get("event_id").map_err(database_error)?;
71 let event_type: String = row.try_get("event_type").map_err(database_error)?;
72 let payload: serde_json::Value = row.try_get("payload").map_err(database_error)?;
73 let subject_id: Option<Uuid> = row.try_get("subject_id").map_err(database_error)?;
74 let created_at: OffsetDateTime = row.try_get("created_at").map_err(database_error)?;
75 let attempts: i32 = row.try_get("attempts").map_err(database_error)?;
76 let last_error: Option<String> = row.try_get("last_error").map_err(database_error)?;
77 let next_retry_at: Option<OffsetDateTime> =
78 row.try_get("next_retry_at").map_err(database_error)?;
79
80 let payload = serde_json::to_vec(&payload)?;
81
82 Ok(assemble_envelope(
83 event_id,
84 event_type,
85 payload,
86 subject_id,
87 to_system_time(created_at),
88 u32::try_from(attempts.max(0)).unwrap_or(u32::MAX),
89 last_error,
90 next_retry_at.map(to_system_time),
91 ))
92}
93
/// Pair of cached SQL statements used when an event is dead-lettered.
#[derive(Debug, Clone)]
struct DeadLetterSql {
    /// Statement that inserts into the dead-letter table.
    insert_sql: Arc<str>,
    /// Statement that deletes from the main outbox table.
    delete_sql: Arc<str>,
}
99
100pub async fn ensure_schema(pool: &PgPool, table_name: &str) -> Result<(), OutboxError> {
113 let ddl = DIALECT.schema_ddl(table_name)?;
114 sqlx::raw_sql(&ddl)
115 .execute(pool)
116 .await
117 .map_err(database_error)?;
118 Ok(())
119}
120
/// Outbox store backed by a Postgres connection pool.
///
/// The statements for polling and status updates are generated once by the
/// constructor and cached as shared strings.
#[derive(Debug, Clone)]
pub struct PgOutboxStore {
    /// Connection pool that clients are acquired from.
    pool: PgPool,
    /// Validated name of the outbox table.
    table_name: Arc<str>,
    /// Cached statement used by `poll`.
    poll_sql: Arc<str>,
    /// Cached statement used by `mark_delivered`.
    mark_delivered_sql: Arc<str>,
    /// Cached statement used by `mark_failed`.
    mark_failed_sql: Arc<str>,
    /// Dead-letter statements; `None` unless `with_dead_letter` was called.
    dead_letter: Option<Arc<DeadLetterSql>>,
}
133
134impl PgOutboxStore {
135 pub fn new(pool: PgPool, table_name: impl Into<String>) -> Result<Self, OutboxError> {
145 let table_name = table_name.into();
146 validate_table_name(&table_name)?;
147 let poll_sql = DIALECT.poll_sql(&table_name);
148 let mark_delivered_sql = DIALECT.mark_delivered_sql(&table_name);
149 let mark_failed_sql = DIALECT.mark_failed_sql(&table_name);
150 Ok(Self {
151 pool,
152 table_name: Arc::from(table_name),
153 poll_sql: Arc::from(poll_sql),
154 mark_delivered_sql: Arc::from(mark_delivered_sql),
155 mark_failed_sql: Arc::from(mark_failed_sql),
156 dead_letter: None,
157 })
158 }
159
160 #[must_use]
162 pub fn pool(&self) -> &PgPool {
163 &self.pool
164 }
165
166 #[must_use]
168 pub fn table_name(&self) -> &str {
169 &self.table_name
170 }
171
172 pub fn with_dead_letter(mut self, dlq_table: impl Into<String>) -> Result<Self, OutboxError> {
182 let dlq = dlq_table.into();
183 validate_table_name(&dlq)?;
184 let insert_sql = DIALECT.insert_dead_letter_sql(&self.table_name, &dlq);
185 let delete_sql = DIALECT.delete_from_main_sql(&self.table_name);
186 self.dead_letter = Some(Arc::new(DeadLetterSql {
187 insert_sql: Arc::from(insert_sql),
188 delete_sql: Arc::from(delete_sql),
189 }));
190 Ok(self)
191 }
192}
193
/// [`OutboxStore`] implementation that runs the cached statements over `sqlx`
/// Postgres connections and transactions.
#[async_trait]
impl OutboxStore for PgOutboxStore {
    type Client = PoolConnection<Postgres>;
    type Tx<'tx> = Transaction<'tx, Postgres>;

    /// Acquires a connection from the pool; failures go through `pool_error`,
    /// so a pool timeout is reported as `OutboxError::PoolTimeout`.
    async fn acquire(&self) -> Result<Self::Client, OutboxError> {
        self.pool.acquire().await.map_err(pool_error)
    }

    /// Begins a transaction on `client`.
    async fn begin<'a>(&self, client: &'a mut Self::Client) -> Result<Self::Tx<'a>, OutboxError> {
        client.begin().await.map_err(database_error)
    }

    /// Runs the cached poll statement inside `tx` and decodes the rows.
    ///
    /// Binds `max_attempts` first and `batch_size` second. Rows that fail to
    /// decode are logged and skipped; the remaining rows are still returned.
    async fn poll<'a>(
        &self,
        tx: &mut Self::Tx<'a>,
        batch_size: usize,
        max_attempts: u32,
    ) -> Result<Vec<OutboxEnvelope>, OutboxError> {
        // Values that do not fit the bound integer type saturate to its maximum.
        let limit = i64::try_from(batch_size).unwrap_or(i64::MAX);
        let max = i32::try_from(max_attempts).unwrap_or(i32::MAX);
        let rows = sqlx::query(&self.poll_sql)
            .bind(max)
            .bind(limit)
            .fetch_all(&mut **tx)
            .await
            .map_err(database_error)?;

        let mut envelopes = Vec::with_capacity(rows.len());
        for row in rows {
            match decode_pg_row(&row) {
                Ok(envelope) => envelopes.push(envelope),
                Err(error) => {
                    // Best effort: include the id in the log line when it is readable.
                    let event_id = row.try_get::<Uuid, _>("event_id").ok();
                    tracing::error!(
                        ?event_id,
                        error = %error,
                        "skipping undecodable outbox row; the rest of the batch continues"
                    );
                }
            }
        }
        Ok(envelopes)
    }

    /// Runs the cached mark-delivered statement for `event_id` inside `tx`.
    async fn mark_delivered<'a>(
        &self,
        tx: &mut Self::Tx<'a>,
        event_id: Uuid,
    ) -> Result<(), OutboxError> {
        sqlx::query(&self.mark_delivered_sql)
            .bind(event_id)
            .execute(&mut **tx)
            .await
            .map_err(database_error)?;
        Ok(())
    }

    /// Runs the cached mark-failed statement inside `tx`.
    ///
    /// Binds, in order: the error text, the retry delay in seconds (clamped by
    /// `duration_to_pg_secs`), and the event id.
    async fn mark_failed<'a>(
        &self,
        tx: &mut Self::Tx<'a>,
        event_id: Uuid,
        error: &str,
        retry_in: Duration,
    ) -> Result<(), OutboxError> {
        sqlx::query(&self.mark_failed_sql)
            .bind(error)
            .bind(duration_to_pg_secs(retry_in))
            .bind(event_id)
            .execute(&mut **tx)
            .await
            .map_err(database_error)?;
        Ok(())
    }

    /// Commits `tx`.
    async fn commit<'a>(&self, tx: Self::Tx<'a>) -> Result<(), OutboxError> {
        tx.commit().await.map_err(database_error)
    }

    /// Moves `event_id` to the dead-letter table inside `tx`.
    ///
    /// Returns `Ok(())` without touching the database when no dead-letter
    /// table is configured. Otherwise runs the cached insert statement and
    /// then the cached delete statement, each bound to `event_id`. The
    /// `_error` argument is not used here.
    async fn mark_dead_lettered<'a>(
        &self,
        tx: &mut Self::Tx<'a>,
        event_id: Uuid,
        _error: &str,
    ) -> Result<(), OutboxError> {
        let Some(dlq) = &self.dead_letter else {
            return Ok(());
        };
        sqlx::query(&dlq.insert_sql)
            .bind(event_id)
            .execute(&mut **tx)
            .await
            .map_err(database_error)?;
        sqlx::query(&dlq.delete_sql)
            .bind(event_id)
            .execute(&mut **tx)
            .await
            .map_err(database_error)?;
        Ok(())
    }

    /// Runs the claim statement for `event_ids` inside `tx`.
    ///
    /// An empty slice returns `Ok(())` without issuing a query. The statement
    /// is generated on each call; it binds the lease duration in seconds
    /// (clamped by `duration_to_pg_secs`) first and the id slice second.
    async fn claim<'a>(
        &self,
        tx: &mut Self::Tx<'a>,
        event_ids: &[Uuid],
        lease_for: Duration,
    ) -> Result<(), OutboxError> {
        if event_ids.is_empty() {
            return Ok(());
        }
        let sql = DIALECT.claim_sql(&self.table_name, event_ids.len());
        sqlx::query(&sql)
            .bind(duration_to_pg_secs(lease_for))
            .bind(event_ids)
            .execute(&mut **tx)
            .await
            .map_err(database_error)?;
        Ok(())
    }
}
328
/// Publisher that inserts events into a Postgres outbox table.
#[derive(Debug, Clone)]
pub struct PgOutboxPublisher {
    /// Connection pool used when a publish call opens its own transaction.
    pool: PgPool,
    /// Validated name of the outbox table.
    table_name: Arc<str>,
    /// Cached insert statement for the outbox table.
    insert_sql: Arc<str>,
}
338
339impl PgOutboxPublisher {
340 pub fn new(pool: PgPool, table_name: impl Into<String>) -> Result<Self, OutboxError> {
347 let table_name = table_name.into();
348 validate_table_name(&table_name)?;
349 let insert_sql = DIALECT.insert_sql(&table_name);
350 Ok(Self {
351 pool,
352 table_name: Arc::from(table_name),
353 insert_sql: Arc::from(insert_sql),
354 })
355 }
356
357 #[must_use]
359 pub fn pool(&self) -> &PgPool {
360 &self.pool
361 }
362
363 #[must_use]
365 pub fn table_name(&self) -> &str {
366 &self.table_name
367 }
368}
369
370impl OutboxPublisher for PgOutboxPublisher {
371 type Tx<'tx> = Transaction<'tx, Postgres>;
372
373 async fn publish_in_tx<E: Event>(
374 &self,
375 tx: &mut Self::Tx<'_>,
376 event: &E,
377 ) -> Result<Uuid, OutboxError> {
378 validate_event_type(E::EVENT_TYPE)?;
382 let event_id = Uuid::now_v7();
383 let payload = serde_json::to_value(event)?;
384 sqlx::query(&self.insert_sql)
385 .bind(event_id)
386 .bind(E::EVENT_TYPE)
387 .bind(payload)
388 .bind(Option::<Uuid>::None)
389 .execute(&mut **tx)
390 .await
391 .map_err(database_error)?;
392 Ok(event_id)
393 }
394
395 async fn publish_in_tx_with_subject<E: Event>(
396 &self,
397 tx: &mut Self::Tx<'_>,
398 subject_id: Uuid,
399 event: &E,
400 ) -> Result<Uuid, OutboxError> {
401 validate_event_type(E::EVENT_TYPE)?;
402 let event_id = Uuid::now_v7();
403 let payload = serde_json::to_value(event)?;
404 sqlx::query(&self.insert_sql)
405 .bind(event_id)
406 .bind(E::EVENT_TYPE)
407 .bind(payload)
408 .bind(Some(subject_id))
409 .execute(&mut **tx)
410 .await
411 .map_err(database_error)?;
412 Ok(event_id)
413 }
414
415 async fn publish<E: Event>(&self, event: &E) -> Result<Uuid, OutboxError> {
416 let mut tx = self.pool.begin().await.map_err(database_error)?;
417 let event_id = self.publish_in_tx(&mut tx, event).await?;
418 tx.commit().await.map_err(database_error)?;
419 Ok(event_id)
420 }
421}
422
/// Builder that assembles an `OutboxWorker` over a `PgOutboxStore`.
pub struct PgOutboxWorkerBuilder {
    /// Connection pool handed to the store at build time.
    pool: PgPool,
    /// Outbox table name; validated only when `build` is called.
    table_name: String,
    /// Optional dead-letter table name; validated only when `build` is called.
    dead_letter_table: Option<String>,
    /// Registered handlers, keyed by the event's `EVENT_TYPE`.
    handlers: HashMap<&'static str, Arc<dyn ErasedHandler>>,
    /// Worker configuration passed to `OutboxWorker::new`.
    config: OutboxWorkerConfig,
}
461
462impl PgOutboxWorkerBuilder {
463 #[must_use]
465 pub fn new(pool: PgPool) -> Self {
466 Self {
467 pool,
468 table_name: DEFAULT_TABLE_NAME.to_owned(),
469 dead_letter_table: None,
470 handlers: HashMap::new(),
471 config: OutboxWorkerConfig::default(),
472 }
473 }
474
475 #[must_use]
477 pub fn table_name(mut self, name: impl Into<String>) -> Self {
478 self.table_name = name.into();
479 self
480 }
481
482 #[must_use]
488 pub fn dead_letter_table(mut self, name: impl Into<String>) -> Self {
489 self.dead_letter_table = Some(name.into());
490 self
491 }
492
493 #[must_use]
498 pub fn register_handler<E, H>(mut self, handler: H) -> Self
499 where
500 E: Event,
501 H: Handler<E>,
502 {
503 let typed = TypedHandler::<E, H>::new(handler);
504 let erased: Arc<dyn ErasedHandler> = Arc::new(typed);
505 self.handlers.insert(E::EVENT_TYPE, erased);
506 self
507 }
508
509 #[must_use]
511 pub fn shared_handler<E, H>(mut self, handler: Arc<H>) -> Self
512 where
513 E: Event,
514 H: Handler<E>,
515 {
516 let typed = TypedHandler::<E, H>::shared(handler);
517 let erased: Arc<dyn ErasedHandler> = Arc::new(typed);
518 self.handlers.insert(E::EVENT_TYPE, erased);
519 self
520 }
521
522 #[must_use]
524 pub fn poll_interval(mut self, d: Duration) -> Self {
525 self.config.poll_interval = d;
526 self
527 }
528
529 #[must_use]
531 pub fn batch_size(mut self, n: usize) -> Self {
532 self.config.batch_size = n;
533 self
534 }
535
536 #[must_use]
538 pub fn max_attempts(mut self, n: u32) -> Self {
539 self.config.max_attempts = n;
540 self
541 }
542
543 #[must_use]
548 pub fn retry_base_delay(mut self, d: Duration) -> Self {
549 self.config.retry_base_delay = d;
550 self
551 }
552
553 #[must_use]
557 pub fn retry_max_delay(mut self, d: Duration) -> Self {
558 self.config.retry_max_delay = d;
559 self
560 }
561
562 #[must_use]
567 pub fn jitter(mut self, enabled: bool) -> Self {
568 self.config.jitter = enabled;
569 self
570 }
571
572 #[must_use]
579 pub fn dispatch_timeout(mut self, d: Duration) -> Self {
580 self.config.dispatch_timeout = d;
581 self
582 }
583
584 pub fn build(self) -> Result<OutboxWorker<PgOutboxStore>, OutboxError> {
591 let mut store = PgOutboxStore::new(self.pool, self.table_name)?;
592 if let Some(dlq) = self.dead_letter_table {
593 store = store.with_dead_letter(dlq)?;
594 }
595 Ok(OutboxWorker::new(store, self.handlers, self.config))
596 }
597}
598
599pub async fn ensure_dead_letter_schema(pool: &PgPool, table_name: &str) -> Result<(), OutboxError> {
608 let ddl = DIALECT.dead_letter_schema_ddl(table_name)?;
609 sqlx::raw_sql(&ddl)
610 .execute(pool)
611 .await
612 .map_err(database_error)?;
613 Ok(())
614}
615
// Unit tests for constructors, SQL caching, the builder, and the helpers above.
// The pool used here is built lazily, and no test issues a query.
#[cfg(test)]
mod tests {
    use super::*;
    use hexeract_core::HandlerContext;
    use serde::Deserialize;
    use serde::Serialize;

    // Builds a pool with `connect_lazy`, i.e. without connecting up front.
    fn lazy_pool() -> PgPool {
        PgPool::connect_lazy("postgres://nobody:nobody@127.0.0.1:1/nobody")
            .expect("lazy pool must build from a valid URL")
    }

    // Sample event used to exercise handler registration.
    #[derive(Debug, Serialize, Deserialize)]
    struct UserRegistered {
        user_id: Uuid,
    }

    impl Event for UserRegistered {
        const EVENT_TYPE: &'static str = "users.registered";
    }

    // Second sample event, with a different `EVENT_TYPE`.
    #[derive(Debug, Serialize, Deserialize)]
    struct OrderPlaced {
        order_id: Uuid,
    }

    impl Event for OrderPlaced {
        const EVENT_TYPE: &'static str = "orders.placed";
    }

    // Handler that accepts both sample events and always succeeds.
    struct NoopHandler;

    impl Handler<UserRegistered> for NoopHandler {
        type Error = OutboxError;
        async fn handle(
            &self,
            _event: UserRegistered,
            _ctx: &HandlerContext,
        ) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    impl Handler<OrderPlaced> for NoopHandler {
        type Error = OutboxError;
        async fn handle(
            &self,
            _event: OrderPlaced,
            _ctx: &HandlerContext,
        ) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    #[test]
    fn pool_error_maps_pool_timed_out_to_pool_timeout_variant() {
        let err = pool_error(sqlx::Error::PoolTimedOut);
        assert!(
            matches!(err, OutboxError::PoolTimeout),
            "PoolTimedOut must map to OutboxError::PoolTimeout, got {err:?}"
        );
    }

    #[test]
    fn pool_error_wraps_other_errors_as_database_error() {
        let err = pool_error(sqlx::Error::RowNotFound);
        assert!(
            matches!(err, OutboxError::Database(_)),
            "non-timeout errors must map to OutboxError::Database, got {err:?}"
        );
    }

    #[tokio::test]
    async fn store_new_rejects_invalid_table_name() {
        let err = PgOutboxStore::new(lazy_pool(), "bad name; DROP").unwrap_err();
        assert!(matches!(err, OutboxError::Internal(_)));
    }

    #[tokio::test]
    async fn store_new_caches_postgres_sql_with_validated_table_name() {
        let store = PgOutboxStore::new(lazy_pool(), "audit_outbox").unwrap();
        assert_eq!(store.table_name(), "audit_outbox");
        assert!(store.poll_sql.contains("FROM \"audit_outbox\""));
        assert!(store.poll_sql.contains("FOR UPDATE SKIP LOCKED"));
        assert!(store.mark_delivered_sql.contains("UPDATE \"audit_outbox\""));
        // The mark-failed statement must not bump `attempts`; the claim
        // statement is the one asserted to do so further down.
        assert!(!store.mark_failed_sql.contains("attempts = attempts + 1"));
    }

    #[tokio::test]
    async fn publisher_new_rejects_invalid_table_name() {
        let err = PgOutboxPublisher::new(lazy_pool(), "bad name; DROP").unwrap_err();
        assert!(matches!(err, OutboxError::Internal(_)));
    }

    #[tokio::test]
    async fn publisher_new_caches_insert_sql_with_validated_table_name() {
        let publisher = PgOutboxPublisher::new(lazy_pool(), "audit_outbox").unwrap();
        assert_eq!(publisher.table_name(), "audit_outbox");
        assert!(
            publisher
                .insert_sql
                .contains("INSERT INTO \"audit_outbox\"")
        );
        assert!(publisher.insert_sql.contains("$1, $2, $3, $4"));
    }

    #[tokio::test]
    async fn builder_starts_with_default_table_and_empty_handlers() {
        let builder = PgOutboxWorkerBuilder::new(lazy_pool());
        assert_eq!(builder.table_name, DEFAULT_TABLE_NAME);
        assert!(builder.handlers.is_empty());
        let default_cfg = OutboxWorkerConfig::default();
        assert_eq!(builder.config.batch_size, default_cfg.batch_size);
        assert_eq!(builder.config.max_attempts, default_cfg.max_attempts);
    }

    #[tokio::test]
    async fn builder_table_name_can_be_customized() {
        let builder = PgOutboxWorkerBuilder::new(lazy_pool()).table_name("my_outbox");
        assert_eq!(builder.table_name, "my_outbox");
    }

    #[tokio::test]
    async fn builder_register_handler_records_event_types() {
        let builder = PgOutboxWorkerBuilder::new(lazy_pool())
            .register_handler::<UserRegistered, _>(NoopHandler)
            .register_handler::<OrderPlaced, _>(NoopHandler);
        assert_eq!(builder.handlers.len(), 2);
        assert!(builder.handlers.contains_key("users.registered"));
        assert!(builder.handlers.contains_key("orders.placed"));
    }

    #[tokio::test]
    async fn builder_register_handler_twice_replaces_silently() {
        let builder = PgOutboxWorkerBuilder::new(lazy_pool())
            .register_handler::<UserRegistered, _>(NoopHandler)
            .register_handler::<UserRegistered, _>(NoopHandler);
        // Same event type twice: the map still holds a single entry.
        assert_eq!(builder.handlers.len(), 1);
    }

    #[tokio::test]
    async fn builder_build_rejects_invalid_table_name() {
        let result = PgOutboxWorkerBuilder::new(lazy_pool())
            .table_name("bad name; DROP TABLE")
            .build();
        assert!(matches!(result, Err(OutboxError::Internal(_))));
    }

    #[tokio::test]
    async fn builder_build_with_default_table_name_succeeds() {
        let worker = PgOutboxWorkerBuilder::new(lazy_pool()).build();
        assert!(worker.is_ok());
    }

    #[tokio::test]
    async fn store_with_dead_letter_caches_sql_for_dlq() {
        let store = PgOutboxStore::new(lazy_pool(), "audit_outbox")
            .unwrap()
            .with_dead_letter("audit_outbox_dead_letter")
            .unwrap();
        let dlq = store.dead_letter.as_ref().unwrap();
        assert!(
            dlq.insert_sql
                .contains("INSERT INTO \"audit_outbox_dead_letter\"")
        );
        assert!(dlq.insert_sql.contains("FROM \"audit_outbox\""));
        assert!(dlq.insert_sql.contains("$1"));
        assert!(dlq.delete_sql.contains("DELETE FROM \"audit_outbox\""));
        assert!(dlq.delete_sql.contains("$1"));
    }

    #[tokio::test]
    async fn store_with_dead_letter_rejects_invalid_dlq_name() {
        let err = PgOutboxStore::new(lazy_pool(), "audit_outbox")
            .unwrap()
            .with_dead_letter("bad name; DROP")
            .unwrap_err();
        assert!(matches!(err, OutboxError::Internal(_)));
    }

    #[tokio::test]
    async fn builder_dead_letter_table_propagates_to_store() {
        // Only checks that `build` succeeds with a dead-letter table set.
        let worker = PgOutboxWorkerBuilder::new(lazy_pool())
            .dead_letter_table("audit_outbox_dead_letter")
            .build()
            .unwrap();
        drop(worker);
    }

    #[tokio::test]
    async fn builder_dispatch_timeout_overrides_default() {
        // Only checks that `build` succeeds with a custom dispatch timeout.
        let worker = PgOutboxWorkerBuilder::new(lazy_pool())
            .dispatch_timeout(Duration::from_secs(60))
            .build()
            .unwrap();
        drop(worker);
    }

    #[test]
    fn store_claim_sql_uses_any_array_bind() {
        let sql = DIALECT.claim_sql("audit_outbox", 3);
        // Three ids are requested, yet only `$1` and `$2` may appear.
        assert!(sql.contains("UPDATE \"audit_outbox\""));
        assert!(sql.contains("next_retry_at = (NOW() +"));
        assert!(sql.contains("$1"));
        assert!(sql.contains("WHERE event_id = ANY($2)"));
        assert!(!sql.contains("$3"));
        assert!(sql.contains("attempts = attempts + 1"));
    }

    #[test]
    fn duration_to_pg_secs_caps_at_max_interval() {
        let capped = duration_to_pg_secs(Duration::MAX);
        assert!(
            capped.is_finite(),
            "Duration::MAX must produce a finite f64, got {capped}"
        );
        assert!(
            (capped - MAX_PG_INTERVAL_SECS).abs() < 1.0,
            "capped value must equal MAX_PG_INTERVAL_SECS, got {capped}"
        );

        let ordinary = Duration::from_secs(300);
        assert!(
            (duration_to_pg_secs(ordinary) - 300.0_f64).abs() < f64::EPSILON,
            "ordinary duration must not be capped"
        );
    }

    #[tokio::test]
    async fn publisher_rejects_event_type_exceeding_64_bytes() {
        const OVERLENGTH_EVENT_TYPE: &str =
            "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa1";

        let publisher = PgOutboxPublisher::new(lazy_pool(), "audit_outbox").unwrap();
        // The validator is called directly here; the publisher itself is only
        // constructed and dropped.
        let result = validate_event_type(OVERLENGTH_EVENT_TYPE);
        assert!(
            matches!(result, Err(OutboxError::Internal(_))),
            "overlength EVENT_TYPE must be rejected before the DB insert"
        );
        drop(publisher);
    }
}