Skip to main content

outbox_core/
service.rs

1use crate::error::OutboxError;
2use crate::idempotency::storage::NoIdempotency;
3use crate::model::Event;
4use crate::object::{EventType, IdempotencyToken, Payload};
5use crate::prelude::{IdempotencyStorageProvider, OutboxConfig};
6use crate::storage::OutboxWriter;
7use std::sync::Arc;
8
9pub struct OutboxService<W, S> {
10    writer: Arc<W>,
11    config: Arc<OutboxConfig>,
12    idempotency_storage: Option<Arc<S>>,
13}
14
15impl<W> OutboxService<W, NoIdempotency>
16where
17    W: OutboxWriter + Send + Sync + 'static,
18{
19    pub fn new(writer: Arc<W>, config: Arc<OutboxConfig>) -> Self {
20        Self {
21            writer,
22            config,
23            idempotency_storage: None,
24        }
25    }
26}
27
28impl<W, S> OutboxService<W, S>
29where
30    W: OutboxWriter + Send + Sync + 'static,
31    S: IdempotencyStorageProvider + Send + Sync + 'static,
32{
33    pub fn with_idempotency(
34        writer: Arc<W>,
35        config: Arc<OutboxConfig>,
36        idempotency_storage: Arc<S>,
37    ) -> Self {
38        Self {
39            writer,
40            idempotency_storage: Some(idempotency_storage),
41            config,
42        }
43    }
44    /// Adds a new event to the outbox storage with idempotency checks.
45    ///
46    /// If an idempotency provider is configured and a token is generated,
47    /// it will first attempt to reserve the token to prevent duplicate processing.
48    ///
49    /// # Errors
50    ///
51    /// Returns [`OutboxError::DuplicateEvent`] if the event token has already been used.
52    /// Returns [`OutboxError`] if the idempotency storage fails or the database
53    /// insert operation fails.
54    ///
55    /// # Panics
56    ///
57    /// Panics if the idempotency strategy is set to `Custom`, but `get_event` returns `None`.
58    pub async fn add_event<F>(
59        &self,
60        event_type: &str,
61        payload: serde_json::Value,
62        provided_token: Option<String>,
63        get_event: F,
64    ) -> Result<(), OutboxError>
65    where
66        F: FnOnce() -> Option<Event>,
67    {
68        let i_token = self
69            .config
70            .idempotency_strategy
71            .invoke(provided_token, get_event)
72            .map(IdempotencyToken::new);
73
74        if let Some(i_provider) = &self.idempotency_storage
75            && let Some(ref token) = i_token
76            && !i_provider.try_reserve(token).await?
77        {
78            return Err(OutboxError::DuplicateEvent);
79        }
80
81        let event = Event::new(EventType::new(event_type), Payload::new(payload), i_token);
82        self.writer.insert_event(event).await
83    }
84}