Skip to main content

outbox_core/
service.rs

1use std::sync::Arc;
2use crate::error::OutboxError;
3use crate::model::Event;
4use crate::object::{EventType, IdempotencyToken, Payload};
5use crate::prelude::{IdempotencyStorageProvider, OutboxConfig};
6use crate::storage::OutboxWriter;
7
8pub struct OutboxService<W, S> {
9    writer: W,
10    idempotency_storage: S,
11    config: Arc<OutboxConfig>,
12}
13
14impl<W, S> OutboxService<W, S>
15where
16    W: OutboxWriter + Send + Sync + 'static,
17    S: IdempotencyStorageProvider + Send + Sync + 'static,
18{
19
20    pub fn new(writer: W, idempotency_storage: S, config: Arc<OutboxConfig>) -> Self {
21        Self {
22            writer,
23            idempotency_storage,
24            config
25        }
26    }
27
28    pub async fn add_event<F>(
29        &self,
30        event_type: &str,
31        payload: serde_json::Value,
32        provided_token: Option<String>,
33        get_event: F,
34    ) -> Result<(), OutboxError>
35    where
36        F: FnOnce() -> Option<Event>,
37    {
38        let i_token = self
39            .config
40            .idempotency_strategy
41            .invoke(provided_token, get_event)
42            .map(IdempotencyToken::new);
43
44        if let Some(ref token) = i_token
45            && !self.idempotency_storage.try_reserve(token).await? {
46                return Err(OutboxError::DuplicateEvent);
47            }
48
49        let event = Event::new(EventType::new(event_type), Payload::new(payload), i_token);
50        self.writer.insert_event(event).await
51    }
52}
53
54