1use crate::error::OutboxError;
2use crate::idempotency::storage::NoIdempotency;
3use crate::model::Event;
4use crate::object::{EventType, IdempotencyToken, Payload};
5use crate::prelude::{IdempotencyStorageProvider, OutboxConfig};
6use crate::storage::OutboxWriter;
7use serde::Serialize;
8use std::fmt::Debug;
9use std::sync::Arc;
10
11pub struct OutboxService<W, S, P>
12where
13 P: Debug + Clone + Serialize + Send + Sync,
14{
15 writer: Arc<W>,
16 config: Arc<OutboxConfig<P>>,
17 idempotency_storage: Option<Arc<S>>,
18}
19
20impl<W, P> OutboxService<W, NoIdempotency, P>
21where
22 W: OutboxWriter<P> + Send + Sync + 'static,
23 P: Debug + Clone + Serialize + Send + Sync,
24{
25 pub fn new(writer: Arc<W>, config: Arc<OutboxConfig<P>>) -> Self {
26 Self {
27 writer,
28 config,
29 idempotency_storage: None,
30 }
31 }
32}
33
34impl<W, S, P> OutboxService<W, S, P>
35where
36 W: OutboxWriter<P> + Send + Sync + 'static,
37 S: IdempotencyStorageProvider + Send + Sync + 'static,
38 P: Debug + Clone + Serialize + Send + Sync,
39{
40 pub fn with_idempotency(
41 writer: Arc<W>,
42 config: Arc<OutboxConfig<P>>,
43 idempotency_storage: Arc<S>,
44 ) -> Self {
45 Self {
46 writer,
47 idempotency_storage: Some(idempotency_storage),
48 config,
49 }
50 }
51 pub async fn add_event<F>(
66 &self,
67 event_type: &str,
68 payload: P,
69 provided_token: Option<String>,
70 get_event: F,
71 ) -> Result<(), OutboxError>
72 where
73 F: FnOnce() -> Option<Event<P>>,
74 P: Debug + Clone + Serialize + Send + Sync,
75 {
76 let i_token = self
77 .config
78 .idempotency_strategy
79 .invoke(provided_token, get_event)
80 .map(IdempotencyToken::new);
81
82 if let Some(i_provider) = &self.idempotency_storage
83 && let Some(ref token) = i_token
84 && !i_provider.try_reserve(token).await?
85 {
86 return Err(OutboxError::DuplicateEvent);
87 }
88
89 let event = Event::new(EventType::new(event_type), Payload::new(payload), i_token);
90 self.writer.insert_event(event).await
91 }
92}