1use crate::config::OutboxConfig;
2use crate::error::OutboxError;
3use crate::model::Event;
4use crate::model::EventStatus::Sent;
5use crate::object::EventId;
6use crate::publisher::Transport;
7use crate::storage::OutboxStorage;
8use std::sync::Arc;
9use tracing::error;
10
11pub struct OutboxProcessor<S, T> {
12 storage: Arc<S>,
13 publisher: Arc<T>,
14 config: Arc<OutboxConfig>,
15}
16
17impl<S, T> OutboxProcessor<S, T>
18where
19 S: OutboxStorage + 'static,
20 T: Transport + 'static,
21{
22 pub fn new(storage: Arc<S>, publisher: Arc<T>, config: Arc<OutboxConfig>) -> Self {
23 Self {
24 storage,
25 publisher,
26 config,
27 }
28 }
29
30 pub async fn process_pending_events(&self) -> Result<usize, OutboxError> {
34 let events = self
35 .storage
36 .fetch_next_to_process(self.config.batch_size)
37 .await?;
38
39 if events.is_empty() {
40 return Ok(0);
41 }
42 let count = events.len();
43 self.event_publish(events).await?;
44 Ok(count)
45 }
46
47 async fn event_publish(&self, events: Vec<Event>) -> Result<(), OutboxError> {
48 let mut success_ids = Vec::<EventId>::new();
49 for event in events {
50 let id = event.id;
51 match self.publisher.publish(event).await {
52 Ok(()) => {
53 success_ids.push(id);
54 }
55 Err(e) => {
56 error!("Failed to publish event {:?}: {:?}", id, e);
57 }
58 }
59 }
60 if !success_ids.is_empty() {
61 self.storage.updates_status(&success_ids, Sent).await?;
62 }
63 Ok(())
64 }
65}
66
67#[cfg(test)]
68mod tests {}