#[cfg(feature = "sql")]
pub mod sql;
#[cfg(feature = "mongodb")]
pub mod mongo;
#[cfg(feature = "sql")]
pub use sql::SqlOutbox;
#[cfg(feature = "mongodb")]
pub use mongo::MongoOutbox;
use async_trait::async_trait;
use klauthed_core::domain::{DomainEvent, EventEnvelope};
use klauthed_core::id::Id;
use klauthed_core::time::Timestamp;
use serde::{Deserialize, Serialize};
use std::sync::Mutex;
use crate::error::DataError;
/// Marker type used only as the type parameter of [`Id`] to form [`OutboxId`].
pub struct OutboxTag;
/// Identifier of an [`OutboxEntry`]: an [`Id`] parameterized by [`OutboxTag`].
pub type OutboxId = Id<OutboxTag>;
/// A single event record held in an [`Outbox`], together with its publication state.
///
/// Entries are normally built with [`OutboxEntry::from_envelope`], which copies the
/// envelope metadata and stores the event payload as JSON.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct OutboxEntry {
/// Identifier of this outbox entry; `from_envelope` generates a fresh one with
/// `OutboxId::new()` (it is not the envelope's `event_id`).
pub id: OutboxId,
/// Aggregate type, copied from the envelope's `aggregate_type`.
pub aggregate_type: String,
/// Aggregate identifier, copied from the envelope's `aggregate_id`.
pub aggregate_id: String,
/// Event type name, copied from the envelope's `event_type`.
pub event_type: String,
/// Sequence number, copied from the envelope's `sequence`.
pub sequence: u64,
/// The event payload serialized to a JSON value.
pub payload: serde_json::Value,
/// Timestamp copied from the envelope's `occurred_at`.
pub occurred_at: Timestamp,
/// Whether the entry has been marked as published; `false` for entries created
/// by `from_envelope`.
pub published: bool,
/// When the entry was marked as published; `None` for entries created by
/// `from_envelope`.
pub published_at: Option<Timestamp>,
}
impl OutboxEntry {
    /// Builds a new, unpublished outbox entry from an event envelope.
    ///
    /// The envelope's payload is serialized to a JSON value, and the aggregate
    /// type, aggregate id, event type, sequence and occurrence timestamp are
    /// copied over. The entry receives a freshly generated [`OutboxId`] and
    /// starts with `published == false` and `published_at == None`.
    ///
    /// # Errors
    ///
    /// Returns [`DataError::Outbox`] when the payload cannot be converted to a
    /// JSON value.
    pub fn from_envelope<E>(envelope: &EventEnvelope<E>) -> Result<Self, DataError>
    where
        E: Serialize + DomainEvent,
    {
        // Serialize first so that a failing payload never allocates an id.
        let payload = match serde_json::to_value(&envelope.payload) {
            Ok(json) => json,
            Err(e) => {
                return Err(DataError::Outbox(format!(
                    "failed to serialize event payload: {e}"
                )));
            }
        };

        let entry = Self {
            id: OutboxId::new(),
            aggregate_type: envelope.aggregate_type.to_string(),
            aggregate_id: envelope.aggregate_id.clone(),
            event_type: envelope.event_type.to_string(),
            sequence: envelope.sequence,
            payload,
            occurred_at: envelope.occurred_at,
            published: false,
            published_at: None,
        };
        Ok(entry)
    }
}
/// Asynchronous store of [`OutboxEntry`] records.
///
/// Implementors must be `Send + Sync`. The per-method notes below describe what
/// `InMemoryOutbox` in this file does; other backends are presumably expected to
/// behave the same way — verify against each implementation.
#[async_trait]
pub trait Outbox: Send + Sync {
/// Adds `entries` to the outbox.
async fn enqueue(&self, entries: Vec<OutboxEntry>) -> Result<(), DataError>;
/// Returns at most `limit` entries whose `published` flag is still `false`.
/// The in-memory implementation orders them by `id`.
async fn fetch_unpublished(&self, limit: usize) -> Result<Vec<OutboxEntry>, DataError>;
/// Marks the entries whose id appears in `ids` as published. The in-memory
/// implementation also records the current time in `published_at` and leaves
/// already-published entries untouched.
async fn mark_published(&self, ids: &[OutboxId]) -> Result<(), DataError>;
}
/// [`Outbox`] implementation that keeps its entries in a `Mutex`-guarded `Vec`.
///
/// `Default` produces an outbox with no entries.
#[derive(Default)]
pub struct InMemoryOutbox {
/// Every enqueued entry, published or not, in insertion order.
entries: Mutex<Vec<OutboxEntry>>,
}
impl InMemoryOutbox {
pub fn new() -> Self {
Self::default()
}
pub fn len(&self) -> usize {
self.entries.lock().unwrap_or_else(std::sync::PoisonError::into_inner).len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
#[async_trait]
/// In-memory [`Outbox`] backed by the mutex-guarded `entries` vector.
///
/// A poisoned mutex is recovered with `PoisonError::into_inner` instead of being
/// reported, so none of these methods return an error because of poisoning. The
/// lock is never held across an `.await`.
impl Outbox for InMemoryOutbox {
    /// Appends `entries` to the end of the stored list. Always returns `Ok(())`.
    async fn enqueue(&self, entries: Vec<OutboxEntry>) -> Result<(), DataError> {
        self.entries
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .extend(entries);
        Ok(())
    }

    /// Returns clones of at most `limit` unpublished entries, ordered by `id`.
    ///
    /// Entries with equal ids keep their insertion order (the sort is stable).
    /// Always returns `Ok`.
    async fn fetch_unpublished(&self, limit: usize) -> Result<Vec<OutboxEntry>, DataError> {
        let guard = self
            .entries
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);

        // Sort borrowed entries and clone only the ones that survive the limit;
        // cloning first would deep-copy every unpublished JSON payload just to
        // throw most of them away.
        let mut pending: Vec<&OutboxEntry> = guard.iter().filter(|e| !e.published).collect();
        pending.sort_by_key(|e| e.id);

        Ok(pending.into_iter().take(limit).cloned().collect())
    }

    /// Marks every not-yet-published entry whose id is in `ids` as published and
    /// stamps it with the current time.
    ///
    /// Ids that match no entry, or that match an already-published entry, are
    /// ignored; an already-published entry keeps its original `published_at`.
    /// Always returns `Ok(())`.
    async fn mark_published(&self, ids: &[OutboxId]) -> Result<(), DataError> {
        // Nothing can match an empty id list, so skip the clock read and the lock.
        if ids.is_empty() {
            return Ok(());
        }

        // One timestamp for the whole batch, taken before acquiring the lock.
        let now = Timestamp::now();
        let mut guard = self
            .entries
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);

        for entry in guard
            .iter_mut()
            .filter(|e| !e.published && ids.contains(&e.id))
        {
            entry.published = true;
            entry.published_at = Some(now);
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::borrow::Cow;

    /// Minimal domain event used as the envelope payload in these tests.
    #[derive(Debug, Serialize)]
    struct Opened {
        owner: String,
    }

    impl DomainEvent for Opened {
        fn event_type(&self) -> &'static str {
            "account.opened"
        }
    }

    /// Builds an `account.opened` envelope for aggregate `acct-1` with the given
    /// sequence number and a fixed occurrence time.
    fn envelope(seq: u64) -> EventEnvelope<Opened> {
        EventEnvelope {
            event_id: Id::new(),
            event_type: Cow::Borrowed("account.opened"),
            aggregate_id: String::from("acct-1"),
            aggregate_type: Cow::Borrowed("account"),
            sequence: seq,
            occurred_at: Timestamp::from_unix_millis(1_000),
            payload: Opened {
                owner: String::from("alice"),
            },
        }
    }

    /// Shorthand: an outbox entry built from `envelope(seq)`.
    fn entry_for(seq: u64) -> OutboxEntry {
        OutboxEntry::from_envelope(&envelope(seq)).unwrap()
    }

    #[test]
    fn from_envelope_carries_metadata_and_serializes_payload() {
        let source = envelope(7);
        let entry = OutboxEntry::from_envelope(&source).unwrap();

        // Metadata is copied verbatim from the envelope.
        assert_eq!(entry.aggregate_type, "account");
        assert_eq!(entry.aggregate_id, "acct-1");
        assert_eq!(entry.event_type, "account.opened");
        assert_eq!(entry.sequence, 7);

        // Payload is stored as JSON.
        assert_eq!(entry.payload["owner"], "alice");

        // New entries start out unpublished.
        assert!(!entry.published);
        assert!(entry.published_at.is_none());
    }

    #[tokio::test]
    async fn enqueue_fetch_mark_published_round_trip() {
        let outbox = InMemoryOutbox::new();
        let first = entry_for(1);
        let second = entry_for(2);
        let first_id = first.id;
        let second_id = second.id;

        outbox.enqueue(vec![first, second]).await.unwrap();
        assert_eq!(outbox.len(), 2);

        let pending = outbox.fetch_unpublished(10).await.unwrap();
        assert_eq!(pending.len(), 2);

        // Publishing the first entry leaves only the second one pending.
        outbox.mark_published(&[first_id]).await.unwrap();
        let remaining = outbox.fetch_unpublished(10).await.unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].id, second_id);

        // Publishing the second entry drains the pending set.
        outbox.mark_published(&[second_id]).await.unwrap();
        let leftover = outbox.fetch_unpublished(10).await.unwrap();
        assert!(leftover.is_empty());
    }

    #[tokio::test]
    async fn fetch_unpublished_honors_limit() {
        let outbox = InMemoryOutbox::new();
        let batch: Vec<OutboxEntry> = (1..=5).map(entry_for).collect();
        outbox.enqueue(batch).await.unwrap();

        let capped = outbox.fetch_unpublished(2).await.unwrap();
        assert_eq!(capped.len(), 2);

        let uncapped = outbox.fetch_unpublished(100).await.unwrap();
        assert_eq!(uncapped.len(), 5);
    }
}