Skip to main content

outbox_core/
processor.rs

1//! One iteration of the worker loop: fetch a batch of pending events,
2//! publish each of them, and record the outcome.
3//!
4//! [`OutboxProcessor`] is the unit [`OutboxManager`](crate::manager::OutboxManager)
5//! drives on every wake-up. It keeps the orchestration inside the manager
6//! simple (drain until a fetch returns an empty batch) and leaves the
7//! per-event work — publishing and status bookkeeping — encapsulated here.
8
9use crate::config::OutboxConfig;
10use crate::error::OutboxError;
11use crate::model::Event;
12use crate::model::EventStatus::Sent;
13use crate::object::EventId;
14use crate::publisher::Transport;
15use crate::storage::OutboxStorage;
16use serde::Serialize;
17use std::fmt::Debug;
18use std::sync::Arc;
19use tracing::error;
20
21/// Processes one batch of pending outbox events per invocation.
22///
23/// Holds handles to storage, transport, and configuration; the manager
24/// constructs a single processor at startup and reuses it across iterations.
25pub struct OutboxProcessor<S, T, P>
26where
27    P: Debug + Clone + Serialize,
28{
29    storage: Arc<S>,
30    publisher: Arc<T>,
31    config: Arc<OutboxConfig<P>>,
32}
33
34impl<S, T, P> OutboxProcessor<S, T, P>
35where
36    S: OutboxStorage<P> + 'static,
37    T: Transport<P> + 'static,
38    P: Debug + Clone + Serialize + Send + Sync,
39{
40    /// Creates a processor wired to the supplied storage, transport, and
41    /// configuration.
42    pub fn new(storage: Arc<S>, publisher: Arc<T>, config: Arc<OutboxConfig<P>>) -> Self {
43        Self {
44            storage,
45            publisher,
46            config,
47        }
48    }
49
50    /// Processes one batch of pending events.
51    ///
52    /// Fetches up to `config.batch_size` rows via
53    /// [`OutboxStorage::fetch_next_to_process`], publishes each through the
54    /// [`Transport`], and then marks every successfully published row as
55    /// [`Sent`](crate::model::EventStatus::Sent) in a single
56    /// [`update_status`](OutboxStorage::update_status) call. Rows whose
57    /// `publish` call failed are left in `Processing`; their lock will expire
58    /// and make them eligible for retry.
59    ///
60    /// Returns the number of events fetched in the batch — `0` signals to the
61    /// caller (typically the manager's drain loop) that there is nothing left
62    /// to do right now and it can go back to waiting.
63    ///
64    /// When the `dlq` feature is enabled, takes a shared
65    /// [`DlqHeap`](crate::dlq::storage::DlqHeap) and records success or
66    /// failure per event so the heap can track repeat-offender rows.
67    ///
68    /// # Errors
69    ///
70    /// Returns a [`DatabaseError`](OutboxError::DatabaseError) propagated from
71    /// `fetch_next_to_process` or `update_status`. Per-event publish failures
72    /// are *not* propagated — they are logged via `tracing::error!` and the
73    /// rest of the batch continues.
74    pub async fn process_pending_events(
75        &self,
76        #[cfg(feature = "dlq")] dlq_heap: Arc<dyn crate::dlq::storage::DlqHeap>,
77    ) -> Result<usize, OutboxError> {
78        let events: Vec<Event<P>> = self
79            .storage
80            .fetch_next_to_process(self.config.batch_size)
81            .await?;
82
83        if events.is_empty() {
84            return Ok(0);
85        }
86        let count = events.len();
87
88        #[cfg(feature = "dlq")]
89        self.event_publish(events, dlq_heap).await?;
90        #[cfg(not(feature = "dlq"))]
91        self.event_publish(events).await?;
92
93        Ok(count)
94    }
95
96    async fn event_publish(
97        &self,
98        events: Vec<Event<P>>,
99        #[cfg(feature = "dlq")] dlq_heap: Arc<dyn crate::dlq::storage::DlqHeap>,
100    ) -> Result<(), OutboxError> {
101        let mut success_ids = Vec::<EventId>::new();
102        for event in events {
103            let id = event.id;
104
105            #[cfg(feature = "metrics")]
106            let start = std::time::Instant::now();
107
108            let event_type = event.event_type.to_string();
109
110            match self.publisher.publish(event).await {
111                Ok(()) => {
112                    success_ids.push(id);
113                    #[cfg(feature = "dlq")]
114                    dlq_heap.record_success(id).await?;
115                    #[cfg(feature = "metrics")]
116                    {
117                        let delta = start.elapsed().as_secs_f64();
118
119                        metrics::counter!("outbox.events_total",
120                            "status" => "success",
121                            "event_type" => event_type.clone()
122                        )
123                        .increment(1);
124
125                        metrics::histogram!(
126                            "outbox.publish_duration_seconds",
127                            "event_type" => event_type.clone()
128                        )
129                        .record(delta);
130                    }
131                }
132                Err(e) => {
133                    error!("Failed to publish event {:?}: {:?}", id, e);
134                    #[cfg(feature = "dlq")]
135                    dlq_heap.record_failure(id).await?;
136
137                    #[cfg(feature = "metrics")]
138                    {
139                        let delta = start.elapsed().as_secs_f64();
140
141                        metrics::counter!("outbox.events_total",
142                            "status" => "error",
143                            "event_type" => event_type.clone()
144                        )
145                        .increment(1);
146
147                        metrics::histogram!(
148                            "outbox.publish_duration_seconds",
149                            "status" => "error",
150                            "event_type" => event_type
151                        )
152                        .record(delta);
153                    }
154                }
155            }
156        }
157        if !success_ids.is_empty() {
158            self.storage.update_status(&success_ids, Sent).await?;
159        }
160        Ok(())
161    }
162}
163
164#[cfg(test)]
165#[allow(clippy::unwrap_used)]
166mod tests {
167    use super::*;
168    use crate::config::{IdempotencyStrategy, OutboxConfig};
169    #[cfg(feature = "dlq")]
170    use crate::dlq::storage::MockDlqHeap;
171    use crate::model::EventStatus;
172    use crate::object::EventType;
173    use crate::prelude::Payload;
174    use crate::publisher::MockTransport;
175    use crate::storage::MockOutboxStorage;
176    use mockall::Sequence;
177    use rstest::rstest;
178    use serde::{Deserialize, Serialize};
179    use std::collections::HashSet;
180    use std::sync::Arc;
181
182    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
183    enum TestEvent {
184        A(String),
185    }
186
187    fn config() -> Arc<OutboxConfig<TestEvent>> {
188        Arc::new(OutboxConfig {
189            batch_size: 100,
190            retention_days: 7,
191            gc_interval_secs: 3600,
192            poll_interval_secs: 5,
193            lock_timeout_mins: 5,
194            idempotency_strategy: IdempotencyStrategy::None,
195            dlq_threshold: 10,
196            dlq_interval_secs: 1,
197        })
198    }
199
200    fn make_event(n: u32) -> Event<TestEvent> {
201        Event::new(
202            EventType::new(&format!("t{n}")),
203            Payload::new(TestEvent::A(format!("v{n}"))),
204            None,
205        )
206    }
207
208    #[rstest]
209    #[tokio::test]
210    async fn empty_batch_returns_zero_and_skips_status_update() {
211        let mut storage = MockOutboxStorage::<TestEvent>::new();
212        let mut transport = MockTransport::<TestEvent>::new();
213
214        storage
215            .expect_fetch_next_to_process()
216            .withf(|limit| *limit == 100)
217            .times(1)
218            .returning(|_| Ok(vec![]));
219
220        storage.expect_update_status().times(0);
221        transport.expect_publish().times(0);
222
223        let processor = OutboxProcessor::new(Arc::new(storage), Arc::new(transport), config());
224
225        #[cfg(not(feature = "dlq"))]
226        let result = processor.process_pending_events().await;
227
228        #[cfg(feature = "dlq")]
229        let result = {
230            let mut dlq = MockDlqHeap::new();
231            dlq.expect_record_success().times(0);
232            dlq.expect_record_failure().times(0);
233            dlq.expect_drain_exceeded().times(0);
234            processor.process_pending_events(Arc::new(dlq)).await
235        };
236
237        assert!(matches!(result, Ok(0)));
238    }
239
240    #[rstest]
241    #[tokio::test]
242    async fn all_publishes_succeed_updates_status_with_all_ids() {
243        let mut storage = MockOutboxStorage::<TestEvent>::new();
244        let mut transport = MockTransport::<TestEvent>::new();
245
246        let events = vec![make_event(1), make_event(2), make_event(3)];
247        let expected_ids: HashSet<EventId> = events.iter().map(|e| e.id).collect();
248
249        storage
250            .expect_fetch_next_to_process()
251            .times(1)
252            .returning(move |_| Ok(events.clone()));
253
254        storage
255            .expect_update_status()
256            .withf(move |ids, status| {
257                let got: HashSet<EventId> = ids.iter().copied().collect();
258                got == expected_ids && *status == EventStatus::Sent
259            })
260            .times(1)
261            .returning(|_, _| Ok(()));
262
263        transport.expect_publish().times(3).returning(|_| Ok(()));
264
265        let processor = OutboxProcessor::new(Arc::new(storage), Arc::new(transport), config());
266
267        #[cfg(not(feature = "dlq"))]
268        let result = processor.process_pending_events().await;
269
270        #[cfg(feature = "dlq")]
271        let result = {
272            let mut dlq = MockDlqHeap::new();
273            dlq.expect_record_success().times(3).returning(|_| Ok(()));
274            dlq.expect_record_failure().times(0);
275            processor.process_pending_events(Arc::new(dlq)).await
276        };
277
278        assert!(matches!(result, Ok(3)));
279    }
280
281    #[rstest]
282    #[tokio::test]
283    async fn partial_publish_failure_updates_only_successful() {
284        let mut storage = MockOutboxStorage::<TestEvent>::new();
285        let mut transport = MockTransport::<TestEvent>::new();
286
287        let e1 = make_event(1);
288        let e2 = make_event(2);
289        let e3 = make_event(3);
290        let id1 = e1.id;
291        let id2 = e2.id;
292        let id3 = e3.id;
293
294        storage
295            .expect_fetch_next_to_process()
296            .times(1)
297            .returning(move |_| Ok(vec![e1.clone(), e2.clone(), e3.clone()]));
298
299        storage
300            .expect_update_status()
301            .withf(move |ids, status| {
302                let got: HashSet<EventId> = ids.iter().copied().collect();
303                got.len() == 2
304                    && got.contains(&id1)
305                    && got.contains(&id3)
306                    && !got.contains(&id2)
307                    && *status == EventStatus::Sent
308            })
309            .times(1)
310            .returning(|_, _| Ok(()));
311
312        let mut seq = Sequence::new();
313        transport
314            .expect_publish()
315            .times(1)
316            .in_sequence(&mut seq)
317            .returning(|_| Ok(()));
318        transport
319            .expect_publish()
320            .times(1)
321            .in_sequence(&mut seq)
322            .returning(|_| Err(OutboxError::BrokerError("boom".into())));
323        transport
324            .expect_publish()
325            .times(1)
326            .in_sequence(&mut seq)
327            .returning(|_| Ok(()));
328
329        let processor = OutboxProcessor::new(Arc::new(storage), Arc::new(transport), config());
330
331        #[cfg(not(feature = "dlq"))]
332        let result = processor.process_pending_events().await;
333
334        #[cfg(feature = "dlq")]
335        let result = {
336            let mut dlq = MockDlqHeap::new();
337            dlq.expect_record_success()
338                .withf(move |id| *id == id1 || *id == id3)
339                .times(2)
340                .returning(|_| Ok(()));
341            dlq.expect_record_failure()
342                .withf(move |id| *id == id2)
343                .times(1)
344                .returning(|_| Ok(()));
345            processor.process_pending_events(Arc::new(dlq)).await
346        };
347
348        assert!(matches!(result, Ok(3)));
349    }
350
351    #[rstest]
352    #[tokio::test]
353    async fn all_publishes_fail_skips_status_update() {
354        let mut storage = MockOutboxStorage::<TestEvent>::new();
355        let mut transport = MockTransport::<TestEvent>::new();
356
357        let events = vec![make_event(1), make_event(2)];
358
359        storage
360            .expect_fetch_next_to_process()
361            .times(1)
362            .returning(move |_| Ok(events.clone()));
363        storage.expect_update_status().times(0);
364
365        transport
366            .expect_publish()
367            .times(2)
368            .returning(|_| Err(OutboxError::BrokerError("x".into())));
369
370        let processor = OutboxProcessor::new(Arc::new(storage), Arc::new(transport), config());
371
372        #[cfg(not(feature = "dlq"))]
373        let result = processor.process_pending_events().await;
374
375        #[cfg(feature = "dlq")]
376        let result = {
377            let mut dlq = MockDlqHeap::new();
378            dlq.expect_record_success().times(0);
379            dlq.expect_record_failure().times(2).returning(|_| Ok(()));
380            processor.process_pending_events(Arc::new(dlq)).await
381        };
382
383        assert!(matches!(result, Ok(2)));
384    }
385
386    #[rstest]
387    #[tokio::test]
388    async fn fetch_error_propagates_without_publishing() {
389        let mut storage = MockOutboxStorage::<TestEvent>::new();
390        let mut transport = MockTransport::<TestEvent>::new();
391
392        storage
393            .expect_fetch_next_to_process()
394            .times(1)
395            .returning(|_| Err(OutboxError::DatabaseError("boom".into())));
396        storage.expect_update_status().times(0);
397        transport.expect_publish().times(0);
398
399        let processor = OutboxProcessor::new(Arc::new(storage), Arc::new(transport), config());
400
401        #[cfg(not(feature = "dlq"))]
402        let result = processor.process_pending_events().await;
403
404        #[cfg(feature = "dlq")]
405        let result = {
406            let mut dlq = MockDlqHeap::new();
407            dlq.expect_record_success().times(0);
408            dlq.expect_record_failure().times(0);
409            processor.process_pending_events(Arc::new(dlq)).await
410        };
411
412        assert!(matches!(result, Err(OutboxError::DatabaseError(_))));
413    }
414
415    #[rstest]
416    #[tokio::test]
417    async fn update_status_error_propagates_after_publish() {
418        let mut storage = MockOutboxStorage::<TestEvent>::new();
419        let mut transport = MockTransport::<TestEvent>::new();
420
421        storage
422            .expect_fetch_next_to_process()
423            .times(1)
424            .returning(move |_| Ok(vec![make_event(1)]));
425        storage
426            .expect_update_status()
427            .times(1)
428            .returning(|_, _| Err(OutboxError::DatabaseError("boom".into())));
429
430        transport.expect_publish().times(1).returning(|_| Ok(()));
431
432        let processor = OutboxProcessor::new(Arc::new(storage), Arc::new(transport), config());
433
434        #[cfg(not(feature = "dlq"))]
435        let result = processor.process_pending_events().await;
436
437        #[cfg(feature = "dlq")]
438        let result = {
439            let mut dlq = MockDlqHeap::new();
440            dlq.expect_record_success().times(1).returning(|_| Ok(()));
441            processor.process_pending_events(Arc::new(dlq)).await
442        };
443
444        assert!(matches!(result, Err(OutboxError::DatabaseError(_))));
445    }
446}