use std::sync::Arc;
#[cfg(any(test, feature = "testing"))]
use std::collections::HashMap;
#[cfg(any(test, feature = "testing"))]
use tokio::sync::RwLock;
use time::OffsetDateTime;
use crate::{
error::EngineError,
ids::{ConversationId, CorrelationId, EventId, OutboxMessageId, ProcessId, StreamId, TenantId},
};
/// Description of an outbound message that has not yet been given ids or
/// timestamps.
///
/// Compared with [`OutboxMessage`] it carries only the message content, the
/// optional scheduling/schema hints and the index of the causing event.
#[derive(Clone, Debug)]
pub struct PendingOutbox {
    /// Application-level message type name.
    pub message_type: Box<str>,
    /// Addressee of the message.
    pub recipient: Box<str>,
    /// JSON body of the message.
    pub payload: serde_json::Value,
    /// Earliest delivery time; `None` unless set via `with_deliver_after`.
    pub deliver_after: Option<OffsetDateTime>,
    /// Schema reference for `payload`; `None` unless set via `with_schema`.
    pub payload_schema: Option<Box<str>>,
    /// Index of the event that caused this message (defaults to `0`).
    // NOTE(review): presumably an index into the batch of events emitted
    // together with this message — confirm against the caller.
    pub caused_by_event_index: usize,
}
impl PendingOutbox {
    /// Creates a pending message with the given type, recipient and payload.
    ///
    /// The message starts without a delivery delay, without a payload schema,
    /// and with `caused_by_event_index` set to `0`.
    #[must_use]
    pub fn new(
        message_type: impl Into<Box<str>>,
        recipient: impl Into<Box<str>>,
        payload: serde_json::Value,
    ) -> Self {
        Self {
            message_type: message_type.into(),
            recipient: recipient.into(),
            payload,
            deliver_after: None,
            payload_schema: None,
            caused_by_event_index: 0,
        }
    }

    /// Records `index` as the index of the event that caused this message.
    #[must_use]
    pub fn caused_by(mut self, index: usize) -> Self {
        self.caused_by_event_index = index;
        self
    }

    /// Sets the earliest time at which the message should be delivered.
    #[must_use]
    pub fn with_deliver_after(mut self, deliver_after: OffsetDateTime) -> Self {
        self.deliver_after = Some(deliver_after);
        self
    }

    /// Attaches a schema reference describing the payload.
    ///
    /// Accepts anything convertible into `Box<str>` (string literals, `String`,
    /// borrowed `&str`), consistent with the string parameters of [`Self::new`].
    /// The value is copied into an owned `Box<str>`, so no `'static` lifetime
    /// is needed; callers passing `&'static str` keep working unchanged.
    #[must_use]
    pub fn with_schema(mut self, schema_url: impl Into<Box<str>>) -> Self {
        self.payload_schema = Some(schema_url.into());
        self
    }
}
/// A fully identified outbound message as held by an [`OutboxStore`].
///
/// Field order is significant for the derived serde representation and must
/// not be rearranged casually.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct OutboxMessage {
    /// Identity of this outbox entry; the key used by `acknowledge` and
    /// `reschedule`.
    pub message_id: OutboxMessageId,
    /// Stream identifier supplied by the creator.
    pub stream_id: StreamId,
    /// Process identifier supplied by the creator.
    pub process_id: ProcessId,
    /// Tenant identifier supplied by the creator.
    pub tenant_id: TenantId,
    /// Correlation identifier supplied by the creator.
    pub correlation_id: CorrelationId,
    /// Conversation identifier supplied by the creator.
    pub conversation_id: ConversationId,
    /// Id of the event recorded as the cause of this message.
    pub causation_event_id: EventId,
    /// Application-level message type name.
    pub message_type: Box<str>,
    /// Addressee of the message.
    pub recipient: Box<str>,
    /// JSON body of the message.
    pub payload: serde_json::Value,
    /// Schema reference for `payload`; left out of the serialized form when
    /// it is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub payload_schema: Option<Box<str>>,
    /// Creation timestamp; `OutboxMessage::new` sets it to the current UTC time.
    pub created_at: OffsetDateTime,
    /// Earliest delivery time; `None` means the message is deliverable
    /// immediately.
    pub deliver_after: Option<OffsetDateTime>,
    /// Number of reschedules so far; starts at `0`.
    pub attempt_count: u32,
}
impl OutboxMessage {
    /// Builds a new message with a freshly created `message_id`, the current
    /// UTC time as `created_at`, no payload schema, no delivery delay and an
    /// attempt count of zero.
    // The constructor mirrors the struct's identifying fields one-to-one,
    // hence the long parameter list.
    #[allow(clippy::too_many_arguments)]
    #[must_use]
    pub fn new(
        stream_id: StreamId,
        process_id: ProcessId,
        tenant_id: TenantId,
        correlation_id: CorrelationId,
        conversation_id: ConversationId,
        causation_event_id: EventId,
        message_type: impl Into<Box<str>>,
        recipient: impl Into<Box<str>>,
        payload: serde_json::Value,
    ) -> Self {
        // Bound in the same order the struct literal used to evaluate them.
        let message_id = OutboxMessageId::new();
        let message_type: Box<str> = message_type.into();
        let recipient: Box<str> = recipient.into();
        let created_at = OffsetDateTime::now_utc();

        Self {
            message_id,
            stream_id,
            process_id,
            tenant_id,
            correlation_id,
            conversation_id,
            causation_event_id,
            message_type,
            recipient,
            payload,
            payload_schema: None,
            created_at,
            deliver_after: None,
            attempt_count: 0,
        }
    }

    /// Returns the message with its earliest delivery time set to
    /// `deliver_after`.
    #[must_use]
    pub fn with_deliver_after(self, deliver_after: OffsetDateTime) -> Self {
        Self {
            deliver_after: Some(deliver_after),
            ..self
        }
    }
}
/// Storage abstraction for outbound messages awaiting delivery.
///
/// The method notes below describe what the in-memory implementation in this
/// file does. NOTE(review): other backends are presumably expected to behave
/// the same way — confirm.
// NOTE(review): the lint is silenced presumably because the returned futures
// are deliberately left without a `Send` bound — confirm.
#[allow(async_fn_in_trait)]
pub trait OutboxStore: Send + Sync {
    /// Adds `messages` to the outbox.
    async fn enqueue(&self, messages: &[OutboxMessage]) -> Result<(), EngineError>;

    /// Returns up to `limit` messages that are deliverable at `now`.
    async fn pending(
        &self,
        limit: usize,
        now: OffsetDateTime,
    ) -> Result<Vec<OutboxMessage>, EngineError>;

    /// Convenience wrapper: [`Self::pending`] evaluated at the current UTC time.
    async fn pending_now(&self, limit: usize) -> Result<Vec<OutboxMessage>, EngineError> {
        let now = OffsetDateTime::now_utc();
        self.pending(limit, now).await
    }

    /// Marks the message `id` as handled.
    async fn acknowledge(&self, id: OutboxMessageId) -> Result<(), EngineError>;

    /// Moves the message `id` to a new earliest delivery time.
    async fn reschedule(
        &self,
        id: OutboxMessageId,
        deliver_after: OffsetDateTime,
    ) -> Result<(), EngineError>;

    /// Number of messages currently held.
    async fn len(&self) -> Result<usize, EngineError>;

    /// `true` when [`Self::len`] reports zero; errors from `len` are passed on.
    async fn is_empty(&self) -> Result<bool, EngineError> {
        self.len().await.map(|count| count == 0)
    }
}
/// Lets a shared `Arc<S>` be used wherever an [`OutboxStore`] is expected.
///
/// Every method — including the provided ones (`pending_now`, `is_empty`) —
/// is forwarded to the wrapped store. Forwarding the provided methods matters:
/// without it, an override in `S` would be bypassed and the trait defaults
/// would run instead whenever the store is accessed through an `Arc`.
impl<S: OutboxStore> OutboxStore for Arc<S> {
    /// Forwards to `S::enqueue`.
    async fn enqueue(&self, messages: &[OutboxMessage]) -> Result<(), EngineError> {
        (**self).enqueue(messages).await
    }

    /// Forwards to `S::pending`.
    async fn pending(
        &self,
        limit: usize,
        now: OffsetDateTime,
    ) -> Result<Vec<OutboxMessage>, EngineError> {
        (**self).pending(limit, now).await
    }

    /// Forwards to `S::pending_now` so that an override in `S` is honored.
    async fn pending_now(&self, limit: usize) -> Result<Vec<OutboxMessage>, EngineError> {
        (**self).pending_now(limit).await
    }

    /// Forwards to `S::acknowledge`.
    async fn acknowledge(&self, id: OutboxMessageId) -> Result<(), EngineError> {
        (**self).acknowledge(id).await
    }

    /// Forwards to `S::reschedule`.
    async fn reschedule(
        &self,
        id: OutboxMessageId,
        deliver_after: OffsetDateTime,
    ) -> Result<(), EngineError> {
        (**self).reschedule(id, deliver_after).await
    }

    /// Forwards to `S::len`.
    async fn len(&self) -> Result<usize, EngineError> {
        (**self).len().await
    }

    /// Forwards to `S::is_empty` so that an override in `S` is honored.
    async fn is_empty(&self) -> Result<bool, EngineError> {
        (**self).is_empty().await
    }
}
/// Unit-struct outbox store that keeps nothing.
///
/// Its [`OutboxStore`] implementation in this file accepts every call and
/// always reports an empty outbox. That implementation is only compiled for
/// tests or when the `testing` feature is enabled.
#[must_use = "NoopOutboxStore discards all outbound messages silently — use a persistent OutboxStore in production"]
#[derive(Clone, Copy, Debug, Default)]
pub struct NoopOutboxStore;
/// Do-nothing store: every operation succeeds and nothing is ever retained.
#[cfg(any(test, feature = "testing"))]
impl OutboxStore for NoopOutboxStore {
    /// Accepts the batch and drops it.
    async fn enqueue(&self, _: &[OutboxMessage]) -> Result<(), EngineError> {
        Ok(())
    }

    /// Always yields an empty list, whatever the limit or time.
    async fn pending(
        &self,
        _: usize,
        _: OffsetDateTime,
    ) -> Result<Vec<OutboxMessage>, EngineError> {
        Ok(vec![])
    }

    /// Succeeds without doing anything.
    async fn acknowledge(&self, _: OutboxMessageId) -> Result<(), EngineError> {
        Ok(())
    }

    /// Succeeds without doing anything.
    async fn reschedule(&self, _: OutboxMessageId, _: OffsetDateTime) -> Result<(), EngineError> {
        Ok(())
    }

    /// Always reports zero stored messages.
    async fn len(&self) -> Result<usize, EngineError> {
        Ok(0)
    }
}
/// Outbox store backed by a hash map in memory, available for tests or with
/// the `testing` feature.
///
/// The map sits behind an `Arc`, so clones of the store share the same
/// underlying messages.
#[cfg(any(test, feature = "testing"))]
#[derive(Clone, Debug, Default)]
pub struct InMemoryOutboxStore {
    /// Stored messages keyed by their `message_id`, guarded by an async
    /// read/write lock.
    inner: Arc<RwLock<HashMap<OutboxMessageId, OutboxMessage>>>,
}
#[cfg(any(test, feature = "testing"))]
impl InMemoryOutboxStore {
    /// Creates an empty store; equivalent to `InMemoryOutboxStore::default()`.
    #[must_use]
    pub fn new() -> Self {
        Self {
            inner: Arc::default(),
        }
    }
}
#[cfg(any(test, feature = "testing"))]
impl OutboxStore for InMemoryOutboxStore {
    /// Stores a clone of every message under its `message_id`.
    ///
    /// Enqueuing a message whose id is already present replaces the stored
    /// entry.
    async fn enqueue(&self, messages: &[OutboxMessage]) -> Result<(), EngineError> {
        let mut map = self.inner.write().await;
        map.extend(messages.iter().map(|msg| (msg.message_id, msg.clone())));
        Ok(())
    }

    /// Returns at most `limit` deliverable messages, oldest `created_at` first.
    ///
    /// A message is deliverable when it has no `deliver_after` or when its
    /// `deliver_after` is not later than `now`.
    async fn pending(
        &self,
        limit: usize,
        now: OffsetDateTime,
    ) -> Result<Vec<OutboxMessage>, EngineError> {
        let map = self.inner.read().await;

        // Sort borrowed entries and clone only the ones that survive the
        // limit; cloning first would deep-copy every ready payload just to
        // throw most of them away.
        let mut ready: Vec<&OutboxMessage> = map
            .values()
            .filter(|m| m.deliver_after.is_none_or(|d| d <= now))
            .collect();
        ready.sort_by_key(|m| m.created_at);

        Ok(ready.into_iter().take(limit).cloned().collect())
    }

    /// Removes the message with the given id; unknown ids are ignored.
    async fn acknowledge(&self, id: OutboxMessageId) -> Result<(), EngineError> {
        self.inner.write().await.remove(&id);
        Ok(())
    }

    /// Sets a new `deliver_after` for the message and bumps its attempt count.
    ///
    /// Unknown ids are ignored and still yield `Ok(())`.
    async fn reschedule(
        &self,
        id: OutboxMessageId,
        deliver_after: OffsetDateTime,
    ) -> Result<(), EngineError> {
        let mut map = self.inner.write().await;
        if let Some(msg) = map.get_mut(&id) {
            msg.deliver_after = Some(deliver_after);
            // Saturate instead of overflowing (debug panic / release wrap to 0)
            // if a message is rescheduled an extreme number of times.
            msg.attempt_count = msg.attempt_count.saturating_add(1);
        }
        Ok(())
    }

    /// Number of messages currently stored, deliverable or not.
    async fn len(&self) -> Result<usize, EngineError> {
        Ok(self.inner.read().await.len())
    }
}
/// Derives a deterministic idempotency key for an outbox send.
///
/// The key is a version-5 (name-based) UUID computed in a fixed namespace over
/// the string `"{process_id}|{step}|{recipient}|{format_version}"`, so equal
/// inputs always yield the same key.
///
/// NOTE(review): the components are joined with `|` without escaping, so two
/// different input tuples can produce the same name if a component itself
/// contains `|` (e.g. step `"a|b"`, recipient `"c"` vs. step `"a"`, recipient
/// `"b|c"`). The derivation is left untouched here because changing it would
/// change keys that may already be persisted — confirm inputs never contain `|`.
#[must_use]
pub fn outbox_idempotency_key(
    process_id: ProcessId,
    step: &str,
    recipient: &str,
    format_version: &str,
) -> uuid::Uuid {
    // Fixed namespace; changing any byte changes every derived key.
    const NS_BYTES: [u8; 16] = [
        0xd4, 0x7a, 0x2c, 0x9e, 0x5b, 0x31, 0x47, 0xf2, 0x89, 0x0a, 0x1e, 0x6c, 0x8a, 0x3d, 0x5f,
        0x04,
    ];
    const MAKO_ENGINE_OUTBOX_NS: uuid::Uuid = uuid::Uuid::from_bytes(NS_BYTES);

    // Assemble "<process_id>|<step>|<recipient>|<format_version>".
    let mut name = format!("{process_id}");
    for part in [step, recipient, format_version] {
        name.push('|');
        name.push_str(part);
    }

    uuid::Uuid::new_v5(&MAKO_ENGINE_OUTBOX_NS, name.as_bytes())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ids::{ConversationId, CorrelationId, EventId, ProcessId, TenantId};

    /// Builds an immediately deliverable message using each id type's `new()`.
    fn make_msg() -> OutboxMessage {
        OutboxMessage::new(
            StreamId::new("process/test"),
            ProcessId::new(),
            TenantId::new(),
            CorrelationId::new(),
            ConversationId::new(),
            EventId::new(),
            "APERAK",
            "4012345000023",
            serde_json::json!({"positive": true}),
        )
    }

    /// An enqueued message is counted and returned by `pending_now`.
    #[tokio::test]
    async fn enqueue_appears_in_pending() {
        let store = InMemoryOutboxStore::new();
        let message = make_msg();
        let expected_id = message.message_id;

        store.enqueue(&[message]).await.unwrap();

        assert_eq!(store.len().await.unwrap(), 1);
        let ready = store.pending_now(10).await.unwrap();
        assert_eq!(ready.len(), 1);
        assert_eq!(ready[0].message_id, expected_id);
    }

    /// Acknowledging the only message leaves the store empty.
    #[tokio::test]
    async fn acknowledge_removes_message() {
        let store = InMemoryOutboxStore::new();
        let message = make_msg();
        let message_id = message.message_id;

        store.enqueue(&[message]).await.unwrap();
        store.acknowledge(message_id).await.unwrap();

        assert!(store.is_empty().await.unwrap());
    }

    /// A message scheduled one hour ahead is not yet pending.
    #[tokio::test]
    async fn deferred_message_not_in_pending_yet() {
        let store = InMemoryOutboxStore::new();
        let one_hour_ahead = OffsetDateTime::now_utc() + time::Duration::hours(1);
        let deferred = make_msg().with_deliver_after(one_hour_ahead);

        store.enqueue(&[deferred]).await.unwrap();

        let ready = store.pending_now(10).await.unwrap();
        assert!(
            ready.is_empty(),
            "deferred message must not appear before its time"
        );
    }

    /// A message whose `deliver_after` already passed is pending.
    #[tokio::test]
    async fn deferred_message_appears_after_deadline() {
        let store = InMemoryOutboxStore::new();
        let one_second_ago = OffsetDateTime::now_utc() - time::Duration::seconds(1);
        let overdue = make_msg().with_deliver_after(one_second_ago);

        store.enqueue(&[overdue]).await.unwrap();

        let ready = store.pending_now(10).await.unwrap();
        assert_eq!(ready.len(), 1);
    }

    /// Rescheduling stores the new time and bumps the attempt counter to 1.
    #[tokio::test]
    async fn reschedule_increments_attempt_count() {
        let store = InMemoryOutboxStore::new();
        let message = make_msg();
        let message_id = message.message_id;
        let retry_at = OffsetDateTime::now_utc() + time::Duration::minutes(5);

        store.enqueue(&[message]).await.unwrap();
        store.reschedule(message_id, retry_at).await.unwrap();

        // Inspect the stored entry directly; it is deferred and therefore
        // would not show up in `pending_now`.
        let guard = store.inner.read().await;
        let stored = guard.get(&message_id).unwrap();
        assert_eq!(stored.attempt_count, 1);
        assert_eq!(stored.deliver_after, Some(retry_at));
    }

    /// Pending messages come back sorted by ascending `created_at`.
    #[tokio::test]
    async fn pending_ordered_oldest_first() {
        let store = InMemoryOutboxStore::new();
        for _ in 0..2 {
            store.enqueue(&[make_msg()]).await.unwrap();
        }

        let ready = store.pending_now(10).await.unwrap();
        assert_eq!(ready.len(), 2);
        assert!(ready[0].created_at <= ready[1].created_at);
    }

    /// Same inputs give the same key; changing step or format version changes it.
    #[test]
    fn outbox_idempotency_key_is_stable_and_deterministic() {
        let pid = ProcessId::new();
        let step = "ReceiveAperak";
        let partner = "4012345000023";
        let fv = "FV2025-10-01";

        let first = outbox_idempotency_key(pid, step, partner, fv);
        let repeat = outbox_idempotency_key(pid, step, partner, fv);
        assert_eq!(first, repeat, "same inputs must produce the same key");
        assert_eq!(first.to_string().len(), 36, "UUID string is 36 chars");

        let other_step = outbox_idempotency_key(pid, "ReceiveContrl", partner, fv);
        assert_ne!(first, other_step, "different step must produce different key");

        let other_fv = outbox_idempotency_key(pid, step, partner, "FV2026-10-01");
        assert_ne!(first, other_fv, "different FV must produce different key");
    }
}