1use crate::config::OutboxConfig;
2use crate::error::OutboxError;
3use crate::model::Event;
4use crate::model::EventStatus::Sent;
5use crate::object::EventId;
6use crate::publisher::Transport;
7use crate::storage::OutboxStorage;
8use serde::Serialize;
9use std::fmt::Debug;
10use std::sync::Arc;
11use tracing::error;
12
13pub struct OutboxProcessor<S, T, P>
14where
15 P: Debug + Clone + Serialize,
16{
17 storage: Arc<S>,
18 publisher: Arc<T>,
19 config: Arc<OutboxConfig<P>>,
20}
21
22impl<S, T, P> OutboxProcessor<S, T, P>
23where
24 S: OutboxStorage<P> + 'static,
25 T: Transport<P> + 'static,
26 P: Debug + Clone + Serialize + Send + Sync,
27{
28 pub fn new(storage: Arc<S>, publisher: Arc<T>, config: Arc<OutboxConfig<P>>) -> Self {
29 Self {
30 storage,
31 publisher,
32 config,
33 }
34 }
35
36 pub async fn process_pending_events(&self) -> Result<usize, OutboxError> {
40 let events: Vec<Event<P>> = self
41 .storage
42 .fetch_next_to_process(self.config.batch_size)
43 .await?;
44
45 if events.is_empty() {
46 return Ok(0);
47 }
48 let count = events.len();
49 self.event_publish(events).await?;
50 Ok(count)
51 }
52
53 async fn event_publish(&self, events: Vec<Event<P>>) -> Result<(), OutboxError> {
54 let mut success_ids = Vec::<EventId>::new();
55 for event in events {
56 let id = event.id;
57 match self.publisher.publish(event).await {
58 Ok(()) => {
59 success_ids.push(id);
60 }
61 Err(e) => {
62 error!("Failed to publish event {:?}: {:?}", id, e);
63 }
64 }
65 }
66 if !success_ids.is_empty() {
67 self.storage.updates_status(&success_ids, Sent).await?;
68 }
69 Ok(())
70 }
71}
72
// Placeholder test module: compiled only under `cfg(test)` and currently
// contains no tests.
#[cfg(test)]
mod tests {}