1use std::sync::Arc;
2use tracing::error;
3use crate::config::OutboxConfig;
4use crate::error::OutboxError;
5use crate::model::Event;
6use crate::model::EventStatus::Sent;
7use crate::object::EventId;
8use crate::publisher::Transport;
9use crate::storage::OutboxStorage;
10
11pub struct OutboxProcessor<S, P>
12where
13 S: OutboxStorage + Clone + 'static,
14 P: Transport + Clone + 'static,
15{
16 storage: S,
17 publisher: P,
18 config: Arc<OutboxConfig>,
19}
20
21impl<S, P> OutboxProcessor<S, P>
22where
23 S: OutboxStorage + Clone + 'static,
24 P: Transport + Clone + 'static,
25{
26
27 pub fn new(storage: S, publisher: P, config: Arc<OutboxConfig>) -> Self {
28 Self {
29 storage,
30 publisher,
31 config
32 }
33 }
34
35 pub async fn process_pending_events(&self) -> Result<usize, OutboxError> {
36 let events = self.storage.fetch_next_to_process(self.config.batch_size).await?;
37
38 if events.is_empty() {
39 return Ok(0);
40 }
41 let count = events.len();
42 self.event_publish(events).await?;
43 Ok(count)
44 }
45
46 async fn event_publish(&self, events: Vec<Event>) -> Result<(), OutboxError> {
47 let mut success_ids = Vec::<EventId>::new();
48 for event in events {
49 match self.publisher.publish(event.event_type, event.payload).await {
50 Ok(()) => {
51 success_ids.push(event.id);
52 }
53 Err(e) => {
54 error!("Failed to publish event {:?}: {:?}", event.id, e);
55 }
56 }
57 }
58 if !success_ids.is_empty() {
59 self.storage.updates_status(&success_ids, Sent).await?;
60 }
61 Ok(())
62 }
63
64}
65
66
/// Test module for the outbox processor; it currently contains no test cases.
#[cfg(test)]
mod tests {
    // Pulls in the parent module's items so tests added here can reference them.
    use super::*;
}