1use crate::error::OutboxError;
2use crate::idempotency::storage::NoIdempotency;
3use crate::model::Event;
4use crate::object::{EventType, IdempotencyToken, Payload};
5use crate::prelude::{IdempotencyStorageProvider, OutboxConfig};
6use crate::storage::OutboxWriter;
7use std::sync::Arc;
8
9pub struct OutboxService<W, S> {
10 writer: Arc<W>,
11 config: Arc<OutboxConfig>,
12 idempotency_storage: Option<Arc<S>>,
13}
14
15impl<W> OutboxService<W, NoIdempotency>
16where
17 W: OutboxWriter + Send + Sync + 'static,
18{
19 pub fn new(writer: Arc<W>, config: Arc<OutboxConfig>) -> Self {
20 Self {
21 writer,
22 config,
23 idempotency_storage: None,
24 }
25 }
26}
27
28impl<W, S> OutboxService<W, S>
29where
30 W: OutboxWriter + Send + Sync + 'static,
31 S: IdempotencyStorageProvider + Send + Sync + 'static,
32{
33 pub fn with_idempotency(
34 writer: Arc<W>,
35 config: Arc<OutboxConfig>,
36 idempotency_storage: Arc<S>,
37 ) -> Self {
38 Self {
39 writer,
40 idempotency_storage: Some(idempotency_storage),
41 config,
42 }
43 }
44 pub async fn add_event<F>(
59 &self,
60 event_type: &str,
61 payload: serde_json::Value,
62 provided_token: Option<String>,
63 get_event: F,
64 ) -> Result<(), OutboxError>
65 where
66 F: FnOnce() -> Option<Event>,
67 {
68 let i_token = self
69 .config
70 .idempotency_strategy
71 .invoke(provided_token, get_event)
72 .map(IdempotencyToken::new);
73
74 if let Some(i_provider) = &self.idempotency_storage
75 && let Some(ref token) = i_token
76 && !i_provider.try_reserve(token).await?
77 {
78 return Err(OutboxError::DuplicateEvent);
79 }
80
81 let event = Event::new(EventType::new(event_type), Payload::new(payload), i_token);
82 self.writer.insert_event(event).await
83 }
84}