1use crate::error::OutboxError;
12use crate::idempotency::storage::NoIdempotency;
13use crate::model::Event;
14use crate::object::{EventType, IdempotencyToken, Payload};
15use crate::prelude::{IdempotencyStorageProvider, OutboxConfig};
16use crate::storage::OutboxWriter;
17use serde::Serialize;
18use std::fmt::Debug;
19use std::sync::Arc;
20
21pub struct OutboxService<W, S, P>
34where
35 P: Debug + Clone + Serialize + Send + Sync,
36{
37 writer: Arc<W>,
38 config: Arc<OutboxConfig<P>>,
39 idempotency_storage: Option<Arc<S>>,
40}
41
42impl<W, P> OutboxService<W, NoIdempotency, P>
43where
44 W: OutboxWriter<P> + Send + Sync + 'static,
45 P: Debug + Clone + Serialize + Send + Sync,
46{
47 pub fn new(writer: Arc<W>, config: Arc<OutboxConfig<P>>) -> Self {
56 Self {
57 writer,
58 config,
59 idempotency_storage: None,
60 }
61 }
62}
63
64impl<W, S, P> OutboxService<W, S, P>
65where
66 W: OutboxWriter<P> + Send + Sync + 'static,
67 S: IdempotencyStorageProvider + Send + Sync + 'static,
68 P: Debug + Clone + Serialize + Send + Sync,
69{
70 pub fn with_idempotency(
78 writer: Arc<W>,
79 config: Arc<OutboxConfig<P>>,
80 idempotency_storage: Arc<S>,
81 ) -> Self {
82 Self {
83 writer,
84 idempotency_storage: Some(idempotency_storage),
85 config,
86 }
87 }
88 pub async fn add_event<F>(
135 &self,
136 event_type: &str,
137 payload: P,
138 provided_token: Option<String>,
139 get_event: F,
140 ) -> Result<(), OutboxError>
141 where
142 F: FnOnce() -> Option<Event<P>>,
143 P: Debug + Clone + Serialize + Send + Sync,
144 {
145 let i_token = self
146 .config
147 .idempotency_strategy
148 .invoke(provided_token, get_event)
149 .map(IdempotencyToken::new);
150
151 if let Some(i_provider) = &self.idempotency_storage
152 && let Some(ref token) = i_token
153 && !i_provider.try_reserve(token).await?
154 {
155 return Err(OutboxError::DuplicateEvent);
156 }
157
158 let event = Event::new(EventType::new(event_type), Payload::new(payload), i_token);
159 self.writer.insert_event(event).await
160 }
161}
162
#[cfg(test)]
// Unwrapping is acceptable in tests: a failed unwrap simply fails the test.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::config::IdempotencyStrategy;
    use crate::idempotency::storage::MockIdempotencyStorageProvider;
    use crate::storage::MockOutboxWriter;
    use rstest::rstest;
    use serde::Deserialize;

    /// Writer mock specialised for the test payload.
    type WriterMock = MockOutboxWriter<TestPayload>;
    /// Idempotency storage mock.
    type StorageMock = MockIdempotencyStorageProvider;

    /// Minimal payload used by every test.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
    struct TestPayload {
        kind: String,
    }

    /// Payload whose `kind` is `"k"`.
    fn payload() -> TestPayload {
        TestPayload {
            kind: String::from("k"),
        }
    }

    /// Configuration with fixed settings and the given idempotency strategy.
    fn config_with(strategy: IdempotencyStrategy<TestPayload>) -> Arc<OutboxConfig<TestPayload>> {
        let config = OutboxConfig {
            batch_size: 100,
            retention_days: 7,
            gc_interval_secs: 3600,
            poll_interval_secs: 10,
            lock_timeout_mins: 5,
            idempotency_strategy: strategy,
            dlq_threshold: 10,
            dlq_interval_secs: 1,
        };
        Arc::new(config)
    }

    /// Wires the mocks and a strategy into a service that has idempotency storage.
    fn service_with_storage(
        writer: WriterMock,
        strategy: IdempotencyStrategy<TestPayload>,
        storage: StorageMock,
    ) -> OutboxService<WriterMock, StorageMock, TestPayload> {
        OutboxService::with_idempotency(Arc::new(writer), config_with(strategy), Arc::new(storage))
    }

    /// `None` strategy, no storage: the event is written once, without a token.
    #[rstest]
    #[tokio::test]
    async fn none_strategy_without_idempotency_storage_inserts_event_without_token() {
        let mut writer = WriterMock::new();
        writer
            .expect_insert_event()
            .withf(|event| event.event_type.as_str() == "t" && event.idempotency_token.is_none())
            .once()
            .returning(|_| Ok(()));

        let service = OutboxService::new(Arc::new(writer), config_with(IdempotencyStrategy::None));
        let outcome = service.add_event("t", payload(), None, || None).await;
        assert!(outcome.is_ok());
    }

    /// `Uuid` strategy, no storage: the written event carries a non-empty token.
    #[rstest]
    #[tokio::test]
    async fn uuid_strategy_without_idempotency_storage_inserts_event_with_generated_token() {
        let mut writer = WriterMock::new();
        writer
            .expect_insert_event()
            .withf(|event| {
                event
                    .idempotency_token
                    .as_ref()
                    .is_some_and(|token| !token.as_str().is_empty())
            })
            .once()
            .returning(|_| Ok(()));

        let service = OutboxService::new(Arc::new(writer), config_with(IdempotencyStrategy::Uuid));
        let outcome = service.add_event("t", payload(), None, || None).await;
        assert!(outcome.is_ok());
    }

    /// `Uuid` strategy with storage: the token that was reserved is the one inserted.
    #[rstest]
    #[tokio::test]
    async fn uuid_strategy_with_storage_reserves_same_token_as_inserted() {
        let mut writer = WriterMock::new();
        let mut storage = StorageMock::new();

        // Shared slot: the storage mock records the token, the writer mock compares.
        let recorded = Arc::new(std::sync::Mutex::new(None::<String>));
        let recorded_for_writer = Arc::clone(&recorded);

        storage.expect_try_reserve().once().returning(move |token| {
            let mut slot = recorded.lock().unwrap();
            *slot = Some(token.as_str().to_owned());
            Ok(true)
        });

        writer
            .expect_insert_event()
            .withf(move |event| {
                let expected = recorded_for_writer.lock().unwrap().clone();
                // Both the event token and the recorded token must be present and equal.
                event
                    .idempotency_token
                    .as_ref()
                    .zip(expected)
                    .is_some_and(|(token, expected)| token.as_str() == expected)
            })
            .once()
            .returning(|_| Ok(()));

        let service = service_with_storage(writer, IdempotencyStrategy::Uuid, storage);
        let outcome = service.add_event("t", payload(), None, || None).await;
        assert!(outcome.is_ok());
    }

    /// `Provided` strategy with a caller token: that token is reserved and inserted.
    #[rstest]
    #[tokio::test]
    async fn provided_some_passes_user_token_to_reserve_and_insert() {
        let mut writer = WriterMock::new();
        let mut storage = StorageMock::new();

        storage
            .expect_try_reserve()
            .withf(|token| token.as_str() == "user-tok")
            .once()
            .returning(|_| Ok(true));

        writer
            .expect_insert_event()
            .withf(|event| {
                event
                    .idempotency_token
                    .as_ref()
                    .is_some_and(|token| token.as_str() == "user-tok")
            })
            .once()
            .returning(|_| Ok(()));

        let service = service_with_storage(writer, IdempotencyStrategy::Provided, storage);
        let user_token = Some(String::from("user-tok"));
        let outcome = service
            .add_event("t", payload(), user_token, || None)
            .await;
        assert!(outcome.is_ok());
    }

    /// `Provided` strategy without a caller token: no reservation, token-less insert.
    #[rstest]
    #[tokio::test]
    async fn provided_none_skips_reserve_and_inserts_without_token() {
        let mut writer = WriterMock::new();
        let mut storage = StorageMock::new();

        storage.expect_try_reserve().never();

        writer
            .expect_insert_event()
            .withf(|event| event.idempotency_token.is_none())
            .once()
            .returning(|_| Ok(()));

        let service = service_with_storage(writer, IdempotencyStrategy::Provided, storage);
        let outcome = service.add_event("t", payload(), None, || None).await;
        assert!(outcome.is_ok());
    }

    /// `Custom` strategy: the token comes from the extractor applied to the event
    /// supplied by `get_event`.
    #[rstest]
    #[tokio::test]
    async fn custom_strategy_uses_extractor_closure_for_token() {
        fn derive(event: &Event<TestPayload>) -> String {
            format!("derived:{}", event.payload.as_value().kind)
        }

        let mut writer = WriterMock::new();
        let mut storage = StorageMock::new();

        storage
            .expect_try_reserve()
            .withf(|token| token.as_str() == "derived:k")
            .once()
            .returning(|_| Ok(true));

        writer
            .expect_insert_event()
            .withf(|event| {
                event
                    .idempotency_token
                    .as_ref()
                    .is_some_and(|token| token.as_str() == "derived:k")
            })
            .once()
            .returning(|_| Ok(()));

        let service = service_with_storage(writer, IdempotencyStrategy::Custom(derive), storage);
        let context_event = || {
            let event = Event::new(EventType::new("t"), Payload::new(payload()), None);
            Some(event)
        };
        let outcome = service
            .add_event("t", payload(), None, context_event)
            .await;
        assert!(outcome.is_ok());
    }

    /// `Custom` strategy without an event from `get_event` is expected to panic.
    #[rstest]
    #[should_panic(expected = "Strategy is Custom, but no Event context provided")]
    #[tokio::test]
    async fn custom_strategy_panics_when_get_event_returns_none() {
        fn derive(_: &Event<TestPayload>) -> String {
            String::from("x")
        }

        let service = service_with_storage(
            WriterMock::new(),
            IdempotencyStrategy::Custom(derive),
            StorageMock::new(),
        );
        let _ = service.add_event("t", payload(), None, || None).await;
    }

    /// A refused reservation yields `DuplicateEvent` and the writer is never called.
    #[rstest]
    #[tokio::test]
    async fn duplicate_when_reserve_returns_false_and_insert_is_not_called() {
        let mut writer = WriterMock::new();
        let mut storage = StorageMock::new();

        storage.expect_try_reserve().once().returning(|_| Ok(false));
        writer.expect_insert_event().never();

        let service = service_with_storage(writer, IdempotencyStrategy::Provided, storage);
        let duplicate_token = Some(String::from("dup"));
        let outcome = service
            .add_event("t", payload(), duplicate_token, || None)
            .await;
        assert!(matches!(outcome, Err(OutboxError::DuplicateEvent)));
    }

    /// A storage failure is returned as-is and the writer is never called.
    #[rstest]
    #[tokio::test]
    async fn reserve_error_propagates_and_insert_is_not_called() {
        let mut writer = WriterMock::new();
        let mut storage = StorageMock::new();

        storage
            .expect_try_reserve()
            .once()
            .returning(|_| Err(OutboxError::InfrastructureError("redis down".into())));
        writer.expect_insert_event().never();

        let service = service_with_storage(writer, IdempotencyStrategy::Uuid, storage);
        let outcome = service.add_event("t", payload(), None, || None).await;
        assert!(matches!(outcome, Err(OutboxError::InfrastructureError(_))));
    }

    /// A writer failure after a successful reservation is returned as-is.
    #[rstest]
    #[tokio::test]
    async fn insert_error_propagates_after_successful_reserve() {
        let mut writer = WriterMock::new();
        let mut storage = StorageMock::new();

        storage.expect_try_reserve().once().returning(|_| Ok(true));
        writer
            .expect_insert_event()
            .once()
            .returning(|_| Err(OutboxError::DatabaseError("pk conflict".into())));

        let service = service_with_storage(writer, IdempotencyStrategy::Uuid, storage);
        let outcome = service.add_event("t", payload(), None, || None).await;
        assert!(matches!(outcome, Err(OutboxError::DatabaseError(_))));
    }
}