use async_trait::async_trait;
use serde_json::Value;
use crate::core::{Error, Event, Result};
use super::Transform;
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum OutboxResult {
IsOutboxEvent {
aggregate_id: String,
event_type: String,
payload: Value,
},
NotOutboxEvent,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OutboxTransform {
pub enabled: bool,
pub outbox_table: String,
}
impl OutboxTransform {
pub fn new(outbox_table: impl Into<String>) -> Self {
Self {
enabled: true,
outbox_table: outbox_table.into(),
}
}
pub fn apply_outbox(&self, event: &mut Event) -> Result<OutboxResult> {
if !self.enabled || event.table != self.outbox_table {
return Ok(OutboxResult::NotOutboxEvent);
}
if !matches!(event.op, crate::core::Operation::Insert) {
return Ok(OutboxResult::NotOutboxEvent);
}
let Some(Value::Object(after)) = event.after.as_ref() else {
return Err(Error::TransformError(
"outbox event requires object payload in after".into(),
));
};
let aggregate_id = after
.get("aggregate_id")
.ok_or_else(|| Error::TransformError("missing aggregate_id in outbox event".into()))?
.as_str()
.ok_or_else(|| Error::TransformError("aggregate_id must be a string".into()))?
.to_string();
let event_type = after
.get("event_type")
.ok_or_else(|| Error::TransformError("missing event_type in outbox event".into()))?
.as_str()
.ok_or_else(|| Error::TransformError("event_type must be a string".into()))?
.to_string();
let payload = after
.get("payload")
.ok_or_else(|| Error::TransformError("missing payload in outbox event".into()))?
.clone();
Ok(OutboxResult::IsOutboxEvent {
aggregate_id,
event_type,
payload,
})
}
}
#[async_trait]
impl Transform for OutboxTransform {
async fn apply(&self, event: &mut Event) -> Result<bool> {
match self.apply_outbox(event)? {
OutboxResult::NotOutboxEvent => Ok(true),
OutboxResult::IsOutboxEvent {
aggregate_id,
event_type,
payload,
} => {
event.table = event_type;
event.after = Some(payload);
event.primary_key = Some(vec![aggregate_id]);
Ok(true)
}
}
}
fn name(&self) -> &str {
"outbox"
}
}
#[cfg(test)]
mod tests {
use serde_json::json;
use crate::core::{Event, Operation, SourceMetadata, EVENT_ENVELOPE_VERSION};
use crate::transform::Transform;
use super::{OutboxResult, OutboxTransform};
fn event(table: &str, after: serde_json::Value) -> Event {
Event {
before: None,
after: Some(after),
op: Operation::Insert,
source: SourceMetadata {
source_name: "test".into(),
offset: "1".into(),
timestamp: 1,
},
ts: 1,
schema: Some("public".into()),
table: table.into(),
primary_key: Some(vec!["id".into()]),
snapshot: None,
transaction: None,
envelope_version: EVENT_ENVELOPE_VERSION,
before_is_key_only: false,
}
}
#[test]
fn outbox_event_is_detected_and_parsed() {
let transform = OutboxTransform::new("outbox");
let mut event = event(
"outbox",
json!({"aggregate_id": "u1", "event_type": "user.created", "payload": {"id": 1}}),
);
let parsed = transform.apply_outbox(&mut event).unwrap();
assert!(matches!(parsed, OutboxResult::IsOutboxEvent { .. }));
}
#[test]
fn regular_event_returns_not_outbox() {
let transform = OutboxTransform::new("outbox");
let mut event = event("users", json!({"id": 1}));
let parsed = transform.apply_outbox(&mut event).unwrap();
assert_eq!(parsed, OutboxResult::NotOutboxEvent);
}
#[test]
fn missing_outbox_fields_error() {
let transform = OutboxTransform::new("outbox");
let mut event = event("outbox", json!({"aggregate_id": "u1"}));
assert!(transform.apply_outbox(&mut event).is_err());
}
#[tokio::test]
async fn apply_rewrites_table_and_payload_for_outbox_events() {
let transform = OutboxTransform::new("outbox");
let mut e = event(
"outbox",
json!({
"aggregate_id": "order-42",
"event_type": "order.placed",
"payload": {"order_id": 42, "total": 99.9}
}),
);
assert!(transform.apply(&mut e).await.unwrap());
assert_eq!(e.table, "order.placed");
assert_eq!(e.after.unwrap(), json!({"order_id": 42, "total": 99.9}));
}
#[tokio::test]
async fn apply_sets_aggregate_id_as_primary_key() {
let transform = OutboxTransform::new("outbox");
let mut e = event(
"outbox",
json!({"aggregate_id": "order-42", "event_type": "order.placed", "payload": {}}),
);
transform.apply(&mut e).await.unwrap();
assert_eq!(
e.primary_key.as_deref(),
Some([String::from("order-42")].as_slice()),
"aggregate_id must become the primary key for partition ordering"
);
}
#[tokio::test]
async fn apply_passes_through_non_outbox_events_unchanged() {
let transform = OutboxTransform::new("outbox");
let payload = json!({"id": 7});
let mut e = event("users", payload.clone());
assert!(transform.apply(&mut e).await.unwrap());
assert_eq!(e.table, "users");
assert_eq!(e.after.unwrap(), payload);
}
#[tokio::test]
async fn apply_errors_on_malformed_outbox_event() {
let transform = OutboxTransform::new("outbox");
let mut e = event("outbox", json!({"aggregate_id": "x"}));
assert!(
transform.apply(&mut e).await.is_err(),
"missing event_type must yield an error"
);
}
#[tokio::test]
async fn delete_on_outbox_table_passes_through_without_error() {
let transform = OutboxTransform::new("outbox");
let mut e = Event {
before: Some(json!({"aggregate_id": "u1", "event_type": "x", "payload": {}})),
after: None,
op: Operation::Delete,
source: SourceMetadata {
source_name: "test".into(),
offset: "5".into(),
timestamp: 5,
},
ts: 5,
schema: Some("public".into()),
table: "outbox".into(),
primary_key: Some(vec!["id".into()]),
snapshot: None,
transaction: None,
envelope_version: EVENT_ENVELOPE_VERSION,
before_is_key_only: false,
};
assert!(
transform.apply(&mut e).await.unwrap(),
"Delete on outbox table must pass through"
);
assert_eq!(e.table, "outbox", "table must remain unchanged");
}
#[tokio::test]
async fn truncate_on_outbox_table_passes_through_without_error() {
let transform = OutboxTransform::new("outbox");
let mut e = Event {
before: None,
after: None,
op: Operation::Truncate,
source: SourceMetadata {
source_name: "test".into(),
offset: "6".into(),
timestamp: 6,
},
ts: 6,
schema: Some("public".into()),
table: "outbox".into(),
primary_key: None,
snapshot: None,
transaction: None,
envelope_version: EVENT_ENVELOPE_VERSION,
before_is_key_only: false,
};
assert!(
transform.apply(&mut e).await.unwrap(),
"Truncate on outbox table must not error"
);
assert_eq!(
e.table, "outbox",
"table must remain unchanged for Truncate"
);
}
#[tokio::test]
async fn update_on_outbox_table_passes_through_without_processing() {
let transform = OutboxTransform::new("outbox");
let mut e = Event {
before: Some(
json!({"aggregate_id": "u1", "event_type": "user.created", "payload": {}}),
),
after: Some(
json!({"aggregate_id": "u1", "event_type": "user.created", "payload": {}, "processed": true}),
),
op: Operation::Update,
source: SourceMetadata {
source_name: "test".into(),
offset: "7".into(),
timestamp: 7,
},
ts: 7,
schema: Some("public".into()),
table: "outbox".into(),
primary_key: Some(vec!["id".into()]),
snapshot: None,
transaction: None,
envelope_version: EVENT_ENVELOPE_VERSION,
before_is_key_only: false,
};
assert!(transform.apply(&mut e).await.unwrap());
assert_eq!(e.table, "outbox");
}
}