Skip to main content

outbox_core/
processor.rs

1use std::sync::Arc;
2use tracing::error;
3use crate::config::OutboxConfig;
4use crate::error::OutboxError;
5use crate::model::Event;
6use crate::model::EventStatus::Sent;
7use crate::object::EventId;
8use crate::publisher::Transport;
9use crate::storage::OutboxStorage;
10
11pub struct OutboxProcessor<S, P>
12where
13    S: OutboxStorage + Clone + 'static,
14    P: Transport + Clone  + 'static,
15{
16    storage: S,
17    publisher: P,
18    config: Arc<OutboxConfig>,
19}
20
21impl<S, P> OutboxProcessor<S, P>
22where
23    S: OutboxStorage + Clone  + 'static,
24    P: Transport + Clone  + 'static,
25{
26
27    pub fn new(storage: S, publisher: P, config: Arc<OutboxConfig>) -> Self {
28        Self {
29            storage,
30            publisher,
31            config
32        }
33    }
34
35    pub async fn process_pending_events(&self) -> Result<usize, OutboxError> {
36        let events = self.storage.fetch_next_to_process(self.config.batch_size).await?;
37
38        if events.is_empty() {
39            return Ok(0);
40        }
41        let count = events.len();
42        self.event_publish(events).await?;
43        Ok(count)
44    }
45
46    async fn event_publish(&self, events: Vec<Event>) -> Result<(), OutboxError> {
47        let mut success_ids = Vec::<EventId>::new();
48        for event in events {
49            match self.publisher.publish(event.event_type, event.payload).await {
50                Ok(()) => {
51                    success_ids.push(event.id);
52                }
53                Err(e) => {
54                    error!("Failed to publish event {:?}: {:?}", event.id, e);
55                }
56            }
57        }
58        if !success_ids.is_empty() {
59            self.storage.updates_status(&success_ids, Sent).await?;
60        }
61        Ok(())
62    }
63
64}
65
66
#[cfg(test)]
mod tests {
    // No test cases exist yet, so this glob import is currently unused and
    // would emit an `unused_imports` warning on every test build. Keep the
    // import ready for future tests and drop the `allow` once one is added.
    #[allow(unused_imports)]
    use super::*;
}