use crate::common::Result;
use rivven_core::Message;
/// Transaction context that can be attached to a [`CdcEvent`].
///
/// `TransactionMetadata::new` fills in `id`, `lsn` and `sequence`; the other
/// fields start at `0` / `None` / `false` and are set through the `with_*`
/// builder methods. The struct is serialized as part of the event, so field
/// names are part of the serialized format.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct TransactionMetadata {
/// Transaction identifier; this is the value `CdcEvent::txn_id` returns.
pub id: String,
/// Log position kept as an opaque string.
/// NOTE(review): presumably a source-specific LSN such as `0/1234` — confirm the format per source.
pub lsn: String,
/// Position of the event inside the transaction. `is_single_event` treats
/// `sequence == 0` combined with `is_last` as a one-event transaction, so
/// numbering appears to start at 0 — TODO confirm with the producers.
pub sequence: u64,
/// Number of events in the transaction. `new` leaves this at `0`;
/// presumably `0` means "not known" — verify against callers.
pub total_events: u64,
/// Commit timestamp, if one has been recorded via `with_commit_ts`.
/// NOTE(review): the unit (seconds vs. milliseconds) is not visible here — confirm.
pub commit_ts: Option<i64>,
/// `true` when this event is flagged as the last one of its transaction;
/// `CdcEvent::is_txn_end` reports this flag.
pub is_last: bool,
}
impl TransactionMetadata {
    /// Creates metadata for one event of a transaction.
    ///
    /// Only the identifier, the log position and the event's sequence number
    /// are taken here. `total_events` starts at `0`, `commit_ts` at `None`
    /// and `is_last` at `false`; use the `with_*` builders to change them.
    pub fn new(id: impl Into<String>, lsn: impl Into<String>, sequence: u64) -> Self {
        let id = id.into();
        let lsn = lsn.into();
        Self {
            id,
            lsn,
            sequence,
            total_events: 0,
            commit_ts: None,
            is_last: false,
        }
    }

    /// Returns the metadata with `total_events` replaced by `total`.
    pub fn with_total(self, total: u64) -> Self {
        Self {
            total_events: total,
            ..self
        }
    }

    /// Returns the metadata with `commit_ts` set to `Some(ts)`.
    pub fn with_commit_ts(self, ts: i64) -> Self {
        Self {
            commit_ts: Some(ts),
            ..self
        }
    }

    /// Returns the metadata flagged as the last event of its transaction.
    pub fn with_last(self) -> Self {
        Self {
            is_last: true,
            ..self
        }
    }

    /// Reports whether the transaction consists of exactly one event.
    ///
    /// This holds when the recorded total is `1`, or when the event is both
    /// at sequence `0` and flagged as last (which covers the case where the
    /// total was never filled in).
    pub fn is_single_event(&self) -> bool {
        let sole_by_count = self.total_events == 1;
        let sole_by_position = self.is_last && self.sequence == 0;
        sole_by_count || sole_by_position
    }
}
/// A single change-data-capture event for one table.
///
/// Events are built with the constructors on `CdcEvent` (`insert`, `update`,
/// `delete`, `tombstone`, `tombstone_with_key`) and converted to a `Message`
/// by `to_message`, which serializes the whole struct as JSON. Field names are
/// therefore part of the serialized format.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CdcEvent {
/// Identifier of the originating source; written to the `cdc_source`
/// header by `to_message`. The tests in this file use values such as
/// `"postgres"`, `"mysql"` and `"mariadb"`.
pub source_type: String,
/// Database name; second segment of `topic_name`.
pub database: String,
/// Schema name; third segment of `topic_name`.
pub schema: String,
/// Table name; last segment of `topic_name`.
pub table: String,
/// Kind of change this event describes.
pub op: CdcOp,
/// Row image before the change. Set by `delete`, optionally by `update`,
/// and used by `tombstone_with_key` to carry the key; `None` otherwise.
pub before: Option<serde_json::Value>,
/// Row image after the change. Set by `insert` and `update`; `None` for
/// deletes and tombstones.
pub after: Option<serde_json::Value>,
/// Event timestamp as supplied by the caller.
/// NOTE(review): the unit (seconds vs. milliseconds) is not visible here — confirm.
pub timestamp: i64,
/// Optional transaction context. Omitted from the serialized form when
/// `None`, and defaulted to `None` when the field is absent on input.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub transaction: Option<TransactionMetadata>,
}
/// Kind of change carried by a [`CdcEvent`].
///
/// The variant names are used as-is in the serialized form (e.g.
/// `"op":"Tombstone"`) and in the `cdc_op` header written by
/// `CdcEvent::to_message`; the `Display` impl renders them in upper case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum CdcOp {
/// Set by `CdcEvent::insert`; the event carries only an `after` image.
Insert,
/// Set by `CdcEvent::update`; the event carries `after` and optionally `before`.
Update,
/// Set by `CdcEvent::delete`; the event carries only a `before` image.
Delete,
/// Set by `CdcEvent::tombstone` and `CdcEvent::tombstone_with_key`;
/// counted as DML by `CdcEvent::is_dml`.
Tombstone,
/// Not counted as DML by `CdcEvent::is_dml`.
/// NOTE(review): presumably a table truncation — no constructor in this file produces it.
Truncate,
/// Not counted as DML by `CdcEvent::is_dml`.
/// NOTE(review): presumably a row emitted during an initial snapshot — confirm with the producers.
Snapshot,
/// Not counted as DML by `CdcEvent::is_dml`.
/// NOTE(review): presumably a schema/DDL change — confirm with the producers.
Schema,
}
impl CdcEvent {
pub fn insert(
source_type: impl Into<String>,
database: impl Into<String>,
schema: impl Into<String>,
table: impl Into<String>,
data: serde_json::Value,
timestamp: i64,
) -> Self {
Self {
source_type: source_type.into(),
database: database.into(),
schema: schema.into(),
table: table.into(),
op: CdcOp::Insert,
before: None,
after: Some(data),
timestamp,
transaction: None,
}
}
pub fn update(
source_type: impl Into<String>,
database: impl Into<String>,
schema: impl Into<String>,
table: impl Into<String>,
before: Option<serde_json::Value>,
after: serde_json::Value,
timestamp: i64,
) -> Self {
Self {
source_type: source_type.into(),
database: database.into(),
schema: schema.into(),
table: table.into(),
op: CdcOp::Update,
before,
after: Some(after),
timestamp,
transaction: None,
}
}
pub fn delete(
source_type: impl Into<String>,
database: impl Into<String>,
schema: impl Into<String>,
table: impl Into<String>,
before: serde_json::Value,
timestamp: i64,
) -> Self {
Self {
source_type: source_type.into(),
database: database.into(),
schema: schema.into(),
table: table.into(),
op: CdcOp::Delete,
before: Some(before),
after: None,
timestamp,
transaction: None,
}
}
pub fn tombstone(delete_event: &CdcEvent) -> Self {
Self {
source_type: delete_event.source_type.clone(),
database: delete_event.database.clone(),
schema: delete_event.schema.clone(),
table: delete_event.table.clone(),
op: CdcOp::Tombstone,
before: None,
after: None,
timestamp: delete_event.timestamp,
transaction: delete_event.transaction.clone(),
}
}
pub fn tombstone_with_key(
source_type: impl Into<String>,
database: impl Into<String>,
schema: impl Into<String>,
table: impl Into<String>,
key: serde_json::Value,
timestamp: i64,
) -> Self {
Self {
source_type: source_type.into(),
database: database.into(),
schema: schema.into(),
table: table.into(),
op: CdcOp::Tombstone,
before: Some(key),
after: None,
timestamp,
transaction: None,
}
}
pub fn to_tombstone(&self) -> Option<Self> {
if self.op == CdcOp::Delete {
Some(CdcEvent::tombstone(self))
} else {
None
}
}
pub fn is_tombstone(&self) -> bool {
self.op == CdcOp::Tombstone
}
pub fn with_transaction(mut self, txn: TransactionMetadata) -> Self {
self.transaction = Some(txn);
self
}
pub fn txn_id(&self) -> Option<&str> {
self.transaction.as_ref().map(|t| t.id.as_str())
}
pub fn has_transaction(&self) -> bool {
self.transaction.is_some()
}
pub fn is_txn_end(&self) -> bool {
self.transaction
.as_ref()
.map(|t| t.is_last)
.unwrap_or(false)
}
pub fn to_message(&self) -> Result<Message> {
let json_bytes = serde_json::to_vec(self)?;
let msg = Message::new(bytes::Bytes::from(json_bytes))
.add_header(
"cdc_source".to_string(),
self.source_type.as_bytes().to_vec(),
)
.add_header(
"cdc_database".to_string(),
self.database.as_bytes().to_vec(),
)
.add_header("cdc_schema".to_string(), self.schema.as_bytes().to_vec())
.add_header("cdc_table".to_string(), self.table.as_bytes().to_vec())
.add_header(
"cdc_op".to_string(),
format!("{:?}", self.op).as_bytes().to_vec(),
);
Ok(msg)
}
pub fn topic_name(&self) -> String {
format!("cdc.{}.{}.{}", self.database, self.schema, self.table)
}
pub fn is_dml(&self) -> bool {
matches!(
self.op,
CdcOp::Insert | CdcOp::Update | CdcOp::Delete | CdcOp::Tombstone
)
}
}
/// Renders the operation as its upper-case name (`INSERT`, `UPDATE`, ...).
///
/// The text is written verbatim; width and padding flags on the formatter
/// are not applied.
impl std::fmt::Display for CdcOp {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            CdcOp::Insert => "INSERT",
            CdcOp::Update => "UPDATE",
            CdcOp::Delete => "DELETE",
            CdcOp::Tombstone => "TOMBSTONE",
            CdcOp::Truncate => "TRUNCATE",
            CdcOp::Snapshot => "SNAPSHOT",
            CdcOp::Schema => "SCHEMA",
        };
        f.write_str(label)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Timestamp used by the "realistic" fixtures; the value is arbitrary.
    const TS: i64 = 1705000000;

    /// Insert of `{"id": 1}` into `mydb.public.users` from a postgres source.
    fn users_insert() -> CdcEvent {
        CdcEvent::insert("postgres", "mydb", "public", "users", json!({"id": 1}), TS)
    }

    /// Insert of `{"id": 1}` on the minimal `pg`/`db`/`s`/`t` fixture.
    fn pg_insert() -> CdcEvent {
        CdcEvent::insert("pg", "db", "s", "t", json!({"id": 1}), 0)
    }

    /// Delete of `row` on the minimal `pg`/`db`/`s`/`t` fixture.
    fn pg_delete(row: serde_json::Value, timestamp: i64) -> CdcEvent {
        CdcEvent::delete("pg", "db", "s", "t", row, timestamp)
    }

    #[test]
    fn test_insert_event() {
        let row = json!({"id": 1, "name": "Alice"});
        let event = CdcEvent::insert("postgres", "mydb", "public", "users", row, TS);

        assert_eq!(event.op, CdcOp::Insert);
        assert_eq!(event.before, None);
        assert!(event.after.is_some());
        assert_eq!(event.topic_name(), "cdc.mydb.public.users");
    }

    #[test]
    fn test_update_event() {
        let old_row = json!({"id": 1, "name": "Alice"});
        let new_row = json!({"id": 1, "name": "Bob"});
        let event = CdcEvent::update("mysql", "mydb", "mydb", "users", Some(old_row), new_row, TS);

        assert_eq!(event.op, CdcOp::Update);
        assert!(event.before.is_some());
        assert!(event.after.is_some());
    }

    #[test]
    fn test_delete_event() {
        let event = CdcEvent::delete("mariadb", "mydb", "mydb", "users", json!({"id": 1}), TS);

        assert_eq!(event.op, CdcOp::Delete);
        assert!(event.before.is_some());
        assert_eq!(event.after, None);
    }

    #[test]
    fn test_to_message() {
        let msg = users_insert().to_message().unwrap();
        assert!(!msg.value.is_empty());
    }

    #[test]
    fn test_is_dml() {
        let delete = pg_delete(json!({}), 0);
        let events = vec![
            CdcEvent::insert("pg", "db", "s", "t", json!({}), 0),
            CdcEvent::update("pg", "db", "s", "t", None, json!({}), 0),
            pg_delete(json!({}), 0),
            CdcEvent::tombstone(&delete),
        ];

        for event in &events {
            assert!(event.is_dml(), "{:?} should count as DML", event.op);
        }
    }

    #[test]
    fn test_tombstone_from_delete() {
        let delete = CdcEvent::delete(
            "postgres",
            "mydb",
            "public",
            "users",
            json!({"id": 42, "name": "Alice"}),
            TS,
        );
        let tombstone = CdcEvent::tombstone(&delete);

        assert_eq!(tombstone.op, CdcOp::Tombstone);
        assert_eq!(
            (
                tombstone.source_type.as_str(),
                tombstone.database.as_str(),
                tombstone.schema.as_str(),
                tombstone.table.as_str(),
            ),
            ("postgres", "mydb", "public", "users"),
        );
        assert_eq!(tombstone.before, None);
        assert_eq!(tombstone.after, None);
        assert_eq!(tombstone.timestamp, delete.timestamp);
        assert!(tombstone.is_tombstone());
    }

    #[test]
    fn test_tombstone_with_key() {
        let key = json!({"product_id": 101});
        let tombstone = CdcEvent::tombstone_with_key(
            "mysql",
            "inventory",
            "inventory",
            "products",
            key.clone(),
            TS,
        );

        assert_eq!(tombstone.op, CdcOp::Tombstone);
        assert!(tombstone.is_tombstone());
        assert_eq!(tombstone.before.as_ref(), Some(&key));
        assert_eq!(tombstone.after, None);
    }

    #[test]
    fn test_to_tombstone_from_delete() {
        let converted = pg_delete(json!({"id": 1}), 0).to_tombstone();

        assert!(converted.is_some());
        assert!(converted.unwrap().is_tombstone());
    }

    #[test]
    fn test_to_tombstone_from_insert_returns_none() {
        assert!(pg_insert().to_tombstone().is_none());
    }

    #[test]
    fn test_tombstone_preserves_transaction() {
        let txn = TransactionMetadata::new("txn-del", "0/DEAD", 5)
            .with_total(10)
            .with_last();
        let delete = pg_delete(json!({"id": 1}), 1000).with_transaction(txn);

        let tombstone = CdcEvent::tombstone(&delete);

        assert!(tombstone.has_transaction());
        assert_eq!(tombstone.txn_id(), Some("txn-del"));
        assert!(tombstone.is_txn_end());
    }

    #[test]
    fn test_tombstone_serialization() {
        let delete = pg_delete(json!({"id": 99}), 0);
        let encoded = serde_json::to_string(&CdcEvent::tombstone(&delete)).unwrap();

        // Both row images must be written out explicitly as JSON null.
        for fragment in ["\"op\":\"Tombstone\"", "\"before\":null", "\"after\":null"] {
            assert!(encoded.contains(fragment), "missing {} in {}", fragment, encoded);
        }

        let decoded: CdcEvent = serde_json::from_str(&encoded).unwrap();
        assert!(decoded.is_tombstone());
        assert_eq!(decoded.before, None);
        assert_eq!(decoded.after, None);
    }

    #[test]
    fn test_cdcop_display_tombstone() {
        assert_eq!(format!("{}", CdcOp::Tombstone), "TOMBSTONE");
    }

    #[test]
    fn test_transaction_metadata() {
        let txn = TransactionMetadata::new("txn-123", "0/1234", 0)
            .with_total(3)
            .with_commit_ts(1705000000);

        let expected = TransactionMetadata {
            id: "txn-123".to_string(),
            lsn: "0/1234".to_string(),
            sequence: 0,
            total_events: 3,
            commit_ts: Some(1705000000),
            is_last: false,
        };
        assert_eq!(txn, expected);
        assert!(!txn.is_single_event());
    }

    #[test]
    fn test_transaction_metadata_single_event() {
        let txn = TransactionMetadata::new("txn-456", "0/5678", 0)
            .with_total(1)
            .with_last();

        assert!(txn.is_single_event());
        assert!(txn.is_last);
    }

    #[test]
    fn test_event_with_transaction() {
        let txn = TransactionMetadata::new("txn-789", "0/ABCD", 1)
            .with_total(5)
            .with_last();
        let event = users_insert().with_transaction(txn);

        assert!(event.has_transaction());
        assert_eq!(event.txn_id(), Some("txn-789"));
        assert!(event.is_txn_end());
    }

    #[test]
    fn test_event_without_transaction() {
        let event = users_insert();

        assert!(!event.has_transaction());
        assert_eq!(event.txn_id(), None);
        assert!(!event.is_txn_end());
    }

    #[test]
    fn test_transaction_serialization() {
        let txn = TransactionMetadata::new("txn-ser", "0/FF00", 2)
            .with_total(10)
            .with_commit_ts(1705000000);
        let encoded = serde_json::to_string(&pg_insert().with_transaction(txn)).unwrap();

        for fragment in ["transaction", "txn-ser", "0/FF00"] {
            assert!(encoded.contains(fragment), "missing {} in {}", fragment, encoded);
        }

        let decoded: CdcEvent = serde_json::from_str(&encoded).unwrap();
        assert!(decoded.has_transaction());
        assert_eq!(decoded.txn_id(), Some("txn-ser"));
    }

    #[test]
    fn test_event_without_transaction_serialization() {
        let encoded = serde_json::to_string(&pg_insert()).unwrap();
        // The field is skipped entirely when no transaction is attached.
        assert!(!encoded.contains("transaction"));

        let decoded: CdcEvent = serde_json::from_str(&encoded).unwrap();
        assert!(!decoded.has_transaction());
    }
}