appletheia_application/
outbox.rs1pub mod command;
2pub mod event;
3
4mod default_outbox_relay;
5mod ordering_key;
6mod outbox_attempt_count;
7mod outbox_attempt_count_error;
8mod outbox_batch_size;
9mod outbox_dead_lettered_at;
10mod outbox_error;
11mod outbox_fetcher;
12mod outbox_fetcher_error;
13mod outbox_lease_duration;
14mod outbox_lease_expires_at;
15mod outbox_lifecycle;
16mod outbox_max_attempts;
17mod outbox_next_attempt_at;
18mod outbox_poll_backoff_multiplier;
19mod outbox_poll_backoff_multiplier_error;
20mod outbox_poll_interval;
21mod outbox_poll_jitter_ratio;
22mod outbox_poll_jitter_ratio_error;
23mod outbox_polling_options;
24mod outbox_polling_options_error;
25mod outbox_published_at;
26mod outbox_relay;
27mod outbox_relay_config;
28mod outbox_relay_error;
29mod outbox_relay_instance;
30mod outbox_relay_instance_error;
31mod outbox_relay_instance_id;
32mod outbox_relay_process_id;
33mod outbox_relay_run_report;
34mod outbox_retry_delay;
35mod outbox_retry_options;
36mod outbox_state;
37mod outbox_writer;
38mod outbox_writer_error;
39mod processed_outbox_count;
40
41pub use default_outbox_relay::DefaultOutboxRelay;
42pub use ordering_key::{OrderingKey, OrderingKeyError};
43pub use outbox_attempt_count::OutboxAttemptCount;
44pub use outbox_attempt_count_error::OutboxAttemptCountError;
45pub use outbox_batch_size::OutboxBatchSize;
46pub use outbox_dead_lettered_at::OutboxDeadLetteredAt;
47pub use outbox_error::OutboxError;
48pub use outbox_fetcher::OutboxFetcher;
49pub use outbox_fetcher_error::OutboxFetcherError;
50pub use outbox_lease_duration::OutboxLeaseDuration;
51pub use outbox_lease_expires_at::OutboxLeaseExpiresAt;
52pub use outbox_lifecycle::OutboxLifecycle;
53pub use outbox_max_attempts::OutboxMaxAttempts;
54pub use outbox_next_attempt_at::OutboxNextAttemptAt;
55pub use outbox_poll_backoff_multiplier::OutboxPollBackoffMultiplier;
56pub use outbox_poll_backoff_multiplier_error::OutboxPollBackoffMultiplierError;
57pub use outbox_poll_interval::OutboxPollInterval;
58pub use outbox_poll_jitter_ratio::OutboxPollJitterRatio;
59pub use outbox_poll_jitter_ratio_error::OutboxPollJitterRatioError;
60pub use outbox_polling_options::OutboxPollingOptions;
61pub use outbox_polling_options_error::OutboxPollingOptionsError;
62pub use outbox_published_at::OutboxPublishedAt;
63pub use outbox_relay::OutboxRelay;
64pub use outbox_relay_config::OutboxRelayConfig;
65pub use outbox_relay_error::OutboxRelayError;
66pub use outbox_relay_instance::OutboxRelayInstance;
67pub use outbox_relay_instance_error::OutboxRelayInstanceError;
68pub use outbox_relay_instance_id::OutboxRelayInstanceId;
69pub use outbox_relay_process_id::OutboxRelayProcessId;
70pub use outbox_relay_run_report::OutboxRelayRunReport;
71pub use outbox_retry_delay::OutboxRetryDelay;
72pub use outbox_retry_options::OutboxRetryOptions;
73pub use outbox_state::OutboxState;
74pub use outbox_writer::OutboxWriter;
75pub use outbox_writer_error::OutboxWriterError;
76pub use processed_outbox_count::ProcessedOutboxCount;
77
78use crate::messaging::PublishDispatchError;
79
80pub trait Outbox {
81 type Id: Copy + Eq + 'static;
82 type Message;
83
84 fn id(&self) -> Self::Id;
85
86 fn ordering_key(&self) -> OrderingKey;
87
88 fn message(&self) -> &Self::Message;
89
90 fn state(&self) -> &OutboxState;
91
92 fn state_mut(&mut self) -> &mut OutboxState;
93
94 fn last_error(&self) -> &Option<PublishDispatchError>;
95
96 fn last_error_mut(&mut self) -> &mut Option<PublishDispatchError>;
97
98 fn lifecycle(&self) -> &OutboxLifecycle;
99
100 fn lifecycle_mut(&mut self) -> &mut OutboxLifecycle;
101
102 fn ack(&mut self) -> Result<(), OutboxError> {
103 if matches!(self.lifecycle(), OutboxLifecycle::DeadLettered { .. }) {
104 return Err(OutboxError::AckOnDeadLettered(self.lifecycle().clone()));
105 }
106
107 let published_at = OutboxPublishedAt::now();
108 let attempt_count = self.state().attempt_count();
109
110 *self.state_mut() = OutboxState::Published {
111 published_at,
112 attempt_count,
113 };
114 *self.last_error_mut() = None;
115 *self.lifecycle_mut() = OutboxLifecycle::Active;
116
117 Ok(())
118 }
119
120 fn nack(
121 &mut self,
122 cause: &PublishDispatchError,
123 retry_options: &OutboxRetryOptions,
124 ) -> Result<(), OutboxError> {
125 if matches!(self.lifecycle(), OutboxLifecycle::DeadLettered { .. }) {
126 return Err(OutboxError::NackOnDeadLettered(self.lifecycle().clone()));
127 }
128
129 *self.last_error_mut() = Some(cause.clone());
130
131 let current_attempt_count = self.state().attempt_count();
132 let next_attempt_count = current_attempt_count
133 .try_increment()
134 .map_err(OutboxError::AttemptCount)?;
135
136 let maximum_attempts = retry_options.max_attempts.value().get() as i64;
137 let has_exceeded_maximum_attempts = next_attempt_count.value() > maximum_attempts;
138
139 if has_exceeded_maximum_attempts {
140 let dead_lettered_at = OutboxDeadLetteredAt::now();
141 *self.lifecycle_mut() = OutboxLifecycle::DeadLettered { dead_lettered_at };
142 } else {
143 match cause {
144 PublishDispatchError::Permanent { .. } => {
145 let dead_lettered_at = OutboxDeadLetteredAt::now();
146 *self.lifecycle_mut() = OutboxLifecycle::DeadLettered { dead_lettered_at };
147 }
148 PublishDispatchError::Transient { .. } => {
149 let next_attempt_at = OutboxNextAttemptAt::now().next(retry_options.backoff);
150
151 *self.state_mut() = OutboxState::Pending {
152 attempt_count: next_attempt_count,
153 next_attempt_after: next_attempt_at,
154 };
155 *self.lifecycle_mut() = OutboxLifecycle::Active;
156 }
157 }
158 }
159
160 Ok(())
161 }
162
163 fn extend_lease(
164 &mut self,
165 owner: &OutboxRelayInstance,
166 lease_for: OutboxLeaseDuration,
167 ) -> Result<(), OutboxError> {
168 if matches!(self.lifecycle(), OutboxLifecycle::DeadLettered { .. }) {
169 return Err(OutboxError::ExtendLeaseOnDeadLettered(
170 self.lifecycle().clone(),
171 ));
172 }
173
174 let current_state = self.state().clone();
175 let lease_expires_at = OutboxLeaseExpiresAt::from_now(lease_for);
176
177 match current_state {
178 OutboxState::Leased {
179 attempt_count,
180 next_attempt_after,
181 ..
182 } => {
183 *self.state_mut() = OutboxState::Leased {
184 attempt_count,
185 next_attempt_after,
186 lease_owner: owner.clone(),
187 lease_until: lease_expires_at,
188 };
189 Ok(())
190 }
191 _ => Err(OutboxError::ExtendLeaseOnNonLeased(current_state)),
192 }
193 }
194
195 fn acquire_lease(
196 &mut self,
197 owner: &OutboxRelayInstance,
198 lease_for: OutboxLeaseDuration,
199 ) -> Result<(), OutboxError> {
200 if matches!(self.lifecycle(), OutboxLifecycle::DeadLettered { .. }) {
201 return Err(OutboxError::AcquireLeaseOnDeadLettered(
202 self.lifecycle().clone(),
203 ));
204 }
205
206 let current_state = self.state().clone();
207 let lease_expires_at = OutboxLeaseExpiresAt::from_now(lease_for);
208
209 match current_state {
210 OutboxState::Pending {
211 attempt_count,
212 next_attempt_after,
213 } => {
214 *self.state_mut() = OutboxState::Leased {
215 attempt_count,
216 next_attempt_after,
217 lease_owner: owner.clone(),
218 lease_until: lease_expires_at,
219 };
220 Ok(())
221 }
222 _ => Err(OutboxError::AcquireLeaseOnNonPending(current_state)),
223 }
224 }
225}