1use crate::config::{OutboxConfig};
2use crate::error::OutboxError;
3use crate::model::Event;
4use crate::model::EventStatus::Sent;
5use crate::object::{EventId};
6use crate::publisher::Transport;
7use crate::storage::OutboxStorage;
8use std::sync::Arc;
9use tracing::error;
10
11pub struct OutboxProcessor<S, T>
12{
13 storage: S,
14 publisher: T,
15 config: Arc<OutboxConfig>,
16}
17
18impl<S, T> OutboxProcessor<S, T>
19where
20 S: OutboxStorage + Clone + 'static,
21 T: Transport + Clone + 'static,
22{
23
24 pub fn new(storage: S, publisher: T, config: Arc<OutboxConfig>) -> Self {
25 Self {
26 storage,
27 publisher,
28 config,
29 }
30 }
31
32 pub async fn process_pending_events(&self) -> Result<usize, OutboxError> {
33 let events = self.storage.fetch_next_to_process(self.config.batch_size).await?;
34
35 if events.is_empty() {
36 return Ok(0);
37 }
38 let count = events.len();
39 self.event_publish(events).await?;
40 Ok(count)
41 }
42
43 async fn event_publish(&self, events: Vec<Event>) -> Result<(), OutboxError> {
44 let mut success_ids = Vec::<EventId>::new();
45 for event in events {
46 match self.publisher.publish(event.event_type, event.payload).await {
47 Ok(()) => {
48 success_ids.push(event.id);
49 }
50 Err(e) => {
51 error!("Failed to publish event {:?}: {:?}", event.id, e);
52 }
53 }
54 }
55 if !success_ids.is_empty() {
56 self.storage.updates_status(&success_ids, Sent).await?;
57 }
58 Ok(())
59 }
60
61}
62
63
64#[cfg(test)]
65mod tests {}