Skip to main content

outbox_core/
model.rs

1//! Core domain types: the outbox [`Event`] row and its lifecycle
2//! [`EventStatus`].
3//!
4//! These types are what storage adapters read and write, what transports
5//! publish, and what the [`IdempotencyStrategy::Custom`](crate::config::IdempotencyStrategy::Custom)
6//! function receives. When the `sqlx` feature is enabled, [`Event`] derives
7//! `sqlx::FromRow` so it can be decoded directly from a database row.
8
9use crate::object::{EventId, EventType, IdempotencyToken, Payload};
10use serde::Serialize;
11use std::fmt::Debug;
12use time::OffsetDateTime;
13
14/// A single outbox row representing one domain event to be published.
15///
16/// An event starts out with [`status`](Self::status) set to
17/// [`EventStatus::Pending`] and travels through the worker loop:
18/// a worker flips the row to [`EventStatus::Processing`] with a lock until
19/// [`locked_until`](Self::locked_until), publishes via the transport, and
20/// finally marks it [`EventStatus::Sent`]. If a worker crashes, the lock
21/// expires and the row becomes eligible again.
22///
23/// Generic over the user's payload type `PT`; see [`Payload`].
24#[cfg_attr(feature = "sqlx", derive(sqlx::FromRow))]
25#[derive(Debug, Clone)]
26pub struct Event<PT> {
27    /// Randomly generated primary key (UUID v4). Used to identify the row
28    /// across status transitions.
29    pub id: EventId,
30    /// Deduplication token produced according to the configured
31    /// [`IdempotencyStrategy`](crate::config::IdempotencyStrategy). May be
32    /// `None` when no token is produced.
33    pub idempotency_token: Option<IdempotencyToken>,
34    /// Domain-level event name used for routing on the transport side.
35    pub event_type: EventType,
36    /// The user payload, serialized as JSON when the `sqlx` feature is on.
37    #[cfg_attr(feature = "sqlx", sqlx(json))]
38    pub payload: Payload<PT>,
39    /// Wall-clock time the row was constructed, in UTC.
40    pub created_at: OffsetDateTime,
41    /// Expiration of the current processing lock. Fresh rows start with
42    /// [`OffsetDateTime::UNIX_EPOCH`] (i.e. "not locked"); storage adapters
43    /// update this when they claim the row for processing.
44    pub locked_until: OffsetDateTime,
45    /// Current lifecycle stage. See [`EventStatus`].
46    pub status: EventStatus,
47}
48impl<PT> Event<PT>
49where
50    PT: Debug + Clone + Serialize,
51{
52    /// Constructs a new [`Event`] ready to be inserted by the storage layer.
53    ///
54    /// The caller supplies the domain-level fields (`event_type`, `payload`,
55    /// `idempotency_token`); the remaining fields are initialised with sensible
56    /// defaults:
57    ///
58    /// - [`id`](Event::id) — a fresh random [`EventId`]
59    /// - [`created_at`](Event::created_at) — `OffsetDateTime::now_utc()`
60    /// - [`locked_until`](Event::locked_until) — `OffsetDateTime::UNIX_EPOCH`
61    ///   (unlocked)
62    /// - [`status`](Event::status) — [`EventStatus::Pending`]
63    pub fn new(
64        event_type: EventType,
65        payload: Payload<PT>,
66        idempotency_token: Option<IdempotencyToken>,
67    ) -> Self {
68        Self {
69            id: EventId::default(),
70            idempotency_token,
71            event_type,
72            payload,
73            created_at: OffsetDateTime::now_utc(),
74            locked_until: OffsetDateTime::UNIX_EPOCH,
75            status: EventStatus::Pending,
76        }
77    }
78}
79
/// Lifecycle stage of an outbox [`Event`].
///
/// A row moves forward through the variants and never steps backwards on a
/// happy path:
///
/// ```text
/// Pending → Processing → Sent
/// ```
///
/// When the `sqlx` feature is enabled, this enum derives `sqlx::Type` and maps
/// to a Postgres type named `status` with `PascalCase` variant names.
///
/// The enum has no payload-carrying variants, so equality is total: it
/// derives [`Eq`] and [`Hash`] alongside [`PartialEq`], which lets a status be
/// used as a `HashMap` / `HashSet` key (for example to group rows by stage).
#[cfg_attr(feature = "sqlx", derive(sqlx::Type))]
#[cfg_attr(
    feature = "sqlx",
    sqlx(type_name = "status", rename_all = "PascalCase")
)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum EventStatus {
    /// Newly written row awaiting a worker. Includes both freshly inserted
    /// events and rows whose processing lock expired (making them eligible
    /// for retry).
    Pending,
    /// A worker has claimed the row and is currently attempting to publish it.
    /// The lock is held until [`Event::locked_until`].
    Processing,
    /// The event has been successfully published to the transport. Rows in
    /// this state are eventually removed by the garbage collector.
    Sent,
}
109
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use rstest::rstest;
    use serde::{Deserialize, Serialize};

    /// Minimal payload type used to instantiate `Event<PT>` in these tests.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
    struct TestPayload {
        value: String,
    }

    /// Wraps `v` in a `TestPayload` and then in the crate's `Payload`.
    fn payload(v: &str) -> Payload<TestPayload> {
        let inner = TestPayload { value: v.into() };
        Payload::new(inner)
    }

    /// Event with throwaway type `"t"`, payload `"p"` and no token, for tests
    /// that only inspect the defaults filled in by `Event::new`.
    fn plain_event() -> Event<TestPayload> {
        Event::new(EventType::new("t"), payload("p"), None)
    }

    #[rstest]
    fn event_new_sets_status_to_pending() {
        let event = plain_event();
        assert_eq!(event.status, EventStatus::Pending);
    }

    #[rstest]
    fn event_new_sets_locked_until_to_unix_epoch() {
        let event = plain_event();
        assert_eq!(event.locked_until, OffsetDateTime::UNIX_EPOCH);
    }

    #[rstest]
    fn event_new_sets_created_at_within_wall_clock_window() {
        // Sample the clock on both sides of construction; `created_at` must
        // fall inside that closed interval.
        let before = OffsetDateTime::now_utc();
        let event = plain_event();
        let after = OffsetDateTime::now_utc();

        let window = before..=after;
        assert!(
            window.contains(&event.created_at),
            "created_at {} not in [{before}, {after}]",
            event.created_at
        );
    }

    #[rstest]
    fn event_new_assigns_unique_ids_across_calls() {
        let first = plain_event();
        let second = plain_event();
        assert_ne!(first.id, second.id);
    }

    #[rstest]
    fn event_new_preserves_event_type_and_payload() {
        let event = Event::new(EventType::new("order.created"), payload("hello"), None);
        assert_eq!(event.event_type.as_str(), "order.created");
        assert_eq!(event.payload.as_value().value, "hello");
    }

    #[rstest]
    fn event_new_preserves_idempotency_token_some() {
        let token = IdempotencyToken::new("abc".into());
        let event = Event::new(EventType::new("t"), payload("p"), Some(token));

        let stored: Option<String> = event
            .idempotency_token
            .as_ref()
            .map(|t| t.as_str().to_owned());
        assert_eq!(stored, Some("abc".to_string()));
    }

    #[rstest]
    fn event_new_preserves_idempotency_token_none() {
        let event = plain_event();
        assert!(event.idempotency_token.is_none());
    }

    /// Equality table: each variant equals itself and differs from the others.
    #[rstest]
    #[case(EventStatus::Pending, EventStatus::Pending, true)]
    #[case(EventStatus::Processing, EventStatus::Processing, true)]
    #[case(EventStatus::Sent, EventStatus::Sent, true)]
    #[case(EventStatus::Pending, EventStatus::Processing, false)]
    #[case(EventStatus::Processing, EventStatus::Sent, false)]
    #[case(EventStatus::Pending, EventStatus::Sent, false)]
    fn event_status_partial_eq(
        #[case] lhs: EventStatus,
        #[case] rhs: EventStatus,
        #[case] should_match: bool,
    ) {
        let matched = lhs == rhs;
        assert_eq!(matched, should_match);
    }
}