Skip to main content

outbox_core/
processor.rs

1use crate::config::OutboxConfig;
2use crate::error::OutboxError;
3use crate::model::Event;
4use crate::model::EventStatus::Sent;
5use crate::object::EventId;
6use crate::publisher::Transport;
7use crate::storage::OutboxStorage;
8use std::sync::Arc;
9use tracing::error;
10
11pub struct OutboxProcessor<S, T> {
12    storage: Arc<S>,
13    publisher: Arc<T>,
14    config: Arc<OutboxConfig>,
15}
16
17impl<S, T> OutboxProcessor<S, T>
18where
19    S: OutboxStorage + 'static,
20    T: Transport + 'static,
21{
22    pub fn new(storage: Arc<S>, publisher: Arc<T>, config: Arc<OutboxConfig>) -> Self {
23        Self {
24            storage,
25            publisher,
26            config,
27        }
28    }
29
30    /// We receive the event batch and send it to Transport
31    /// # Errors
32    /// We may get a DB error during fetch or UPDATE. publish errors are only logged.
33    pub async fn process_pending_events(&self) -> Result<usize, OutboxError> {
34        let events = self
35            .storage
36            .fetch_next_to_process(self.config.batch_size)
37            .await?;
38
39        if events.is_empty() {
40            return Ok(0);
41        }
42        let count = events.len();
43        self.event_publish(events).await?;
44        Ok(count)
45    }
46
47    async fn event_publish(&self, events: Vec<Event>) -> Result<(), OutboxError> {
48        let mut success_ids = Vec::<EventId>::new();
49        for event in events {
50            let id = event.id;
51            match self.publisher.publish(event).await {
52                Ok(()) => {
53                    success_ids.push(id);
54                }
55                Err(e) => {
56                    error!("Failed to publish event {:?}: {:?}", id, e);
57                }
58            }
59        }
60        if !success_ids.is_empty() {
61            self.storage.updates_status(&success_ids, Sent).await?;
62        }
63        Ok(())
64    }
65}
66
67#[cfg(test)]
68mod tests {}