1use crate::config::OutboxConfig;
10use crate::error::OutboxError;
11use crate::model::Event;
12use crate::model::EventStatus::Sent;
13use crate::object::EventId;
14use crate::publisher::Transport;
15use crate::storage::OutboxStorage;
16use serde::Serialize;
17use std::fmt::Debug;
18use std::sync::Arc;
19use tracing::error;
20
21pub struct OutboxProcessor<S, T, P>
26where
27 P: Debug + Clone + Serialize,
28{
29 storage: Arc<S>,
30 publisher: Arc<T>,
31 config: Arc<OutboxConfig<P>>,
32}
33
34impl<S, T, P> OutboxProcessor<S, T, P>
35where
36 S: OutboxStorage<P> + 'static,
37 T: Transport<P> + 'static,
38 P: Debug + Clone + Serialize + Send + Sync,
39{
40 pub fn new(storage: Arc<S>, publisher: Arc<T>, config: Arc<OutboxConfig<P>>) -> Self {
43 Self {
44 storage,
45 publisher,
46 config,
47 }
48 }
49
50 pub async fn process_pending_events(
75 &self,
76 #[cfg(feature = "dlq")] dlq_heap: Arc<dyn crate::dlq::storage::DlqHeap>,
77 ) -> Result<usize, OutboxError> {
78 let events: Vec<Event<P>> = self
79 .storage
80 .fetch_next_to_process(self.config.batch_size)
81 .await?;
82
83 if events.is_empty() {
84 return Ok(0);
85 }
86 let count = events.len();
87
88 #[cfg(feature = "dlq")]
89 self.event_publish(events, dlq_heap).await?;
90 #[cfg(not(feature = "dlq"))]
91 self.event_publish(events).await?;
92
93 Ok(count)
94 }
95
96 async fn event_publish(
97 &self,
98 events: Vec<Event<P>>,
99 #[cfg(feature = "dlq")] dlq_heap: Arc<dyn crate::dlq::storage::DlqHeap>,
100 ) -> Result<(), OutboxError> {
101 let mut success_ids = Vec::<EventId>::new();
102 for event in events {
103 let id = event.id;
104
105 #[cfg(feature = "metrics")]
106 let start = std::time::Instant::now();
107
108 let event_type = event.event_type.to_string();
109
110 match self.publisher.publish(event).await {
111 Ok(()) => {
112 success_ids.push(id);
113 #[cfg(feature = "dlq")]
114 dlq_heap.record_success(id).await?;
115 #[cfg(feature = "metrics")]
116 {
117 let delta = start.elapsed().as_secs_f64();
118
119 metrics::counter!("outbox.events_total",
120 "status" => "success",
121 "event_type" => event_type.clone()
122 )
123 .increment(1);
124
125 metrics::histogram!(
126 "outbox.publish_duration_seconds",
127 "event_type" => event_type.clone()
128 )
129 .record(delta);
130 }
131 }
132 Err(e) => {
133 error!("Failed to publish event {:?}: {:?}", id, e);
134 #[cfg(feature = "dlq")]
135 dlq_heap.record_failure(id).await?;
136
137 #[cfg(feature = "metrics")]
138 {
139 let delta = start.elapsed().as_secs_f64();
140
141 metrics::counter!("outbox.events_total",
142 "status" => "error",
143 "event_type" => event_type.clone()
144 )
145 .increment(1);
146
147 metrics::histogram!(
148 "outbox.publish_duration_seconds",
149 "status" => "error",
150 "event_type" => event_type
151 )
152 .record(delta);
153 }
154 }
155 }
156 }
157 if !success_ids.is_empty() {
158 self.storage.update_status(&success_ids, Sent).await?;
159 }
160 Ok(())
161 }
162}
163
164#[cfg(test)]
165#[allow(clippy::unwrap_used)]
166mod tests {
167 use super::*;
168 use crate::config::{IdempotencyStrategy, OutboxConfig};
169 #[cfg(feature = "dlq")]
170 use crate::dlq::storage::MockDlqHeap;
171 use crate::model::EventStatus;
172 use crate::object::EventType;
173 use crate::prelude::Payload;
174 use crate::publisher::MockTransport;
175 use crate::storage::MockOutboxStorage;
176 use mockall::Sequence;
177 use rstest::rstest;
178 use serde::{Deserialize, Serialize};
179 use std::collections::HashSet;
180 use std::sync::Arc;
181
182 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
183 enum TestEvent {
184 A(String),
185 }
186
187 fn config() -> Arc<OutboxConfig<TestEvent>> {
188 Arc::new(OutboxConfig {
189 batch_size: 100,
190 retention_days: 7,
191 gc_interval_secs: 3600,
192 poll_interval_secs: 5,
193 lock_timeout_mins: 5,
194 idempotency_strategy: IdempotencyStrategy::None,
195 dlq_threshold: 10,
196 dlq_interval_secs: 1,
197 })
198 }
199
200 fn make_event(n: u32) -> Event<TestEvent> {
201 Event::new(
202 EventType::new(&format!("t{n}")),
203 Payload::new(TestEvent::A(format!("v{n}"))),
204 None,
205 )
206 }
207
208 #[rstest]
209 #[tokio::test]
210 async fn empty_batch_returns_zero_and_skips_status_update() {
211 let mut storage = MockOutboxStorage::<TestEvent>::new();
212 let mut transport = MockTransport::<TestEvent>::new();
213
214 storage
215 .expect_fetch_next_to_process()
216 .withf(|limit| *limit == 100)
217 .times(1)
218 .returning(|_| Ok(vec![]));
219
220 storage.expect_update_status().times(0);
221 transport.expect_publish().times(0);
222
223 let processor = OutboxProcessor::new(Arc::new(storage), Arc::new(transport), config());
224
225 #[cfg(not(feature = "dlq"))]
226 let result = processor.process_pending_events().await;
227
228 #[cfg(feature = "dlq")]
229 let result = {
230 let mut dlq = MockDlqHeap::new();
231 dlq.expect_record_success().times(0);
232 dlq.expect_record_failure().times(0);
233 dlq.expect_drain_exceeded().times(0);
234 processor.process_pending_events(Arc::new(dlq)).await
235 };
236
237 assert!(matches!(result, Ok(0)));
238 }
239
240 #[rstest]
241 #[tokio::test]
242 async fn all_publishes_succeed_updates_status_with_all_ids() {
243 let mut storage = MockOutboxStorage::<TestEvent>::new();
244 let mut transport = MockTransport::<TestEvent>::new();
245
246 let events = vec![make_event(1), make_event(2), make_event(3)];
247 let expected_ids: HashSet<EventId> = events.iter().map(|e| e.id).collect();
248
249 storage
250 .expect_fetch_next_to_process()
251 .times(1)
252 .returning(move |_| Ok(events.clone()));
253
254 storage
255 .expect_update_status()
256 .withf(move |ids, status| {
257 let got: HashSet<EventId> = ids.iter().copied().collect();
258 got == expected_ids && *status == EventStatus::Sent
259 })
260 .times(1)
261 .returning(|_, _| Ok(()));
262
263 transport.expect_publish().times(3).returning(|_| Ok(()));
264
265 let processor = OutboxProcessor::new(Arc::new(storage), Arc::new(transport), config());
266
267 #[cfg(not(feature = "dlq"))]
268 let result = processor.process_pending_events().await;
269
270 #[cfg(feature = "dlq")]
271 let result = {
272 let mut dlq = MockDlqHeap::new();
273 dlq.expect_record_success().times(3).returning(|_| Ok(()));
274 dlq.expect_record_failure().times(0);
275 processor.process_pending_events(Arc::new(dlq)).await
276 };
277
278 assert!(matches!(result, Ok(3)));
279 }
280
281 #[rstest]
282 #[tokio::test]
283 async fn partial_publish_failure_updates_only_successful() {
284 let mut storage = MockOutboxStorage::<TestEvent>::new();
285 let mut transport = MockTransport::<TestEvent>::new();
286
287 let e1 = make_event(1);
288 let e2 = make_event(2);
289 let e3 = make_event(3);
290 let id1 = e1.id;
291 let id2 = e2.id;
292 let id3 = e3.id;
293
294 storage
295 .expect_fetch_next_to_process()
296 .times(1)
297 .returning(move |_| Ok(vec![e1.clone(), e2.clone(), e3.clone()]));
298
299 storage
300 .expect_update_status()
301 .withf(move |ids, status| {
302 let got: HashSet<EventId> = ids.iter().copied().collect();
303 got.len() == 2
304 && got.contains(&id1)
305 && got.contains(&id3)
306 && !got.contains(&id2)
307 && *status == EventStatus::Sent
308 })
309 .times(1)
310 .returning(|_, _| Ok(()));
311
312 let mut seq = Sequence::new();
313 transport
314 .expect_publish()
315 .times(1)
316 .in_sequence(&mut seq)
317 .returning(|_| Ok(()));
318 transport
319 .expect_publish()
320 .times(1)
321 .in_sequence(&mut seq)
322 .returning(|_| Err(OutboxError::BrokerError("boom".into())));
323 transport
324 .expect_publish()
325 .times(1)
326 .in_sequence(&mut seq)
327 .returning(|_| Ok(()));
328
329 let processor = OutboxProcessor::new(Arc::new(storage), Arc::new(transport), config());
330
331 #[cfg(not(feature = "dlq"))]
332 let result = processor.process_pending_events().await;
333
334 #[cfg(feature = "dlq")]
335 let result = {
336 let mut dlq = MockDlqHeap::new();
337 dlq.expect_record_success()
338 .withf(move |id| *id == id1 || *id == id3)
339 .times(2)
340 .returning(|_| Ok(()));
341 dlq.expect_record_failure()
342 .withf(move |id| *id == id2)
343 .times(1)
344 .returning(|_| Ok(()));
345 processor.process_pending_events(Arc::new(dlq)).await
346 };
347
348 assert!(matches!(result, Ok(3)));
349 }
350
351 #[rstest]
352 #[tokio::test]
353 async fn all_publishes_fail_skips_status_update() {
354 let mut storage = MockOutboxStorage::<TestEvent>::new();
355 let mut transport = MockTransport::<TestEvent>::new();
356
357 let events = vec![make_event(1), make_event(2)];
358
359 storage
360 .expect_fetch_next_to_process()
361 .times(1)
362 .returning(move |_| Ok(events.clone()));
363 storage.expect_update_status().times(0);
364
365 transport
366 .expect_publish()
367 .times(2)
368 .returning(|_| Err(OutboxError::BrokerError("x".into())));
369
370 let processor = OutboxProcessor::new(Arc::new(storage), Arc::new(transport), config());
371
372 #[cfg(not(feature = "dlq"))]
373 let result = processor.process_pending_events().await;
374
375 #[cfg(feature = "dlq")]
376 let result = {
377 let mut dlq = MockDlqHeap::new();
378 dlq.expect_record_success().times(0);
379 dlq.expect_record_failure().times(2).returning(|_| Ok(()));
380 processor.process_pending_events(Arc::new(dlq)).await
381 };
382
383 assert!(matches!(result, Ok(2)));
384 }
385
386 #[rstest]
387 #[tokio::test]
388 async fn fetch_error_propagates_without_publishing() {
389 let mut storage = MockOutboxStorage::<TestEvent>::new();
390 let mut transport = MockTransport::<TestEvent>::new();
391
392 storage
393 .expect_fetch_next_to_process()
394 .times(1)
395 .returning(|_| Err(OutboxError::DatabaseError("boom".into())));
396 storage.expect_update_status().times(0);
397 transport.expect_publish().times(0);
398
399 let processor = OutboxProcessor::new(Arc::new(storage), Arc::new(transport), config());
400
401 #[cfg(not(feature = "dlq"))]
402 let result = processor.process_pending_events().await;
403
404 #[cfg(feature = "dlq")]
405 let result = {
406 let mut dlq = MockDlqHeap::new();
407 dlq.expect_record_success().times(0);
408 dlq.expect_record_failure().times(0);
409 processor.process_pending_events(Arc::new(dlq)).await
410 };
411
412 assert!(matches!(result, Err(OutboxError::DatabaseError(_))));
413 }
414
415 #[rstest]
416 #[tokio::test]
417 async fn update_status_error_propagates_after_publish() {
418 let mut storage = MockOutboxStorage::<TestEvent>::new();
419 let mut transport = MockTransport::<TestEvent>::new();
420
421 storage
422 .expect_fetch_next_to_process()
423 .times(1)
424 .returning(move |_| Ok(vec![make_event(1)]));
425 storage
426 .expect_update_status()
427 .times(1)
428 .returning(|_, _| Err(OutboxError::DatabaseError("boom".into())));
429
430 transport.expect_publish().times(1).returning(|_| Ok(()));
431
432 let processor = OutboxProcessor::new(Arc::new(storage), Arc::new(transport), config());
433
434 #[cfg(not(feature = "dlq"))]
435 let result = processor.process_pending_events().await;
436
437 #[cfg(feature = "dlq")]
438 let result = {
439 let mut dlq = MockDlqHeap::new();
440 dlq.expect_record_success().times(1).returning(|_| Ok(()));
441 processor.process_pending_events(Arc::new(dlq)).await
442 };
443
444 assert!(matches!(result, Err(OutboxError::DatabaseError(_))));
445 }
446}