Skip to main content

outbox_core/
processor.rs

1use crate::config::OutboxConfig;
2use crate::error::OutboxError;
3use crate::model::Event;
4use crate::model::EventStatus::Sent;
5use crate::object::EventId;
6use crate::publisher::Transport;
7use crate::storage::OutboxStorage;
8use serde::Serialize;
9use std::fmt::Debug;
10use std::sync::Arc;
11use tracing::error;
12
13pub struct OutboxProcessor<S, T, P>
14where
15    P: Debug + Clone + Serialize,
16{
17    storage: Arc<S>,
18    publisher: Arc<T>,
19    config: Arc<OutboxConfig<P>>,
20}
21
22impl<S, T, P> OutboxProcessor<S, T, P>
23where
24    S: OutboxStorage<P> + 'static,
25    T: Transport<P> + 'static,
26    P: Debug + Clone + Serialize + Send + Sync,
27{
28    pub fn new(storage: Arc<S>, publisher: Arc<T>, config: Arc<OutboxConfig<P>>) -> Self {
29        Self {
30            storage,
31            publisher,
32            config,
33        }
34    }
35
36    /// We receive the event batch and send it to Transport
37    /// # Errors
38    /// We may get a DB error during fetch or UPDATE. publish errors are only logged.
39    pub async fn process_pending_events(&self) -> Result<usize, OutboxError> {
40        let events: Vec<Event<P>> = self
41            .storage
42            .fetch_next_to_process(self.config.batch_size)
43            .await?;
44
45        if events.is_empty() {
46            return Ok(0);
47        }
48        let count = events.len();
49        self.event_publish(events).await?;
50        Ok(count)
51    }
52
53    async fn event_publish(&self, events: Vec<Event<P>>) -> Result<(), OutboxError> {
54        let mut success_ids = Vec::<EventId>::new();
55        for event in events {
56            let id = event.id;
57            match self.publisher.publish(event).await {
58                Ok(()) => {
59                    success_ids.push(id);
60                }
61                Err(e) => {
62                    error!("Failed to publish event {:?}: {:?}", id, e);
63                }
64            }
65        }
66        if !success_ids.is_empty() {
67            self.storage.updates_status(&success_ids, Sent).await?;
68        }
69        Ok(())
70    }
71}
72
#[cfg(test)]
mod tests {
    // Placeholder: no unit tests are defined in this module yet.
}