Skip to main content

outbox_core/
processor.rs

1use crate::config::{OutboxConfig};
2use crate::error::OutboxError;
3use crate::model::Event;
4use crate::model::EventStatus::Sent;
5use crate::object::{EventId};
6use crate::publisher::Transport;
7use crate::storage::OutboxStorage;
8use std::sync::Arc;
9use tracing::error;
10
11pub struct OutboxProcessor<S, T>
12{
13    storage: S,
14    publisher: T,
15    config: Arc<OutboxConfig>,
16}
17
18impl<S, T> OutboxProcessor<S, T>
19where
20    S: OutboxStorage + Clone  + 'static,
21    T: Transport + Clone  + 'static,
22{
23
24    pub fn new(storage: S, publisher: T, config: Arc<OutboxConfig>) -> Self {
25        Self {
26            storage,
27            publisher,
28            config,
29        }
30    }
31
32    pub async fn process_pending_events(&self) -> Result<usize, OutboxError> {
33        let events = self.storage.fetch_next_to_process(self.config.batch_size).await?;
34
35        if events.is_empty() {
36            return Ok(0);
37        }
38        let count = events.len();
39        self.event_publish(events).await?;
40        Ok(count)
41    }
42
43    async fn event_publish(&self, events: Vec<Event>) -> Result<(), OutboxError> {
44        let mut success_ids = Vec::<EventId>::new();
45        for event in events {
46            match self.publisher.publish(event.event_type, event.payload).await {
47                Ok(()) => {
48                    success_ids.push(event.id);
49                }
50                Err(e) => {
51                    error!("Failed to publish event {:?}: {:?}", event.id, e);
52                }
53            }
54        }
55        if !success_ids.is_empty() {
56            self.storage.updates_status(&success_ids, Sent).await?;
57        }
58        Ok(())
59    }
60
61}
62
63
64#[cfg(test)]
65mod tests {}