Skip to main content

mako_engine/
outbox.rs

1//! Outbox pattern for reliable at-least-once outbound message delivery.
2//!
3//! # Why the outbox?
4//!
5//! When a process transition generates an outbound EDIFACT message (e.g. an
6//! APERAK acknowledgement), two writes must happen atomically:
7//!
8//! 1. Domain events are appended to the event store.
9//! 2. The EDIFACT payload is queued for delivery to the AS4 endpoint.
10//!
11//! Without the outbox, a crash between steps 1 and 2 silently loses the
12//! outbound message. With the outbox, both writes are part of the same
13//! database transaction — a background delivery worker then delivers pending
14//! messages, surviving crashes and transient AS4 failures transparently.
15//!
16//! # Usage
17//!
18//! ```rust,ignore
19//! // After a command dispatch that should trigger an outbound APERAK:
20//! let env = &aperak_envelopes[0];
21//! let msg = OutboxMessage::new(
22//!     process.stream_id().clone(),
23//!     env.process_id,
24//!     env.tenant_id,
25//!     env.correlation_id,
26//!     env.conversation_id,
27//!     env.event_id,
28//!     "APERAK",
29//!     &recipient_gln,
30//!     aperak_payload_json,
31//! );
32//! outbox_store.enqueue(&[msg]).await?;
33//!
34//! // Background delivery worker:
35//! let pending = outbox_store.pending_now(50).await?;
36//! for msg in pending {
37//!     as4_client.send(&msg).await?;
38//!     outbox_store.acknowledge(msg.message_id).await?;
39//! }
40//! ```
41//!
42//! # Atomicity contract
43//!
44//! [`InMemoryOutboxStore`] does **not** guarantee transactional atomicity
45//! with [`InMemoryEventStore`]. Persistent backend crates
46//! (`mako-event-store-slatedb`, `mako-event-store-postgres`) MUST enqueue
47//! messages in the same database transaction as the event append.
48//!
49//! [`InMemoryEventStore`]: crate::event_store::InMemoryEventStore
50
51use std::sync::Arc;
52
53#[cfg(any(test, feature = "testing"))]
54use std::collections::HashMap;
55#[cfg(any(test, feature = "testing"))]
56use tokio::sync::RwLock;
57
58use time::OffsetDateTime;
59
60use crate::{
61    error::EngineError,
62    ids::{ConversationId, CorrelationId, EventId, OutboxMessageId, ProcessId, StreamId, TenantId},
63};
64
65// ── PendingOutbox ─────────────────────────────────────────────────────────────
66
67/// A lightweight outbox message specification produced by [`Workflow::handle`].
68///
69/// [`Workflow::handle`] is a pure function: it cannot know the store-assigned
70/// fields (`event_id`, `stream_id`, `process_id`, etc.) of the events it is
71/// about to emit. `PendingOutbox` carries only the information the domain
72/// workflow can produce deterministically, without I/O or clock access.
73///
74/// The engine fills in the store-assigned fields after the event append
75/// succeeds, converting `PendingOutbox` into a fully materialised
76/// [`OutboxMessage`] inside [`SlateDbStore::append_with_outbox`].
77///
78/// # Example
79///
80/// ```rust,ignore
81/// // Inside Workflow::handle, when DispatchAperak succeeds:
82/// let outbox = vec![
83///     PendingOutbox::new("APERAK", &state.sender_party_id().to_string(), aperak_payload)
84///         .caused_by(0),  // caused by the first event in this batch
85/// ];
86/// Ok(WorkflowOutput { events, outbox })
87/// ```
88///
89/// [`Workflow::handle`]: crate::workflow::Workflow::handle
90/// [`SlateDbStore::append_with_outbox`]: crate::event_store::AtomicAppend::append_with_outbox
91#[derive(Debug, Clone)]
92pub struct PendingOutbox {
93    /// EDIFACT or XML message type (e.g. `"APERAK"`, `"CONTRL"`, `"REMADV"`).
94    pub message_type: Box<str>,
95    /// GLN or EIC code of the intended recipient market participant.
96    pub recipient: Box<str>,
97    /// Domain-level message payload (JSON).
98    ///
99    /// Typically encodes the intent (e.g. positive/negative APERAK reason)
100    /// rather than the final EDIFACT bytes. The delivery worker or AS4
101    /// gateway is responsible for rendering the final wire format.
102    pub payload: serde_json::Value,
103    /// Do not deliver before this time.
104    ///
105    /// `None` means deliver immediately (as soon as the delivery worker runs).
106    /// Must not use the wall clock inside `handle` — derive from domain data
107    /// only (e.g. a schedule date carried in the command).
108    pub deliver_after: Option<OffsetDateTime>,
109    /// BO4E JSON Schema URL that describes the `payload` shape.
110    ///
111    /// Set this to the canonical BO4E schema URL when the payload is a
112    /// BO4E-typed object (e.g. `Marktlokation`, `Messlokation`). Leave
113    /// `None` for raw EDIFACT or untyped payloads.
114    ///
115    /// Example:
116    /// `"https://raw.githubusercontent.com/BO4E/BO4E-Schemas/v202501.0.0/src/bo4e_schemas/bo/Marktlokation.json"`
117    pub payload_schema: Option<Box<str>>,
118    /// Zero-based index into the concurrent events batch that caused this
119    /// outbound message.
120    ///
121    /// Used by the engine to set `causation_event_id` on the materialised
122    /// [`OutboxMessage`] from the stamped [`EventEnvelope`] at the same index.
123    /// Clamped to `events.len() - 1` when out-of-range.
124    ///
125    /// [`EventEnvelope`]: crate::envelope::EventEnvelope
126    pub caused_by_event_index: usize,
127}
128
129impl PendingOutbox {
130    /// Construct a pending outbox message for immediate delivery.
131    ///
132    /// `caused_by_event_index` defaults to `0` (first event in the batch).
133    /// Chain [`caused_by`] to change it.
134    ///
135    /// [`caused_by`]: PendingOutbox::caused_by
136    #[must_use]
137    pub fn new(
138        message_type: impl Into<Box<str>>,
139        recipient: impl Into<Box<str>>,
140        payload: serde_json::Value,
141    ) -> Self {
142        Self {
143            message_type: message_type.into(),
144            recipient: recipient.into(),
145            payload,
146            deliver_after: None,
147            payload_schema: None,
148            caused_by_event_index: 0,
149        }
150    }
151
152    /// Set the zero-based index of the event that caused this outbox message.
153    #[must_use]
154    pub fn caused_by(mut self, index: usize) -> Self {
155        self.caused_by_event_index = index;
156        self
157    }
158
159    /// Set a deferred delivery time (must be derived from domain data, not
160    /// the wall clock, to preserve `Workflow::handle` purity).
161    #[must_use]
162    pub fn with_deliver_after(mut self, deliver_after: OffsetDateTime) -> Self {
163        self.deliver_after = Some(deliver_after);
164        self
165    }
166
167    /// Attach a BO4E JSON Schema URL to the payload.
168    ///
169    /// Use this when the payload is a BO4E-typed object so the ERP adapter
170    /// can deserialise it into the correct type without inspecting the JSON.
171    #[must_use]
172    pub fn with_schema(mut self, schema_url: &'static str) -> Self {
173        self.payload_schema = Some(schema_url.into());
174        self
175    }
176}
177
178// ── OutboxMessage ─────────────────────────────────────────────────────────────
179
180/// An outbound message queued for delivery via AS4 or another channel.
181///
182/// The message carries both routing information (`recipient`, `message_type`)
183/// and full correlation metadata so the delivery worker can trace every send
184/// back to the domain event that caused it.
185///
186/// Construct with [`OutboxMessage::new`] and optionally chain
187/// [`OutboxMessage::with_deliver_after`] for deferred delivery.
188#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
189pub struct OutboxMessage {
190    /// Stable unique identifier for this outbox entry.
191    pub message_id: OutboxMessageId,
192
193    /// The process stream that produced this outbound message.
194    pub stream_id: StreamId,
195
196    /// The MaKo process instance.
197    pub process_id: ProcessId,
198
199    /// The tenant sending this message.
200    pub tenant_id: TenantId,
201
202    /// Propagated correlation root from the triggering event.
203    pub correlation_id: CorrelationId,
204
205    /// Business conversation this message belongs to (e.g. UTILMD ↔ APERAK).
206    pub conversation_id: ConversationId,
207
208    /// The persisted event that directly caused this outbound message.
209    pub causation_event_id: EventId,
210
211    /// EDIFACT or XML message type (e.g. `"APERAK"`, `"CONTRL"`, `"UTILMD"`).
212    pub message_type: Box<str>,
213
214    /// GLN or EIC code of the intended recipient market participant.
215    pub recipient: Box<str>,
216
217    /// Serialized message payload.
218    ///
219    /// Typically a JSON-encoded string of EDIFACT bytes or a structured
220    /// JSON object for non-EDIFACT channels.
221    pub payload: serde_json::Value,
222
223    /// BO4E JSON Schema URL that validates `payload`, if present.
224    ///
225    /// `None` for raw EDIFACT or untyped payloads. Set by domain workflows
226    /// via [`PendingOutbox::with_schema`] when the payload is a BO4E object.
227    #[serde(skip_serializing_if = "Option::is_none")]
228    pub payload_schema: Option<Box<str>>,
229
230    /// When this entry was created.
231    pub created_at: OffsetDateTime,
232
233    /// Do not deliver before this time.
234    ///
235    /// `None` means deliver immediately (as soon as the delivery worker runs).
236    pub deliver_after: Option<OffsetDateTime>,
237
238    /// Number of delivery attempts so far. Starts at `0`, incremented by
239    /// [`OutboxStore::reschedule`].
240    pub attempt_count: u32,
241}
242
243impl OutboxMessage {
244    /// Construct a new outbox message.
245    ///
246    /// `message_id` and `created_at` are generated automatically.
247    /// `attempt_count` is initialized to `0`.
248    ///
249    /// Call [`OutboxMessage::with_deliver_after`] to schedule deferred
250    /// delivery.
251    #[allow(clippy::too_many_arguments)]
252    #[must_use]
253    pub fn new(
254        stream_id: StreamId,
255        process_id: ProcessId,
256        tenant_id: TenantId,
257        correlation_id: CorrelationId,
258        conversation_id: ConversationId,
259        causation_event_id: EventId,
260        message_type: impl Into<Box<str>>,
261        recipient: impl Into<Box<str>>,
262        payload: serde_json::Value,
263    ) -> Self {
264        Self {
265            message_id: OutboxMessageId::new(),
266            stream_id,
267            process_id,
268            tenant_id,
269            correlation_id,
270            conversation_id,
271            causation_event_id,
272            message_type: message_type.into(),
273            recipient: recipient.into(),
274            payload,
275            payload_schema: None,
276            created_at: OffsetDateTime::now_utc(),
277            deliver_after: None,
278            attempt_count: 0,
279        }
280    }
281
282    /// Set a deferred delivery time.
283    ///
284    /// The message will not appear in [`OutboxStore::pending`] results until
285    /// `now >= deliver_after`.
286    #[must_use]
287    pub fn with_deliver_after(mut self, deliver_after: OffsetDateTime) -> Self {
288        self.deliver_after = Some(deliver_after);
289        self
290    }
291}
292
293// ── OutboxStore ───────────────────────────────────────────────────────────────
294
295/// Storage contract for outbox messages.
296///
297/// ## Atomicity requirement
298///
299/// In production deployments, calls to [`OutboxStore::enqueue`] MUST be
300/// atomic with the corresponding [`EventStore::append`] — both writes MUST
301/// succeed or both MUST fail. Implement this by sharing the same database
302/// transaction across both operations.
303///
304/// ## Delivery worker contract
305///
306/// The delivery worker loop should:
307/// 1. Call [`OutboxStore::pending_now`] to retrieve ready messages.
308/// 2. Attempt delivery to the AS4 endpoint.
309/// 3. On success: call [`OutboxStore::acknowledge`] to remove the message.
310/// 4. On transient failure: call [`OutboxStore::reschedule`] with an
311///    exponential back-off delay.
312///
313/// ## Blanket `Arc` implementation
314///
315/// `Arc<S>` implements `OutboxStore` whenever `S: OutboxStore`, so you can
316/// share a store across a delivery worker and command handlers without
317/// additional wrapper types.
318///
319/// [`EventStore::append`]: crate::event_store::EventStore::append
320#[allow(async_fn_in_trait)]
321pub trait OutboxStore: Send + Sync {
322    /// Persist `messages` durably, ready for delivery.
323    ///
324    /// In a persistent backend this MUST be called within the same
325    /// transaction as the event append.
326    ///
327    /// # Errors
328    ///
329    /// Returns [`EngineError::Outbox`] on storage failure.
330    async fn enqueue(&self, messages: &[OutboxMessage]) -> Result<(), EngineError>;
331
332    /// Return up to `limit` messages ready for delivery as of `now`.
333    ///
334    /// A message is ready when `deliver_after` is `None` or `<= now`.
335    /// Results are ordered **oldest-first** by `created_at`.
336    ///
337    /// # Errors
338    ///
339    /// Returns [`EngineError::Outbox`] on storage failure.
340    async fn pending(
341        &self,
342        limit: usize,
343        now: OffsetDateTime,
344    ) -> Result<Vec<OutboxMessage>, EngineError>;
345
346    /// Return up to `limit` messages ready for delivery right now.
347    ///
348    /// Convenience wrapper around [`OutboxStore::pending`] that uses
349    /// `OffsetDateTime::now_utc()` as the reference time.
350    ///
351    /// # Errors
352    ///
353    /// Returns [`EngineError::Outbox`] on storage failure.
354    async fn pending_now(&self, limit: usize) -> Result<Vec<OutboxMessage>, EngineError> {
355        self.pending(limit, OffsetDateTime::now_utc()).await
356    }
357
358    /// Remove a message from the outbox after successful delivery.
359    ///
360    /// Calling this with an unknown `id` is a no-op.
361    ///
362    /// # Errors
363    ///
364    /// Returns [`EngineError::Outbox`] on storage failure.
365    async fn acknowledge(&self, id: OutboxMessageId) -> Result<(), EngineError>;
366
367    /// Reschedule a message for a future delivery attempt.
368    ///
369    /// Implementations MUST increment `attempt_count` on the stored record.
370    /// Calling this with an unknown `id` is a no-op.
371    ///
372    /// # Errors
373    ///
374    /// Returns [`EngineError::Outbox`] on storage failure.
375    async fn reschedule(
376        &self,
377        id: OutboxMessageId,
378        deliver_after: OffsetDateTime,
379    ) -> Result<(), EngineError>;
380
381    /// Return the total number of messages currently in the outbox.
382    ///
383    /// # Errors
384    ///
385    /// Returns [`EngineError::Outbox`] on storage failure.
386    async fn len(&self) -> Result<usize, EngineError>;
387
388    /// Return `true` when the outbox contains no messages.
389    ///
390    /// # Errors
391    ///
392    /// Returns [`EngineError::Outbox`] on storage failure.
393    async fn is_empty(&self) -> Result<bool, EngineError> {
394        Ok(self.len().await? == 0)
395    }
396}
397
398// ── Arc<S> blanket impl ───────────────────────────────────────────────────────
399
400impl<S: OutboxStore> OutboxStore for Arc<S> {
401    async fn enqueue(&self, messages: &[OutboxMessage]) -> Result<(), EngineError> {
402        self.as_ref().enqueue(messages).await
403    }
404
405    async fn pending(
406        &self,
407        limit: usize,
408        now: OffsetDateTime,
409    ) -> Result<Vec<OutboxMessage>, EngineError> {
410        self.as_ref().pending(limit, now).await
411    }
412
413    async fn acknowledge(&self, id: OutboxMessageId) -> Result<(), EngineError> {
414        self.as_ref().acknowledge(id).await
415    }
416
417    async fn reschedule(
418        &self,
419        id: OutboxMessageId,
420        deliver_after: OffsetDateTime,
421    ) -> Result<(), EngineError> {
422        self.as_ref().reschedule(id, deliver_after).await
423    }
424
425    async fn len(&self) -> Result<usize, EngineError> {
426        self.as_ref().len().await
427    }
428}
429
430// ── NoopOutboxStore ───────────────────────────────────────────────────────────
431
432/// An [`OutboxStore`] that silently discards all messages.
433///
434/// Every `enqueue` succeeds without storing anything. `pending` always
435/// returns an empty list. Use this as the default when outbox delivery is
436/// managed elsewhere or not required.
437///
438/// # ⚠️ Data loss warning
439///
440/// `NoopOutboxStore` **discards every outbound message silently**. No APERAK,
441/// MSCONS, or UTILMD will ever be delivered to the AS4 endpoint. Do not use
442/// in production.
443///
444/// This type is available in all build configurations so it can serve as a
445/// default type parameter in [`EngineBuilder`]. However, [`EngineBuilder::new`]
446/// (which wires this as the default) is only available with the `testing`
447/// feature or in `cfg(test)`. Production code must call
448/// [`EngineBuilder::with_stores`] instead.
449///
450/// [`EngineBuilder`]: crate::builder::EngineBuilder
451/// [`EngineBuilder::new`]: crate::builder::EngineBuilder::new
452/// [`EngineBuilder::with_stores`]: crate::builder::EngineBuilder::with_stores
453#[derive(Debug, Clone, Copy, Default)]
454#[must_use = "NoopOutboxStore discards all outbound messages silently — use a persistent OutboxStore in production"]
455pub struct NoopOutboxStore;
456
457#[cfg(any(test, feature = "testing"))]
458impl OutboxStore for NoopOutboxStore {
459    async fn enqueue(&self, _messages: &[OutboxMessage]) -> Result<(), EngineError> {
460        Ok(())
461    }
462
463    async fn pending(
464        &self,
465        _limit: usize,
466        _now: OffsetDateTime,
467    ) -> Result<Vec<OutboxMessage>, EngineError> {
468        Ok(Vec::new())
469    }
470
471    async fn acknowledge(&self, _id: OutboxMessageId) -> Result<(), EngineError> {
472        Ok(())
473    }
474
475    async fn reschedule(
476        &self,
477        _id: OutboxMessageId,
478        _deliver_after: OffsetDateTime,
479    ) -> Result<(), EngineError> {
480        Ok(())
481    }
482
483    async fn len(&self) -> Result<usize, EngineError> {
484        Ok(0)
485    }
486}
487
488// ── InMemoryOutboxStore ───────────────────────────────────────────────────────
489
490/// An in-memory [`OutboxStore`] for tests and development.
491///
492/// Backed by a `HashMap` protected by a `RwLock`. Cloning shares the
493/// underlying data via `Arc` — all clones see the same outbox state.
494///
495/// **Not production-safe.** Use this for:
496/// - Unit and integration tests
497/// - Local development and examples
498/// - Verifying the outbox delivery loop without an external message broker
499///
500/// Only available in `#[cfg(test)]` or with the `testing` feature enabled.
501#[cfg(any(test, feature = "testing"))]
502#[derive(Debug, Default, Clone)]
503pub struct InMemoryOutboxStore {
504    inner: Arc<RwLock<HashMap<OutboxMessageId, OutboxMessage>>>,
505}
506
507#[cfg(any(test, feature = "testing"))]
508impl InMemoryOutboxStore {
509    /// Create an empty outbox store.
510    #[must_use]
511    pub fn new() -> Self {
512        Self::default()
513    }
514}
515
516#[cfg(any(test, feature = "testing"))]
517impl OutboxStore for InMemoryOutboxStore {
518    async fn enqueue(&self, messages: &[OutboxMessage]) -> Result<(), EngineError> {
519        let mut map = self.inner.write().await;
520        for msg in messages {
521            map.insert(msg.message_id, msg.clone());
522        }
523        Ok(())
524    }
525
526    async fn pending(
527        &self,
528        limit: usize,
529        now: OffsetDateTime,
530    ) -> Result<Vec<OutboxMessage>, EngineError> {
531        let map = self.inner.read().await;
532        let mut ready: Vec<_> = map
533            .values()
534            .filter(|m| m.deliver_after.is_none_or(|d| d <= now))
535            .cloned()
536            .collect();
537        // Stable ordering: oldest first so the delivery worker processes in
538        // creation order, preserving causal ordering across messages.
539        ready.sort_by_key(|m| m.created_at);
540        ready.truncate(limit);
541        Ok(ready)
542    }
543
544    async fn acknowledge(&self, id: OutboxMessageId) -> Result<(), EngineError> {
545        self.inner.write().await.remove(&id);
546        Ok(())
547    }
548
549    async fn reschedule(
550        &self,
551        id: OutboxMessageId,
552        deliver_after: OffsetDateTime,
553    ) -> Result<(), EngineError> {
554        let mut map = self.inner.write().await;
555        if let Some(msg) = map.get_mut(&id) {
556            msg.deliver_after = Some(deliver_after);
557            msg.attempt_count += 1;
558        }
559        Ok(())
560    }
561
562    async fn len(&self) -> Result<usize, EngineError> {
563        Ok(self.inner.read().await.len())
564    }
565}
566
567// ── Outbox idempotency key ────────────────────────────────────────────────────
568
569/// Compute a deterministic idempotency key for an outbound message.
570///
571/// The key is a UUID v5 (SHA-1 over a stable namespace) derived from the
572/// combination of process id, workflow step name, recipient partner id, and
573/// format version. Identical inputs always produce the same UUID.
574///
575/// # Usage
576///
577/// Store the key alongside the outbox entry and use it as a unique constraint
578/// in persistent backends so that re-dispatching the same command (e.g. after
579/// a retry) does not produce duplicate outbound messages:
580///
581/// ```rust
582/// use mako_engine::outbox::outbox_idempotency_key;
583/// use mako_engine::ids::ProcessId;
584///
585/// let process_id = ProcessId::new();
586/// let key = outbox_idempotency_key(process_id, "DispatchAperak", "4012345000023", "FV2025-10-01");
587/// println!("idempotency key: {key}");
588/// ```
589///
590/// # Key derivation
591///
592/// The key is `UUID_v5(MAKO_ENGINE_OUTBOX_NS, "{process_id}|{step}|{partner}|{fv}")`.
593///
594/// `MAKO_ENGINE_OUTBOX_NS` is a fixed namespace UUID (RFC 4122 §4.3, SHA-1
595/// variant) that scopes all mako-engine outbox keys to avoid collisions with
596/// UUIDs from other namespaces.
597#[must_use]
598pub fn outbox_idempotency_key(
599    process_id: ProcessId,
600    step: &str,
601    recipient: &str,
602    format_version: &str,
603) -> uuid::Uuid {
604    // A fixed namespace UUID for mako-engine outbox keys.
605    // Generated once by uuid::Uuid::new_v4() and hardcoded for stability.
606    // Changing this constant invalidates all existing keys — treat as immutable.
607    const MAKO_ENGINE_OUTBOX_NS: uuid::Uuid = uuid::Uuid::from_bytes([
608        0xd4, 0x7a, 0x2c, 0x9e, 0x5b, 0x31, 0x47, 0xf2, 0x89, 0x0a, 0x1e, 0x6c, 0x8a, 0x3d, 0x5f,
609        0x04,
610    ]);
611    let name = format!("{process_id}|{step}|{recipient}|{format_version}");
612    uuid::Uuid::new_v5(&MAKO_ENGINE_OUTBOX_NS, name.as_bytes())
613}
614
615#[cfg(test)]
616mod tests {
617    use super::*;
618    use crate::ids::{ConversationId, CorrelationId, EventId, ProcessId, TenantId};
619
620    fn make_msg() -> OutboxMessage {
621        OutboxMessage::new(
622            StreamId::new("process/test"),
623            ProcessId::new(),
624            TenantId::new(),
625            CorrelationId::new(),
626            ConversationId::new(),
627            EventId::new(),
628            "APERAK",
629            "4012345000023",
630            serde_json::json!({"positive": true}),
631        )
632    }
633
634    #[tokio::test]
635    async fn enqueue_appears_in_pending() {
636        let store = InMemoryOutboxStore::new();
637        let msg = make_msg();
638        let id = msg.message_id;
639
640        store.enqueue(&[msg]).await.unwrap();
641
642        assert_eq!(store.len().await.unwrap(), 1);
643        let pending = store.pending_now(10).await.unwrap();
644        assert_eq!(pending.len(), 1);
645        assert_eq!(pending[0].message_id, id);
646    }
647
648    #[tokio::test]
649    async fn acknowledge_removes_message() {
650        let store = InMemoryOutboxStore::new();
651        let msg = make_msg();
652        let id = msg.message_id;
653
654        store.enqueue(&[msg]).await.unwrap();
655        store.acknowledge(id).await.unwrap();
656
657        assert!(store.is_empty().await.unwrap());
658    }
659
660    #[tokio::test]
661    async fn deferred_message_not_in_pending_yet() {
662        let store = InMemoryOutboxStore::new();
663        let future = OffsetDateTime::now_utc() + time::Duration::hours(1);
664        let msg = make_msg().with_deliver_after(future);
665
666        store.enqueue(&[msg]).await.unwrap();
667
668        let pending = store.pending_now(10).await.unwrap();
669        assert!(
670            pending.is_empty(),
671            "deferred message must not appear before its time"
672        );
673    }
674
675    #[tokio::test]
676    async fn deferred_message_appears_after_deadline() {
677        let store = InMemoryOutboxStore::new();
678        let past = OffsetDateTime::now_utc() - time::Duration::seconds(1);
679        let msg = make_msg().with_deliver_after(past);
680
681        store.enqueue(&[msg]).await.unwrap();
682
683        let pending = store.pending_now(10).await.unwrap();
684        assert_eq!(pending.len(), 1);
685    }
686
687    #[tokio::test]
688    async fn reschedule_increments_attempt_count() {
689        let store = InMemoryOutboxStore::new();
690        let msg = make_msg();
691        let id = msg.message_id;
692        let new_time = OffsetDateTime::now_utc() + time::Duration::minutes(5);
693
694        store.enqueue(&[msg]).await.unwrap();
695        store.reschedule(id, new_time).await.unwrap();
696
697        let inner = store.inner.read().await;
698        let stored = inner.get(&id).unwrap();
699        assert_eq!(stored.attempt_count, 1);
700        assert_eq!(stored.deliver_after, Some(new_time));
701    }
702
703    #[tokio::test]
704    async fn pending_ordered_oldest_first() {
705        let store = InMemoryOutboxStore::new();
706        store.enqueue(&[make_msg()]).await.unwrap();
707        store.enqueue(&[make_msg()]).await.unwrap();
708
709        let pending = store.pending_now(10).await.unwrap();
710        assert_eq!(pending.len(), 2);
711        assert!(pending[0].created_at <= pending[1].created_at);
712    }
713
714    #[test]
715    fn outbox_idempotency_key_is_stable_and_deterministic() {
716        let pid = ProcessId::new();
717        let step = "ReceiveAperak";
718        let partner = "4012345000023";
719        let fv = "FV2025-10-01";
720
721        let k1 = outbox_idempotency_key(pid, step, partner, fv);
722        let k2 = outbox_idempotency_key(pid, step, partner, fv);
723        assert_eq!(k1, k2, "same inputs must produce the same key");
724        assert_eq!(k1.to_string().len(), 36, "UUID string is 36 chars");
725
726        // Different step → different key.
727        let k3 = outbox_idempotency_key(pid, "ReceiveContrl", partner, fv);
728        assert_ne!(k1, k3, "different step must produce different key");
729
730        // Different FV → different key.
731        let k4 = outbox_idempotency_key(pid, step, partner, "FV2026-10-01");
732        assert_ne!(k1, k4, "different FV must produce different key");
733    }
734}