Skip to main content

outbox_core/
service.rs

1//! Producer-side entry point: writes new events into the outbox.
2//!
3//! [`OutboxService`] is what application code calls when a domain action
4//! needs to emit an event. It applies the configured
5//! [`IdempotencyStrategy`](crate::config::IdempotencyStrategy) to produce (or
6//! accept) a token, optionally reserves that token through an external
7//! [`IdempotencyStorageProvider`] to reject duplicates, and then persists the
8//! event via an [`OutboxWriter`]. The worker side
9//! ([`OutboxManager`](crate::manager::OutboxManager)) picks it up later.
10
11use crate::error::OutboxError;
12use crate::idempotency::storage::NoIdempotency;
13use crate::model::Event;
14use crate::object::{EventType, IdempotencyToken, Payload};
15use crate::prelude::{IdempotencyStorageProvider, OutboxConfig};
16use crate::storage::OutboxWriter;
17use serde::Serialize;
18use std::fmt::Debug;
19use std::sync::Arc;
20
21/// Producer-side facade for writing outbox events.
22///
23/// The service is generic over:
24///
25/// - `W` — [`OutboxWriter`] implementation (persists the event row)
26/// - `S` — [`IdempotencyStorageProvider`] implementation used to reserve
27///   tokens; set to [`NoIdempotency`] when no external reservation is needed
28/// - `P` — the user's domain event payload type (`Debug + Clone + Serialize`)
29///
30/// Construct with [`new`](Self::new) when events should be written without an
31/// external idempotency check, or with
32/// [`with_idempotency`](Self::with_idempotency) to wire a reservation backend.
33pub struct OutboxService<W, S, P>
34where
35    P: Debug + Clone + Serialize + Send + Sync,
36{
37    writer: Arc<W>,
38    config: Arc<OutboxConfig<P>>,
39    idempotency_storage: Option<Arc<S>>,
40}
41
42impl<W, P> OutboxService<W, NoIdempotency, P>
43where
44    W: OutboxWriter<P> + Send + Sync + 'static,
45    P: Debug + Clone + Serialize + Send + Sync,
46{
47    /// Creates a service without any external idempotency reservation.
48    ///
49    /// Tokens are still produced according to `config.idempotency_strategy`
50    /// and written alongside the event, so downstream consumers can deduplicate
51    /// on their side, but no pre-insert uniqueness check is performed here.
52    /// Use [`with_idempotency`](OutboxService::with_idempotency) to attach a
53    /// reservation store (for example Redis) when at-producer deduplication
54    /// is required.
55    pub fn new(writer: Arc<W>, config: Arc<OutboxConfig<P>>) -> Self {
56        Self {
57            writer,
58            config,
59            idempotency_storage: None,
60        }
61    }
62}
63
64impl<W, S, P> OutboxService<W, S, P>
65where
66    W: OutboxWriter<P> + Send + Sync + 'static,
67    S: IdempotencyStorageProvider + Send + Sync + 'static,
68    P: Debug + Clone + Serialize + Send + Sync,
69{
70    /// Creates a service wired to an external idempotency reservation store.
71    ///
72    /// Before inserting the event, the service calls
73    /// [`IdempotencyStorageProvider::try_reserve`] with the token produced by
74    /// the configured strategy. If the reservation returns `false`, the insert
75    /// is skipped and [`OutboxError::DuplicateEvent`] is propagated to the
76    /// caller.
77    pub fn with_idempotency(
78        writer: Arc<W>,
79        config: Arc<OutboxConfig<P>>,
80        idempotency_storage: Arc<S>,
81    ) -> Self {
82        Self {
83            writer,
84            idempotency_storage: Some(idempotency_storage),
85            config,
86        }
87    }
88    /// Adds a new event to the outbox storage with idempotency checks.
89    ///
90    /// The token is derived from
91    /// [`IdempotencyStrategy`](crate::config::IdempotencyStrategy) on the
92    /// configured [`OutboxConfig`]:
93    ///
94    /// - `Provided` — uses `provided_token` as-is (`None` skips reservation).
95    /// - `Uuid` — generates a fresh UUID v7; `provided_token` is ignored.
96    /// - `Custom(fn)` — calls `get_event` and passes the resulting
97    ///   [`Event`] to the closure. The `get_event` callback is **only**
98    ///   invoked by this branch, so for other strategies callers can safely
99    ///   pass `|| None`.
100    /// - `None` — no token is produced and reservation is skipped.
101    ///
102    /// If an idempotency provider is configured and a token was produced, it
103    /// will first attempt to reserve the token to prevent duplicate processing.
104    ///
105    /// # Errors
106    ///
107    /// Returns [`OutboxError::DuplicateEvent`] if the event token has already
108    /// been used. Returns any [`OutboxError`] variant propagated from the
109    /// reservation call or from the writer's `insert_event`.
110    ///
111    /// # Panics
112    ///
113    /// Panics if the idempotency strategy is set to `Custom`, but `get_event`
114    /// returns `None`.
115    ///
116    /// # Example
117    ///
118    /// ```ignore
119    /// use std::sync::Arc;
120    /// use outbox_core::prelude::*;
121    ///
122    /// # async fn emit(
123    /// #     service: OutboxService<impl OutboxWriter<MyEvent> + Send + Sync + 'static,
124    /// #                            NoIdempotency,
125    /// #                            MyEvent>,
126    /// #     payload: MyEvent,
127    /// # ) -> Result<(), OutboxError>
128    /// # where MyEvent: std::fmt::Debug + Clone + serde::Serialize + Send + Sync,
129    /// # {
130    /// // Uuid / None strategies — no event context needed.
131    /// service.add_event("order.created", payload, None, || None).await?;
132    /// # Ok(()) }
133    /// ```
134    pub async fn add_event<F>(
135        &self,
136        event_type: &str,
137        payload: P,
138        provided_token: Option<String>,
139        get_event: F,
140    ) -> Result<(), OutboxError>
141    where
142        F: FnOnce() -> Option<Event<P>>,
143        P: Debug + Clone + Serialize + Send + Sync,
144    {
145        let i_token = self
146            .config
147            .idempotency_strategy
148            .invoke(provided_token, get_event)
149            .map(IdempotencyToken::new);
150
151        if let Some(i_provider) = &self.idempotency_storage
152            && let Some(ref token) = i_token
153            && !i_provider.try_reserve(token).await?
154        {
155            return Err(OutboxError::DuplicateEvent);
156        }
157
158        let event = Event::new(EventType::new(event_type), Payload::new(payload), i_token);
159        self.writer.insert_event(event).await
160    }
161}
162
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::config::IdempotencyStrategy;
    use crate::idempotency::storage::MockIdempotencyStorageProvider;
    use crate::storage::MockOutboxWriter;
    use rstest::rstest;
    use serde::Deserialize;
    use std::sync::Mutex;

    /// Minimal serializable payload shared by every test in this module.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
    struct TestPayload {
        kind: String,
    }

    /// Builds the fixed payload (`kind == "k"`) used by all tests.
    fn payload() -> TestPayload {
        TestPayload {
            kind: String::from("k"),
        }
    }

    /// Wraps `strategy` in an otherwise fixed configuration.
    fn config_with(strategy: IdempotencyStrategy<TestPayload>) -> Arc<OutboxConfig<TestPayload>> {
        let config = OutboxConfig {
            batch_size: 100,
            retention_days: 7,
            gc_interval_secs: 3600,
            poll_interval_secs: 10,
            lock_timeout_mins: 5,
            idempotency_strategy: strategy,
            dlq_threshold: 10,
            dlq_interval_secs: 1,
        };
        Arc::new(config)
    }

    /// Returns `true` when `event` carries exactly the token `expected`.
    fn carries_token(event: &Event<TestPayload>, expected: &str) -> bool {
        event
            .idempotency_token
            .as_ref()
            .is_some_and(|token| token.as_str() == expected)
    }

    /// Returns `true` when `event` carries some non-empty token.
    fn has_generated_token(event: &Event<TestPayload>) -> bool {
        event
            .idempotency_token
            .as_ref()
            .is_some_and(|token| !token.as_str().is_empty())
    }

    /// `None` strategy, no store: the event is written once, without a token.
    #[rstest]
    #[tokio::test]
    async fn none_strategy_without_idempotency_storage_inserts_event_without_token() {
        let mut mock_writer = MockOutboxWriter::<TestPayload>::new();
        mock_writer
            .expect_insert_event()
            .withf(|event| event.event_type.as_str() == "t" && event.idempotency_token.is_none())
            .times(1)
            .returning(|_| Ok(()));

        let cfg = config_with(IdempotencyStrategy::None);
        let service = OutboxService::new(Arc::new(mock_writer), cfg);

        let outcome = service.add_event("t", payload(), None, || None).await;
        assert!(outcome.is_ok());
    }

    /// `Uuid` strategy, no store: the written event carries a non-empty token.
    #[rstest]
    #[tokio::test]
    async fn uuid_strategy_without_idempotency_storage_inserts_event_with_generated_token() {
        let mut mock_writer = MockOutboxWriter::<TestPayload>::new();
        mock_writer
            .expect_insert_event()
            .withf(has_generated_token)
            .times(1)
            .returning(|_| Ok(()));

        let cfg = config_with(IdempotencyStrategy::Uuid);
        let service = OutboxService::new(Arc::new(mock_writer), cfg);

        let outcome = service.add_event("t", payload(), None, || None).await;
        assert!(outcome.is_ok());
    }

    /// `Uuid` strategy with a store: the reserved token is the inserted token.
    #[rstest]
    #[tokio::test]
    async fn uuid_strategy_with_storage_reserves_same_token_as_inserted() {
        let mut mock_writer = MockOutboxWriter::<TestPayload>::new();
        let mut mock_store = MockIdempotencyStorageProvider::new();

        // Capture the token seen by `try_reserve` and make sure the very same
        // one arrives in `insert_event`.
        let seen_by_reserve = Arc::new(Mutex::new(None::<String>));
        let reserve_slot = Arc::clone(&seen_by_reserve);
        let insert_slot = Arc::clone(&seen_by_reserve);

        mock_store
            .expect_try_reserve()
            .times(1)
            .returning(move |token| {
                *reserve_slot.lock().unwrap() = Some(token.as_str().to_owned());
                Ok(true)
            });

        mock_writer
            .expect_insert_event()
            .withf(move |event| {
                let reserved = insert_slot.lock().unwrap();
                match (event.idempotency_token.as_ref(), reserved.as_deref()) {
                    (Some(inserted), Some(expected)) => inserted.as_str() == expected,
                    _ => false,
                }
            })
            .times(1)
            .returning(|_| Ok(()));

        let service = OutboxService::with_idempotency(
            Arc::new(mock_writer),
            config_with(IdempotencyStrategy::Uuid),
            Arc::new(mock_store),
        );

        let outcome = service.add_event("t", payload(), None, || None).await;
        assert!(outcome.is_ok());
    }

    /// `Provided` strategy with `Some`: the caller's token is reserved and stored.
    #[rstest]
    #[tokio::test]
    async fn provided_some_passes_user_token_to_reserve_and_insert() {
        let mut mock_writer = MockOutboxWriter::<TestPayload>::new();
        let mut mock_store = MockIdempotencyStorageProvider::new();

        mock_store
            .expect_try_reserve()
            .withf(|token| token.as_str() == "user-tok")
            .times(1)
            .returning(|_| Ok(true));

        mock_writer
            .expect_insert_event()
            .withf(|event| carries_token(event, "user-tok"))
            .times(1)
            .returning(|_| Ok(()));

        let service = OutboxService::with_idempotency(
            Arc::new(mock_writer),
            config_with(IdempotencyStrategy::Provided),
            Arc::new(mock_store),
        );

        let user_token = Some("user-tok".to_string());
        let outcome = service
            .add_event("t", payload(), user_token, || None)
            .await;
        assert!(outcome.is_ok());
    }

    /// `Provided` strategy with `None`: no reservation, event written without token.
    #[rstest]
    #[tokio::test]
    async fn provided_none_skips_reserve_and_inserts_without_token() {
        let mut mock_writer = MockOutboxWriter::<TestPayload>::new();
        let mut mock_store = MockIdempotencyStorageProvider::new();

        mock_store.expect_try_reserve().times(0);

        mock_writer
            .expect_insert_event()
            .withf(|event| event.idempotency_token.is_none())
            .times(1)
            .returning(|_| Ok(()));

        let service = OutboxService::with_idempotency(
            Arc::new(mock_writer),
            config_with(IdempotencyStrategy::Provided),
            Arc::new(mock_store),
        );

        let outcome = service.add_event("t", payload(), None, || None).await;
        assert!(outcome.is_ok());
    }

    /// `Custom` strategy: the token comes from the extractor applied to `get_event()`.
    #[rstest]
    #[tokio::test]
    async fn custom_strategy_uses_extractor_closure_for_token() {
        fn token_from_kind(event: &Event<TestPayload>) -> String {
            format!("derived:{}", event.payload.as_value().kind)
        }

        let mut mock_writer = MockOutboxWriter::<TestPayload>::new();
        let mut mock_store = MockIdempotencyStorageProvider::new();

        mock_store
            .expect_try_reserve()
            .withf(|token| token.as_str() == "derived:k")
            .times(1)
            .returning(|_| Ok(true));

        mock_writer
            .expect_insert_event()
            .withf(|event| carries_token(event, "derived:k"))
            .times(1)
            .returning(|_| Ok(()));

        let service = OutboxService::with_idempotency(
            Arc::new(mock_writer),
            config_with(IdempotencyStrategy::Custom(token_from_kind)),
            Arc::new(mock_store),
        );

        let event_context = || {
            Some(Event::new(
                EventType::new("t"),
                Payload::new(payload()),
                None,
            ))
        };
        let outcome = service
            .add_event("t", payload(), None, event_context)
            .await;
        assert!(outcome.is_ok());
    }

    /// `Custom` strategy without an event context panics with the expected message.
    #[rstest]
    #[should_panic(expected = "Strategy is Custom, but no Event context provided")]
    #[tokio::test]
    async fn custom_strategy_panics_when_get_event_returns_none() {
        fn constant_token(_: &Event<TestPayload>) -> String {
            "x".into()
        }

        // Neither mock gets expectations: the call is expected to panic first.
        let mock_writer = MockOutboxWriter::<TestPayload>::new();
        let mock_store = MockIdempotencyStorageProvider::new();

        let service = OutboxService::with_idempotency(
            Arc::new(mock_writer),
            config_with(IdempotencyStrategy::Custom(constant_token)),
            Arc::new(mock_store),
        );

        let _ = service.add_event("t", payload(), None, || None).await;
    }

    /// A `false` reservation yields `DuplicateEvent` and the writer is never called.
    #[rstest]
    #[tokio::test]
    async fn duplicate_when_reserve_returns_false_and_insert_is_not_called() {
        let mut mock_writer = MockOutboxWriter::<TestPayload>::new();
        let mut mock_store = MockIdempotencyStorageProvider::new();

        mock_store
            .expect_try_reserve()
            .times(1)
            .returning(|_| Ok(false));
        mock_writer.expect_insert_event().times(0);

        let service = OutboxService::with_idempotency(
            Arc::new(mock_writer),
            config_with(IdempotencyStrategy::Provided),
            Arc::new(mock_store),
        );

        let outcome = service
            .add_event("t", payload(), Some("dup".into()), || None)
            .await;
        assert!(matches!(outcome, Err(OutboxError::DuplicateEvent)));
    }

    /// A failing reservation call is propagated and the writer is never called.
    #[rstest]
    #[tokio::test]
    async fn reserve_error_propagates_and_insert_is_not_called() {
        let mut mock_writer = MockOutboxWriter::<TestPayload>::new();
        let mut mock_store = MockIdempotencyStorageProvider::new();

        mock_store
            .expect_try_reserve()
            .times(1)
            .returning(|_| Err(OutboxError::InfrastructureError("redis down".into())));
        mock_writer.expect_insert_event().times(0);

        let service = OutboxService::with_idempotency(
            Arc::new(mock_writer),
            config_with(IdempotencyStrategy::Uuid),
            Arc::new(mock_store),
        );

        let outcome = service.add_event("t", payload(), None, || None).await;
        assert!(matches!(outcome, Err(OutboxError::InfrastructureError(_))));
    }

    /// A writer failure after a successful reservation is propagated unchanged.
    #[rstest]
    #[tokio::test]
    async fn insert_error_propagates_after_successful_reserve() {
        let mut mock_writer = MockOutboxWriter::<TestPayload>::new();
        let mut mock_store = MockIdempotencyStorageProvider::new();

        mock_store
            .expect_try_reserve()
            .times(1)
            .returning(|_| Ok(true));
        mock_writer
            .expect_insert_event()
            .times(1)
            .returning(|_| Err(OutboxError::DatabaseError("pk conflict".into())));

        let service = OutboxService::with_idempotency(
            Arc::new(mock_writer),
            config_with(IdempotencyStrategy::Uuid),
            Arc::new(mock_store),
        );

        let outcome = service.add_event("t", payload(), None, || None).await;
        assert!(matches!(outcome, Err(OutboxError::DatabaseError(_))));
    }
}