Skip to main content

outbox_core/
service.rs

1use crate::error::OutboxError;
2use crate::idempotency::storage::NoIdempotency;
3use crate::model::Event;
4use crate::object::{EventType, IdempotencyToken, Payload};
5use crate::prelude::{IdempotencyStorageProvider, OutboxConfig};
6use crate::storage::OutboxWriter;
7use serde::Serialize;
8use std::fmt::Debug;
9use std::sync::Arc;
10
11pub struct OutboxService<W, S, P>
12where
13    P: Debug + Clone + Serialize + Send + Sync,
14{
15    writer: Arc<W>,
16    config: Arc<OutboxConfig<P>>,
17    idempotency_storage: Option<Arc<S>>,
18}
19
20impl<W, P> OutboxService<W, NoIdempotency, P>
21where
22    W: OutboxWriter<P> + Send + Sync + 'static,
23    P: Debug + Clone + Serialize + Send + Sync,
24{
25    pub fn new(writer: Arc<W>, config: Arc<OutboxConfig<P>>) -> Self {
26        Self {
27            writer,
28            config,
29            idempotency_storage: None,
30        }
31    }
32}
33
34impl<W, S, P> OutboxService<W, S, P>
35where
36    W: OutboxWriter<P> + Send + Sync + 'static,
37    S: IdempotencyStorageProvider + Send + Sync + 'static,
38    P: Debug + Clone + Serialize + Send + Sync,
39{
40    pub fn with_idempotency(
41        writer: Arc<W>,
42        config: Arc<OutboxConfig<P>>,
43        idempotency_storage: Arc<S>,
44    ) -> Self {
45        Self {
46            writer,
47            idempotency_storage: Some(idempotency_storage),
48            config,
49        }
50    }
51    /// Adds a new event to the outbox storage with idempotency checks.
52    ///
53    /// If an idempotency provider is configured and a token is generated,
54    /// it will first attempt to reserve the token to prevent duplicate processing.
55    ///
56    /// # Errors
57    ///
58    /// Returns [`OutboxError::DuplicateEvent`] if the event token has already been used.
59    /// Returns [`OutboxError`] if the idempotency storage fails or the database
60    /// insert operation fails.
61    ///
62    /// # Panics
63    ///
64    /// Panics if the idempotency strategy is set to `Custom`, but `get_event` returns `None`.
65    pub async fn add_event<F>(
66        &self,
67        event_type: &str,
68        payload: P,
69        provided_token: Option<String>,
70        get_event: F,
71    ) -> Result<(), OutboxError>
72    where
73        F: FnOnce() -> Option<Event<P>>,
74        P: Debug + Clone + Serialize + Send + Sync,
75    {
76        let i_token = self
77            .config
78            .idempotency_strategy
79            .invoke(provided_token, get_event)
80            .map(IdempotencyToken::new);
81
82        if let Some(i_provider) = &self.idempotency_storage
83            && let Some(ref token) = i_token
84            && !i_provider.try_reserve(token).await?
85        {
86            return Err(OutboxError::DuplicateEvent);
87        }
88
89        let event = Event::new(EventType::new(event_type), Payload::new(payload), i_token);
90        self.writer.insert_event(event).await
91    }
92}