Skip to main content

appletheia_application/
outbox.rs

1pub mod command;
2pub mod event;
3
4mod default_outbox_relay;
5mod ordering_key;
6mod outbox_attempt_count;
7mod outbox_attempt_count_error;
8mod outbox_batch_size;
9mod outbox_dead_lettered_at;
10mod outbox_error;
11mod outbox_fetcher;
12mod outbox_fetcher_error;
13mod outbox_lease_duration;
14mod outbox_lease_expires_at;
15mod outbox_lifecycle;
16mod outbox_max_attempts;
17mod outbox_next_attempt_at;
18mod outbox_poll_backoff_multiplier;
19mod outbox_poll_backoff_multiplier_error;
20mod outbox_poll_interval;
21mod outbox_poll_jitter_ratio;
22mod outbox_poll_jitter_ratio_error;
23mod outbox_polling_options;
24mod outbox_polling_options_error;
25mod outbox_published_at;
26mod outbox_relay;
27mod outbox_relay_config;
28mod outbox_relay_error;
29mod outbox_relay_instance;
30mod outbox_relay_instance_error;
31mod outbox_relay_instance_id;
32mod outbox_relay_process_id;
33mod outbox_relay_run_report;
34mod outbox_retry_delay;
35mod outbox_retry_options;
36mod outbox_state;
37mod outbox_writer;
38mod outbox_writer_error;
39mod processed_outbox_count;
40
41pub use default_outbox_relay::DefaultOutboxRelay;
42pub use ordering_key::{OrderingKey, OrderingKeyError};
43pub use outbox_attempt_count::OutboxAttemptCount;
44pub use outbox_attempt_count_error::OutboxAttemptCountError;
45pub use outbox_batch_size::OutboxBatchSize;
46pub use outbox_dead_lettered_at::OutboxDeadLetteredAt;
47pub use outbox_error::OutboxError;
48pub use outbox_fetcher::OutboxFetcher;
49pub use outbox_fetcher_error::OutboxFetcherError;
50pub use outbox_lease_duration::OutboxLeaseDuration;
51pub use outbox_lease_expires_at::OutboxLeaseExpiresAt;
52pub use outbox_lifecycle::OutboxLifecycle;
53pub use outbox_max_attempts::OutboxMaxAttempts;
54pub use outbox_next_attempt_at::OutboxNextAttemptAt;
55pub use outbox_poll_backoff_multiplier::OutboxPollBackoffMultiplier;
56pub use outbox_poll_backoff_multiplier_error::OutboxPollBackoffMultiplierError;
57pub use outbox_poll_interval::OutboxPollInterval;
58pub use outbox_poll_jitter_ratio::OutboxPollJitterRatio;
59pub use outbox_poll_jitter_ratio_error::OutboxPollJitterRatioError;
60pub use outbox_polling_options::OutboxPollingOptions;
61pub use outbox_polling_options_error::OutboxPollingOptionsError;
62pub use outbox_published_at::OutboxPublishedAt;
63pub use outbox_relay::OutboxRelay;
64pub use outbox_relay_config::OutboxRelayConfig;
65pub use outbox_relay_error::OutboxRelayError;
66pub use outbox_relay_instance::OutboxRelayInstance;
67pub use outbox_relay_instance_error::OutboxRelayInstanceError;
68pub use outbox_relay_instance_id::OutboxRelayInstanceId;
69pub use outbox_relay_process_id::OutboxRelayProcessId;
70pub use outbox_relay_run_report::OutboxRelayRunReport;
71pub use outbox_retry_delay::OutboxRetryDelay;
72pub use outbox_retry_options::OutboxRetryOptions;
73pub use outbox_state::OutboxState;
74pub use outbox_writer::OutboxWriter;
75pub use outbox_writer_error::OutboxWriterError;
76pub use processed_outbox_count::ProcessedOutboxCount;
77
78use crate::messaging::PublishDispatchError;
79
80pub trait Outbox {
81    type Id: Copy + Eq + 'static;
82    type Message;
83
84    fn id(&self) -> Self::Id;
85
86    fn ordering_key(&self) -> OrderingKey;
87
88    fn message(&self) -> &Self::Message;
89
90    fn state(&self) -> &OutboxState;
91
92    fn state_mut(&mut self) -> &mut OutboxState;
93
94    fn last_error(&self) -> &Option<PublishDispatchError>;
95
96    fn last_error_mut(&mut self) -> &mut Option<PublishDispatchError>;
97
98    fn lifecycle(&self) -> &OutboxLifecycle;
99
100    fn lifecycle_mut(&mut self) -> &mut OutboxLifecycle;
101
102    fn ack(&mut self) -> Result<(), OutboxError> {
103        if matches!(self.lifecycle(), OutboxLifecycle::DeadLettered { .. }) {
104            return Err(OutboxError::AckOnDeadLettered(self.lifecycle().clone()));
105        }
106
107        let published_at = OutboxPublishedAt::now();
108        let attempt_count = self.state().attempt_count();
109
110        *self.state_mut() = OutboxState::Published {
111            published_at,
112            attempt_count,
113        };
114        *self.last_error_mut() = None;
115        *self.lifecycle_mut() = OutboxLifecycle::Active;
116
117        Ok(())
118    }
119
120    fn nack(
121        &mut self,
122        cause: &PublishDispatchError,
123        retry_options: &OutboxRetryOptions,
124    ) -> Result<(), OutboxError> {
125        if matches!(self.lifecycle(), OutboxLifecycle::DeadLettered { .. }) {
126            return Err(OutboxError::NackOnDeadLettered(self.lifecycle().clone()));
127        }
128
129        *self.last_error_mut() = Some(cause.clone());
130
131        let current_attempt_count = self.state().attempt_count();
132        let next_attempt_count = current_attempt_count
133            .try_increment()
134            .map_err(OutboxError::AttemptCount)?;
135
136        let maximum_attempts = retry_options.max_attempts.value().get() as i64;
137        let has_exceeded_maximum_attempts = next_attempt_count.value() > maximum_attempts;
138
139        if has_exceeded_maximum_attempts {
140            let dead_lettered_at = OutboxDeadLetteredAt::now();
141            *self.lifecycle_mut() = OutboxLifecycle::DeadLettered { dead_lettered_at };
142        } else {
143            match cause {
144                PublishDispatchError::Permanent { .. } => {
145                    let dead_lettered_at = OutboxDeadLetteredAt::now();
146                    *self.lifecycle_mut() = OutboxLifecycle::DeadLettered { dead_lettered_at };
147                }
148                PublishDispatchError::Transient { .. } => {
149                    let next_attempt_at = OutboxNextAttemptAt::now().next(retry_options.backoff);
150
151                    *self.state_mut() = OutboxState::Pending {
152                        attempt_count: next_attempt_count,
153                        next_attempt_after: next_attempt_at,
154                    };
155                    *self.lifecycle_mut() = OutboxLifecycle::Active;
156                }
157            }
158        }
159
160        Ok(())
161    }
162
163    fn extend_lease(
164        &mut self,
165        owner: &OutboxRelayInstance,
166        lease_for: OutboxLeaseDuration,
167    ) -> Result<(), OutboxError> {
168        if matches!(self.lifecycle(), OutboxLifecycle::DeadLettered { .. }) {
169            return Err(OutboxError::ExtendLeaseOnDeadLettered(
170                self.lifecycle().clone(),
171            ));
172        }
173
174        let current_state = self.state().clone();
175        let lease_expires_at = OutboxLeaseExpiresAt::from_now(lease_for);
176
177        match current_state {
178            OutboxState::Leased {
179                attempt_count,
180                next_attempt_after,
181                ..
182            } => {
183                *self.state_mut() = OutboxState::Leased {
184                    attempt_count,
185                    next_attempt_after,
186                    lease_owner: owner.clone(),
187                    lease_until: lease_expires_at,
188                };
189                Ok(())
190            }
191            _ => Err(OutboxError::ExtendLeaseOnNonLeased(current_state)),
192        }
193    }
194
195    fn acquire_lease(
196        &mut self,
197        owner: &OutboxRelayInstance,
198        lease_for: OutboxLeaseDuration,
199    ) -> Result<(), OutboxError> {
200        if matches!(self.lifecycle(), OutboxLifecycle::DeadLettered { .. }) {
201            return Err(OutboxError::AcquireLeaseOnDeadLettered(
202                self.lifecycle().clone(),
203            ));
204        }
205
206        let current_state = self.state().clone();
207        let lease_expires_at = OutboxLeaseExpiresAt::from_now(lease_for);
208
209        match current_state {
210            OutboxState::Pending {
211                attempt_count,
212                next_attempt_after,
213            } => {
214                *self.state_mut() = OutboxState::Leased {
215                    attempt_count,
216                    next_attempt_after,
217                    lease_owner: owner.clone(),
218                    lease_until: lease_expires_at,
219                };
220                Ok(())
221            }
222            _ => Err(OutboxError::AcquireLeaseOnNonPending(current_state)),
223        }
224    }
225}