mako_engine/outbox.rs
1//! Outbox pattern for reliable at-least-once outbound message delivery.
2//!
3//! # Why the outbox?
4//!
5//! When a process transition generates an outbound EDIFACT message (e.g. an
6//! APERAK acknowledgement), two writes must happen atomically:
7//!
8//! 1. Domain events are appended to the event store.
9//! 2. The EDIFACT payload is queued for delivery to the AS4 endpoint.
10//!
11//! Without the outbox, a crash between steps 1 and 2 silently loses the
12//! outbound message. With the outbox, both writes are part of the same
13//! database transaction — a background delivery worker then delivers pending
14//! messages, surviving crashes and transient AS4 failures transparently.
15//!
16//! # Usage
17//!
18//! ```rust,ignore
19//! // After a command dispatch that should trigger an outbound APERAK:
20//! let env = &aperak_envelopes[0];
21//! let msg = OutboxMessage::new(
22//! process.stream_id().clone(),
23//! env.process_id,
24//! env.tenant_id,
25//! env.correlation_id,
26//! env.conversation_id,
27//! env.event_id,
28//! "APERAK",
29//! &recipient_gln,
30//! aperak_payload_json,
31//! );
32//! outbox_store.enqueue(&[msg]).await?;
33//!
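//! // Background delivery worker (`backoff` is an application-defined helper
//! // that maps the attempt count to a retry delay):
//! let pending = outbox_store.pending_now(50).await?;
//! for msg in pending {
//!     match as4_client.send(&msg).await {
//!         // Delivered: remove the entry so it is not sent again.
//!         Ok(_) => outbox_store.acknowledge(msg.message_id).await?,
//!         // Transient failure: keep the entry and retry later.
//!         Err(_) => {
//!             let retry_at = OffsetDateTime::now_utc() + backoff(msg.attempt_count);
//!             outbox_store.reschedule(msg.message_id, retry_at).await?;
//!         }
//!     }
//! }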
40//! ```
41//!
42//! # Atomicity contract
43//!
44//! [`InMemoryOutboxStore`] does **not** guarantee transactional atomicity
45//! with [`InMemoryEventStore`]. Persistent backend crates
46//! (`mako-event-store-slatedb`, `mako-event-store-postgres`) MUST enqueue
47//! messages in the same database transaction as the event append.
48//!
49//! [`InMemoryEventStore`]: crate::event_store::InMemoryEventStore
50
51use std::sync::Arc;
52
53#[cfg(any(test, feature = "testing"))]
54use std::collections::HashMap;
55#[cfg(any(test, feature = "testing"))]
56use tokio::sync::RwLock;
57
58use time::OffsetDateTime;
59
60use crate::{
61 error::EngineError,
62 ids::{ConversationId, CorrelationId, EventId, OutboxMessageId, ProcessId, StreamId, TenantId},
63};
64
65// ── PendingOutbox ─────────────────────────────────────────────────────────────
66
67/// A lightweight outbox message specification produced by [`Workflow::handle`].
68///
69/// [`Workflow::handle`] is a pure function: it cannot know the store-assigned
70/// fields (`event_id`, `stream_id`, `process_id`, etc.) of the events it is
71/// about to emit. `PendingOutbox` carries only the information the domain
72/// workflow can produce deterministically, without I/O or clock access.
73///
74/// The engine fills in the store-assigned fields after the event append
75/// succeeds, converting `PendingOutbox` into a fully materialised
76/// [`OutboxMessage`] inside [`SlateDbStore::append_with_outbox`].
77///
78/// # Example
79///
80/// ```rust,ignore
81/// // Inside Workflow::handle, when DispatchAperak succeeds:
82/// let outbox = vec![
83/// PendingOutbox::new("APERAK", &state.sender_party_id().to_string(), aperak_payload)
84/// .caused_by(0), // caused by the first event in this batch
85/// ];
86/// Ok(WorkflowOutput { events, outbox })
87/// ```
88///
89/// [`Workflow::handle`]: crate::workflow::Workflow::handle
90/// [`SlateDbStore::append_with_outbox`]: crate::event_store::AtomicAppend::append_with_outbox
91#[derive(Debug, Clone)]
92pub struct PendingOutbox {
93 /// EDIFACT or XML message type (e.g. `"APERAK"`, `"CONTRL"`, `"REMADV"`).
94 pub message_type: Box<str>,
95 /// GLN or EIC code of the intended recipient market participant.
96 pub recipient: Box<str>,
97 /// Domain-level message payload (JSON).
98 ///
99 /// Typically encodes the intent (e.g. positive/negative APERAK reason)
100 /// rather than the final EDIFACT bytes. The delivery worker or AS4
101 /// gateway is responsible for rendering the final wire format.
102 pub payload: serde_json::Value,
103 /// Do not deliver before this time.
104 ///
105 /// `None` means deliver immediately (as soon as the delivery worker runs).
106 /// Must not use the wall clock inside `handle` — derive from domain data
107 /// only (e.g. a schedule date carried in the command).
108 pub deliver_after: Option<OffsetDateTime>,
109 /// BO4E JSON Schema URL that describes the `payload` shape.
110 ///
111 /// Set this to the canonical BO4E schema URL when the payload is a
112 /// BO4E-typed object (e.g. `Marktlokation`, `Messlokation`). Leave
113 /// `None` for raw EDIFACT or untyped payloads.
114 ///
115 /// Example:
116 /// `"https://raw.githubusercontent.com/BO4E/BO4E-Schemas/v202501.0.0/src/bo4e_schemas/bo/Marktlokation.json"`
117 pub payload_schema: Option<Box<str>>,
118 /// Zero-based index into the concurrent events batch that caused this
119 /// outbound message.
120 ///
121 /// Used by the engine to set `causation_event_id` on the materialised
122 /// [`OutboxMessage`] from the stamped [`EventEnvelope`] at the same index.
123 /// Clamped to `events.len() - 1` when out-of-range.
124 ///
125 /// [`EventEnvelope`]: crate::envelope::EventEnvelope
126 pub caused_by_event_index: usize,
127}
128
129impl PendingOutbox {
130 /// Construct a pending outbox message for immediate delivery.
131 ///
132 /// `caused_by_event_index` defaults to `0` (first event in the batch).
133 /// Chain [`caused_by`] to change it.
134 ///
135 /// [`caused_by`]: PendingOutbox::caused_by
136 #[must_use]
137 pub fn new(
138 message_type: impl Into<Box<str>>,
139 recipient: impl Into<Box<str>>,
140 payload: serde_json::Value,
141 ) -> Self {
142 Self {
143 message_type: message_type.into(),
144 recipient: recipient.into(),
145 payload,
146 deliver_after: None,
147 payload_schema: None,
148 caused_by_event_index: 0,
149 }
150 }
151
152 /// Set the zero-based index of the event that caused this outbox message.
153 #[must_use]
154 pub fn caused_by(mut self, index: usize) -> Self {
155 self.caused_by_event_index = index;
156 self
157 }
158
159 /// Set a deferred delivery time (must be derived from domain data, not
160 /// the wall clock, to preserve `Workflow::handle` purity).
161 #[must_use]
162 pub fn with_deliver_after(mut self, deliver_after: OffsetDateTime) -> Self {
163 self.deliver_after = Some(deliver_after);
164 self
165 }
166
167 /// Attach a BO4E JSON Schema URL to the payload.
168 ///
169 /// Use this when the payload is a BO4E-typed object so the ERP adapter
170 /// can deserialise it into the correct type without inspecting the JSON.
171 #[must_use]
172 pub fn with_schema(mut self, schema_url: &'static str) -> Self {
173 self.payload_schema = Some(schema_url.into());
174 self
175 }
176}
177
178// ── OutboxMessage ─────────────────────────────────────────────────────────────
179
180/// An outbound message queued for delivery via AS4 or another channel.
181///
182/// The message carries both routing information (`recipient`, `message_type`)
183/// and full correlation metadata so the delivery worker can trace every send
184/// back to the domain event that caused it.
185///
186/// Construct with [`OutboxMessage::new`] and optionally chain
187/// [`OutboxMessage::with_deliver_after`] for deferred delivery.
188#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
189pub struct OutboxMessage {
190 /// Stable unique identifier for this outbox entry.
191 pub message_id: OutboxMessageId,
192
193 /// The process stream that produced this outbound message.
194 pub stream_id: StreamId,
195
196 /// The MaKo process instance.
197 pub process_id: ProcessId,
198
199 /// The tenant sending this message.
200 pub tenant_id: TenantId,
201
202 /// Propagated correlation root from the triggering event.
203 pub correlation_id: CorrelationId,
204
205 /// Business conversation this message belongs to (e.g. UTILMD ↔ APERAK).
206 pub conversation_id: ConversationId,
207
208 /// The persisted event that directly caused this outbound message.
209 pub causation_event_id: EventId,
210
211 /// EDIFACT or XML message type (e.g. `"APERAK"`, `"CONTRL"`, `"UTILMD"`).
212 pub message_type: Box<str>,
213
214 /// GLN or EIC code of the intended recipient market participant.
215 pub recipient: Box<str>,
216
217 /// Serialized message payload.
218 ///
219 /// Typically a JSON-encoded string of EDIFACT bytes or a structured
220 /// JSON object for non-EDIFACT channels.
221 pub payload: serde_json::Value,
222
223 /// BO4E JSON Schema URL that validates `payload`, if present.
224 ///
225 /// `None` for raw EDIFACT or untyped payloads. Set by domain workflows
226 /// via [`PendingOutbox::with_schema`] when the payload is a BO4E object.
227 #[serde(skip_serializing_if = "Option::is_none")]
228 pub payload_schema: Option<Box<str>>,
229
230 /// When this entry was created.
231 pub created_at: OffsetDateTime,
232
233 /// Do not deliver before this time.
234 ///
235 /// `None` means deliver immediately (as soon as the delivery worker runs).
236 pub deliver_after: Option<OffsetDateTime>,
237
238 /// Number of delivery attempts so far. Starts at `0`, incremented by
239 /// [`OutboxStore::reschedule`].
240 pub attempt_count: u32,
241}
242
243impl OutboxMessage {
244 /// Construct a new outbox message.
245 ///
246 /// `message_id` and `created_at` are generated automatically.
247 /// `attempt_count` is initialized to `0`.
248 ///
249 /// Call [`OutboxMessage::with_deliver_after`] to schedule deferred
250 /// delivery.
251 #[allow(clippy::too_many_arguments)]
252 #[must_use]
253 pub fn new(
254 stream_id: StreamId,
255 process_id: ProcessId,
256 tenant_id: TenantId,
257 correlation_id: CorrelationId,
258 conversation_id: ConversationId,
259 causation_event_id: EventId,
260 message_type: impl Into<Box<str>>,
261 recipient: impl Into<Box<str>>,
262 payload: serde_json::Value,
263 ) -> Self {
264 Self {
265 message_id: OutboxMessageId::new(),
266 stream_id,
267 process_id,
268 tenant_id,
269 correlation_id,
270 conversation_id,
271 causation_event_id,
272 message_type: message_type.into(),
273 recipient: recipient.into(),
274 payload,
275 payload_schema: None,
276 created_at: OffsetDateTime::now_utc(),
277 deliver_after: None,
278 attempt_count: 0,
279 }
280 }
281
282 /// Set a deferred delivery time.
283 ///
284 /// The message will not appear in [`OutboxStore::pending`] results until
285 /// `now >= deliver_after`.
286 #[must_use]
287 pub fn with_deliver_after(mut self, deliver_after: OffsetDateTime) -> Self {
288 self.deliver_after = Some(deliver_after);
289 self
290 }
291}
292
293// ── OutboxStore ───────────────────────────────────────────────────────────────
294
295/// Storage contract for outbox messages.
296///
297/// ## Atomicity requirement
298///
299/// In production deployments, calls to [`OutboxStore::enqueue`] MUST be
300/// atomic with the corresponding [`EventStore::append`] — both writes MUST
301/// succeed or both MUST fail. Implement this by sharing the same database
302/// transaction across both operations.
303///
304/// ## Delivery worker contract
305///
306/// The delivery worker loop should:
307/// 1. Call [`OutboxStore::pending_now`] to retrieve ready messages.
308/// 2. Attempt delivery to the AS4 endpoint.
309/// 3. On success: call [`OutboxStore::acknowledge`] to remove the message.
310/// 4. On transient failure: call [`OutboxStore::reschedule`] with an
311/// exponential back-off delay.
312///
313/// ## Blanket `Arc` implementation
314///
315/// `Arc<S>` implements `OutboxStore` whenever `S: OutboxStore`, so you can
316/// share a store across a delivery worker and command handlers without
317/// additional wrapper types.
318///
319/// [`EventStore::append`]: crate::event_store::EventStore::append
320#[allow(async_fn_in_trait)]
321pub trait OutboxStore: Send + Sync {
322 /// Persist `messages` durably, ready for delivery.
323 ///
324 /// In a persistent backend this MUST be called within the same
325 /// transaction as the event append.
326 ///
327 /// # Errors
328 ///
329 /// Returns [`EngineError::Outbox`] on storage failure.
330 async fn enqueue(&self, messages: &[OutboxMessage]) -> Result<(), EngineError>;
331
332 /// Return up to `limit` messages ready for delivery as of `now`.
333 ///
334 /// A message is ready when `deliver_after` is `None` or `<= now`.
335 /// Results are ordered **oldest-first** by `created_at`.
336 ///
337 /// # Errors
338 ///
339 /// Returns [`EngineError::Outbox`] on storage failure.
340 async fn pending(
341 &self,
342 limit: usize,
343 now: OffsetDateTime,
344 ) -> Result<Vec<OutboxMessage>, EngineError>;
345
346 /// Return up to `limit` messages ready for delivery right now.
347 ///
348 /// Convenience wrapper around [`OutboxStore::pending`] that uses
349 /// `OffsetDateTime::now_utc()` as the reference time.
350 ///
351 /// # Errors
352 ///
353 /// Returns [`EngineError::Outbox`] on storage failure.
354 async fn pending_now(&self, limit: usize) -> Result<Vec<OutboxMessage>, EngineError> {
355 self.pending(limit, OffsetDateTime::now_utc()).await
356 }
357
358 /// Remove a message from the outbox after successful delivery.
359 ///
360 /// Calling this with an unknown `id` is a no-op.
361 ///
362 /// # Errors
363 ///
364 /// Returns [`EngineError::Outbox`] on storage failure.
365 async fn acknowledge(&self, id: OutboxMessageId) -> Result<(), EngineError>;
366
367 /// Reschedule a message for a future delivery attempt.
368 ///
369 /// Implementations MUST increment `attempt_count` on the stored record.
370 /// Calling this with an unknown `id` is a no-op.
371 ///
372 /// # Errors
373 ///
374 /// Returns [`EngineError::Outbox`] on storage failure.
375 async fn reschedule(
376 &self,
377 id: OutboxMessageId,
378 deliver_after: OffsetDateTime,
379 ) -> Result<(), EngineError>;
380
381 /// Return the total number of messages currently in the outbox.
382 ///
383 /// # Errors
384 ///
385 /// Returns [`EngineError::Outbox`] on storage failure.
386 async fn len(&self) -> Result<usize, EngineError>;
387
388 /// Return `true` when the outbox contains no messages.
389 ///
390 /// # Errors
391 ///
392 /// Returns [`EngineError::Outbox`] on storage failure.
393 async fn is_empty(&self) -> Result<bool, EngineError> {
394 Ok(self.len().await? == 0)
395 }
396}
397
398// ── Arc<S> blanket impl ───────────────────────────────────────────────────────
399
400impl<S: OutboxStore> OutboxStore for Arc<S> {
401 async fn enqueue(&self, messages: &[OutboxMessage]) -> Result<(), EngineError> {
402 self.as_ref().enqueue(messages).await
403 }
404
405 async fn pending(
406 &self,
407 limit: usize,
408 now: OffsetDateTime,
409 ) -> Result<Vec<OutboxMessage>, EngineError> {
410 self.as_ref().pending(limit, now).await
411 }
412
413 async fn acknowledge(&self, id: OutboxMessageId) -> Result<(), EngineError> {
414 self.as_ref().acknowledge(id).await
415 }
416
417 async fn reschedule(
418 &self,
419 id: OutboxMessageId,
420 deliver_after: OffsetDateTime,
421 ) -> Result<(), EngineError> {
422 self.as_ref().reschedule(id, deliver_after).await
423 }
424
425 async fn len(&self) -> Result<usize, EngineError> {
426 self.as_ref().len().await
427 }
428}
429
430// ── NoopOutboxStore ───────────────────────────────────────────────────────────
431
/// An [`OutboxStore`] that throws every message away without notice.
///
/// `enqueue` always succeeds but stores nothing, and `pending` always yields
/// an empty list. It is meant as the default for setups where outbox
/// delivery is handled elsewhere or is not needed.
///
/// # ⚠️ Data loss warning
///
/// `NoopOutboxStore` **discards every outbound message silently**. No APERAK,
/// MSCONS, or UTILMD will ever reach the AS4 endpoint. Do not use it in
/// production.
///
/// The type itself exists in all build configurations so that it can act as
/// a default type parameter in [`EngineBuilder`]. [`EngineBuilder::new`],
/// which wires it in as the default, is however restricted to the `testing`
/// feature or `cfg(test)`. Production code must call
/// [`EngineBuilder::with_stores`] instead.
///
/// [`EngineBuilder`]: crate::builder::EngineBuilder
/// [`EngineBuilder::new`]: crate::builder::EngineBuilder::new
/// [`EngineBuilder::with_stores`]: crate::builder::EngineBuilder::with_stores
#[derive(Debug, Clone, Copy, Default)]
#[must_use = "NoopOutboxStore discards all outbound messages silently — use a persistent OutboxStore in production"]
pub struct NoopOutboxStore;
456
// Every operation succeeds immediately and nothing is ever stored, so the
// outbox always reports itself as empty.
#[cfg(any(test, feature = "testing"))]
impl OutboxStore for NoopOutboxStore {
    /// Accepts the messages and drops them.
    async fn enqueue(&self, _messages: &[OutboxMessage]) -> Result<(), EngineError> {
        Ok(())
    }

    /// Always yields an empty list, whatever `_limit` and `_now` are.
    async fn pending(
        &self,
        _limit: usize,
        _now: OffsetDateTime,
    ) -> Result<Vec<OutboxMessage>, EngineError> {
        let nothing: Vec<OutboxMessage> = Vec::new();
        Ok(nothing)
    }

    /// No-op: there is never anything to remove.
    async fn acknowledge(&self, _id: OutboxMessageId) -> Result<(), EngineError> {
        Ok(())
    }

    /// No-op: there is never anything to reschedule.
    async fn reschedule(
        &self,
        _id: OutboxMessageId,
        _deliver_after: OffsetDateTime,
    ) -> Result<(), EngineError> {
        Ok(())
    }

    /// Always reports zero stored messages.
    async fn len(&self) -> Result<usize, EngineError> {
        Ok(0_usize)
    }
}
487
488// ── InMemoryOutboxStore ───────────────────────────────────────────────────────
489
/// In-memory [`OutboxStore`] intended for tests and development.
///
/// The messages live in a `HashMap` keyed by message id and guarded by a
/// `RwLock`. The map sits behind an `Arc`, so a clone shares the same
/// underlying data and every clone observes the same outbox state.
///
/// **Not production-safe.** Suitable uses are:
/// - Unit and integration tests
/// - Local development and examples
/// - Exercising the outbox delivery loop without an external message broker
///
/// Compiled only under `#[cfg(test)]` or when the `testing` feature is
/// enabled.
#[cfg(any(test, feature = "testing"))]
#[derive(Debug, Default, Clone)]
pub struct InMemoryOutboxStore {
    // Shared, lock-protected map from message id to the stored message.
    inner: Arc<RwLock<HashMap<OutboxMessageId, OutboxMessage>>>,
}
506
#[cfg(any(test, feature = "testing"))]
impl InMemoryOutboxStore {
    /// Build a store that holds no messages.
    #[must_use]
    pub fn new() -> Self {
        // Same state as the derived `Default`: an empty, freshly allocated map.
        Self {
            inner: Arc::default(),
        }
    }
}
515
#[cfg(any(test, feature = "testing"))]
impl OutboxStore for InMemoryOutboxStore {
    /// Store a copy of every message, keyed by its `message_id`.
    ///
    /// A message whose id is already present replaces the stored entry.
    async fn enqueue(&self, messages: &[OutboxMessage]) -> Result<(), EngineError> {
        let mut map = self.inner.write().await;
        map.extend(messages.iter().map(|msg| (msg.message_id, msg.clone())));
        Ok(())
    }

    /// Return up to `limit` messages whose `deliver_after` is `None` or
    /// `<= now`, ordered oldest-first by `created_at`.
    async fn pending(
        &self,
        limit: usize,
        now: OffsetDateTime,
    ) -> Result<Vec<OutboxMessage>, EngineError> {
        let map = self.inner.read().await;

        // Work on references while filtering and sorting so that only the
        // messages that survive the `limit` cut are actually cloned.
        let mut ready: Vec<&OutboxMessage> = map
            .values()
            .filter(|m| m.deliver_after.is_none_or(|d| d <= now))
            .collect();

        // Oldest first so the delivery worker processes in creation order,
        // preserving causal ordering across messages.
        ready.sort_by_key(|m| m.created_at);

        Ok(ready.into_iter().take(limit).cloned().collect())
    }

    /// Remove the message with the given id; unknown ids are ignored.
    async fn acknowledge(&self, id: OutboxMessageId) -> Result<(), EngineError> {
        self.inner.write().await.remove(&id);
        Ok(())
    }

    /// Set a new `deliver_after` and bump `attempt_count` for the message
    /// with the given id; unknown ids are ignored.
    async fn reschedule(
        &self,
        id: OutboxMessageId,
        deliver_after: OffsetDateTime,
    ) -> Result<(), EngineError> {
        let mut map = self.inner.write().await;
        if let Some(msg) = map.get_mut(&id) {
            msg.deliver_after = Some(deliver_after);
            // Saturate instead of overflowing: a plain `+= 1` would panic in
            // debug builds and wrap to 0 in release builds at `u32::MAX`.
            msg.attempt_count = msg.attempt_count.saturating_add(1);
        }
        Ok(())
    }

    /// Number of messages currently stored, ready or not.
    async fn len(&self) -> Result<usize, EngineError> {
        Ok(self.inner.read().await.len())
    }
}
566
567// ── Outbox idempotency key ────────────────────────────────────────────────────
568
569/// Compute a deterministic idempotency key for an outbound message.
570///
571/// The key is a UUID v5 (SHA-1 over a stable namespace) derived from the
572/// combination of process id, workflow step name, recipient partner id, and
573/// format version. Identical inputs always produce the same UUID.
574///
575/// # Usage
576///
577/// Store the key alongside the outbox entry and use it as a unique constraint
578/// in persistent backends so that re-dispatching the same command (e.g. after
579/// a retry) does not produce duplicate outbound messages:
580///
581/// ```rust
582/// use mako_engine::outbox::outbox_idempotency_key;
583/// use mako_engine::ids::ProcessId;
584///
585/// let process_id = ProcessId::new();
586/// let key = outbox_idempotency_key(process_id, "DispatchAperak", "4012345000023", "FV2025-10-01");
587/// println!("idempotency key: {key}");
588/// ```
589///
590/// # Key derivation
591///
592/// The key is `UUID_v5(MAKO_ENGINE_OUTBOX_NS, "{process_id}|{step}|{partner}|{fv}")`.
593///
594/// `MAKO_ENGINE_OUTBOX_NS` is a fixed namespace UUID (RFC 4122 §4.3, SHA-1
595/// variant) that scopes all mako-engine outbox keys to avoid collisions with
596/// UUIDs from other namespaces.
597#[must_use]
598pub fn outbox_idempotency_key(
599 process_id: ProcessId,
600 step: &str,
601 recipient: &str,
602 format_version: &str,
603) -> uuid::Uuid {
604 // A fixed namespace UUID for mako-engine outbox keys.
605 // Generated once by uuid::Uuid::new_v4() and hardcoded for stability.
606 // Changing this constant invalidates all existing keys — treat as immutable.
607 const MAKO_ENGINE_OUTBOX_NS: uuid::Uuid = uuid::Uuid::from_bytes([
608 0xd4, 0x7a, 0x2c, 0x9e, 0x5b, 0x31, 0x47, 0xf2, 0x89, 0x0a, 0x1e, 0x6c, 0x8a, 0x3d, 0x5f,
609 0x04,
610 ]);
611 let name = format!("{process_id}|{step}|{recipient}|{format_version}");
612 uuid::Uuid::new_v5(&MAKO_ENGINE_OUTBOX_NS, name.as_bytes())
613}
614
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ids::{ConversationId, CorrelationId, EventId, ProcessId, TenantId};

    /// Build a throwaway positive-APERAK message with fresh identifiers.
    fn make_msg() -> OutboxMessage {
        let payload = serde_json::json!({"positive": true});
        OutboxMessage::new(
            StreamId::new("process/test"),
            ProcessId::new(),
            TenantId::new(),
            CorrelationId::new(),
            ConversationId::new(),
            EventId::new(),
            "APERAK",
            "4012345000023",
            payload,
        )
    }

    /// Create an empty store and enqueue `msg` into it.
    async fn store_holding(msg: OutboxMessage) -> InMemoryOutboxStore {
        let store = InMemoryOutboxStore::new();
        store.enqueue(&[msg]).await.unwrap();
        store
    }

    #[tokio::test]
    async fn enqueue_appears_in_pending() {
        let message = make_msg();
        let expected_id = message.message_id;

        let store = store_holding(message).await;

        assert_eq!(store.len().await.unwrap(), 1);
        let ready = store.pending_now(10).await.unwrap();
        assert_eq!(ready.len(), 1);
        assert_eq!(ready[0].message_id, expected_id);
    }

    #[tokio::test]
    async fn acknowledge_removes_message() {
        let message = make_msg();
        let acked_id = message.message_id;

        let store = store_holding(message).await;
        store.acknowledge(acked_id).await.unwrap();

        assert!(store.is_empty().await.unwrap());
    }

    #[tokio::test]
    async fn deferred_message_not_in_pending_yet() {
        let in_one_hour = OffsetDateTime::now_utc() + time::Duration::hours(1);
        let store = store_holding(make_msg().with_deliver_after(in_one_hour)).await;

        let ready = store.pending_now(10).await.unwrap();
        assert!(
            ready.is_empty(),
            "deferred message must not appear before its time"
        );
    }

    #[tokio::test]
    async fn deferred_message_appears_after_deadline() {
        let one_second_ago = OffsetDateTime::now_utc() - time::Duration::seconds(1);
        let store = store_holding(make_msg().with_deliver_after(one_second_ago)).await;

        let ready = store.pending_now(10).await.unwrap();
        assert_eq!(ready.len(), 1);
    }

    #[tokio::test]
    async fn reschedule_increments_attempt_count() {
        let message = make_msg();
        let target_id = message.message_id;
        let retry_at = OffsetDateTime::now_utc() + time::Duration::minutes(5);

        let store = store_holding(message).await;
        store.reschedule(target_id, retry_at).await.unwrap();

        // Inspect the stored record directly; `pending_now` would hide it
        // because its delivery time now lies in the future.
        let guard = store.inner.read().await;
        let record = guard.get(&target_id).unwrap();
        assert_eq!(record.attempt_count, 1);
        assert_eq!(record.deliver_after, Some(retry_at));
    }

    #[tokio::test]
    async fn pending_ordered_oldest_first() {
        let store = InMemoryOutboxStore::new();
        // Two separate enqueue calls, each with a freshly created message.
        for _ in 0..2 {
            store.enqueue(&[make_msg()]).await.unwrap();
        }

        let ready = store.pending_now(10).await.unwrap();
        assert_eq!(ready.len(), 2);
        assert!(ready[0].created_at <= ready[1].created_at);
    }

    #[test]
    fn outbox_idempotency_key_is_stable_and_deterministic() {
        let pid = ProcessId::new();
        let (step, partner, fv) = ("ReceiveAperak", "4012345000023", "FV2025-10-01");

        let first = outbox_idempotency_key(pid, step, partner, fv);
        let second = outbox_idempotency_key(pid, step, partner, fv);
        assert_eq!(first, second, "same inputs must produce the same key");
        assert_eq!(first.to_string().len(), 36, "UUID string is 36 chars");

        // Different step → different key.
        let other_step = outbox_idempotency_key(pid, "ReceiveContrl", partner, fv);
        assert_ne!(first, other_step, "different step must produce different key");

        // Different FV → different key.
        let other_fv = outbox_idempotency_key(pid, step, partner, "FV2026-10-01");
        assert_ne!(first, other_fv, "different FV must produce different key");
    }
}