1use std::sync::Arc;
2use crate::error::OutboxError;
3use crate::model::Event;
4use crate::object::{EventType, IdempotencyToken, Payload};
5use crate::prelude::{IdempotencyStorageProvider, OutboxConfig};
6use crate::storage::OutboxWriter;
7
8pub struct OutboxService<W, S> {
9 writer: W,
10 idempotency_storage: S,
11 config: Arc<OutboxConfig>,
12}
13
14impl<W, S> OutboxService<W, S>
15where
16 W: OutboxWriter + Send + Sync + 'static,
17 S: IdempotencyStorageProvider + Send + Sync + 'static,
18{
19
20 pub fn new(writer: W, idempotency_storage: S, config: Arc<OutboxConfig>) -> Self {
21 Self {
22 writer,
23 idempotency_storage,
24 config
25 }
26 }
27
28 pub async fn add_event<F>(
29 &self,
30 event_type: &str,
31 payload: serde_json::Value,
32 provided_token: Option<String>,
33 get_event: F,
34 ) -> Result<(), OutboxError>
35 where
36 F: FnOnce() -> Option<Event>,
37 {
38 let i_token = self
39 .config
40 .idempotency_strategy
41 .invoke(provided_token, get_event)
42 .map(IdempotencyToken::new);
43
44 if let Some(ref token) = i_token
45 && !self.idempotency_storage.try_reserve(token).await? {
46 return Err(OutboxError::DuplicateEvent);
47 }
48
49 let event = Event::new(EventType::new(event_type), Payload::new(payload), i_token);
50 self.writer.insert_event(event).await
51 }
52}
53
54