use nodedb_types::json_msgpack::JsonValue;
use serde::{Deserialize, Serialize};
use sonic_rs;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CdcEvent {
pub sequence: u64,
pub partition: u32,
pub collection: String,
pub op: String,
pub row_id: String,
pub event_time: u64,
pub lsn: u64,
pub tenant_id: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub new_value: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub old_value: Option<serde_json::Value>,
#[serde(default)]
pub schema_version: u64,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub field_diffs: Option<Vec<super::super::field_diff::FieldDiff>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub system_time_ms: Option<i64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub valid_time_ms: Option<i64>,
}
impl CdcEvent {
pub fn to_json_bytes(&self) -> Vec<u8> {
sonic_rs::to_vec(self).unwrap_or_default()
}
pub fn to_msgpack_bytes(&self) -> Vec<u8> {
zerompk::to_msgpack_vec(self).unwrap_or_default()
}
}
impl zerompk::ToMessagePack for CdcEvent {
fn write<W: zerompk::Write>(&self, writer: &mut W) -> zerompk::Result<()> {
let field_count = 9
+ usize::from(self.new_value.is_some())
+ usize::from(self.old_value.is_some())
+ usize::from(self.field_diffs.is_some())
+ usize::from(self.system_time_ms.is_some())
+ usize::from(self.valid_time_ms.is_some());
writer.write_map_len(field_count)?;
writer.write_string("sequence")?;
writer.write_u64(self.sequence)?;
writer.write_string("partition")?;
writer.write_u32(self.partition)?;
writer.write_string("collection")?;
writer.write_string(&self.collection)?;
writer.write_string("op")?;
writer.write_string(&self.op)?;
writer.write_string("row_id")?;
writer.write_string(&self.row_id)?;
writer.write_string("event_time")?;
writer.write_u64(self.event_time)?;
writer.write_string("lsn")?;
writer.write_u64(self.lsn)?;
writer.write_string("tenant_id")?;
writer.write_u64(self.tenant_id)?;
writer.write_string("schema_version")?;
writer.write_u64(self.schema_version)?;
if let Some(ref v) = self.new_value {
writer.write_string("new_value")?;
JsonValue(v.clone()).write(writer)?;
}
if let Some(ref v) = self.old_value {
writer.write_string("old_value")?;
JsonValue(v.clone()).write(writer)?;
}
if let Some(ref diffs) = self.field_diffs {
writer.write_string("field_diffs")?;
diffs.write(writer)?;
}
if let Some(ts) = self.system_time_ms {
writer.write_string("system_time_ms")?;
writer.write_i64(ts)?;
}
if let Some(ts) = self.valid_time_ms {
writer.write_string("valid_time_ms")?;
writer.write_i64(ts)?;
}
Ok(())
}
}
impl<'a> zerompk::FromMessagePack<'a> for CdcEvent {
fn read<R: zerompk::Read<'a>>(reader: &mut R) -> zerompk::Result<Self> {
let len = reader.read_map_len()?;
let mut sequence: u64 = 0;
let mut partition: u32 = 0;
let mut collection = String::new();
let mut op = String::new();
let mut row_id = String::new();
let mut event_time: u64 = 0;
let mut lsn: u64 = 0;
let mut tenant_id: u64 = 0;
let mut schema_version: u64 = 0;
let mut new_value: Option<serde_json::Value> = None;
let mut old_value: Option<serde_json::Value> = None;
let mut field_diffs: Option<Vec<super::super::field_diff::FieldDiff>> = None;
let mut system_time_ms: Option<i64> = None;
let mut valid_time_ms: Option<i64> = None;
for _ in 0..len {
let key = reader.read_string()?.into_owned();
match key.as_str() {
"sequence" => sequence = reader.read_u64()?,
"partition" => partition = reader.read_u32()?,
"collection" => collection = reader.read_string()?.into_owned(),
"op" => op = reader.read_string()?.into_owned(),
"row_id" => row_id = reader.read_string()?.into_owned(),
"event_time" => event_time = reader.read_u64()?,
"lsn" => lsn = reader.read_u64()?,
"tenant_id" => tenant_id = reader.read_u64()?,
"schema_version" => schema_version = reader.read_u64()?,
"new_value" => new_value = Some(JsonValue::read(reader)?.0),
"old_value" => old_value = Some(JsonValue::read(reader)?.0),
"field_diffs" => {
field_diffs = Some(Vec::<super::super::field_diff::FieldDiff>::read(reader)?);
}
"system_time_ms" => system_time_ms = Some(reader.read_i64()?),
"valid_time_ms" => valid_time_ms = Some(reader.read_i64()?),
_ => {
JsonValue::read(reader)?;
}
}
}
Ok(CdcEvent {
sequence,
partition,
collection,
op,
row_id,
event_time,
lsn,
tenant_id,
schema_version,
new_value,
old_value,
field_diffs,
system_time_ms,
valid_time_ms,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn cdc_event_json_roundtrip() {
let event = CdcEvent {
sequence: 1,
partition: 42,
collection: "orders".into(),
op: "INSERT".into(),
row_id: "order-1".into(),
event_time: 1700000000000,
lsn: 100,
tenant_id: 1,
new_value: Some(serde_json::json!({"id": 1, "total": 99.99})),
old_value: None,
schema_version: 0,
field_diffs: None,
system_time_ms: None,
valid_time_ms: None,
};
let bytes = event.to_json_bytes();
let parsed: CdcEvent = serde_json::from_slice(&bytes).unwrap();
assert_eq!(parsed.collection, "orders");
assert_eq!(parsed.sequence, 1);
assert!(parsed.old_value.is_none());
}
#[test]
fn cdc_event_msgpack_roundtrip() {
let event = CdcEvent {
sequence: 2,
partition: 10,
collection: "users".into(),
op: "UPDATE".into(),
row_id: "user-5".into(),
event_time: 1700000001000,
lsn: 200,
tenant_id: 1,
new_value: Some(serde_json::json!({"name": "Alice"})),
old_value: Some(serde_json::json!({"name": "Bob"})),
schema_version: 0,
field_diffs: None,
system_time_ms: None,
valid_time_ms: None,
};
let bytes = event.to_msgpack_bytes();
let parsed: CdcEvent = zerompk::from_msgpack(&bytes).unwrap();
assert_eq!(parsed.op, "UPDATE");
assert!(parsed.old_value.is_some());
}
#[test]
fn cdc_event_bitemporal_stamps_roundtrip() {
let event = CdcEvent {
sequence: 3,
partition: 0,
collection: "users".into(),
op: "INSERT".into(),
row_id: "u-1".into(),
event_time: 1700000002000,
lsn: 300,
tenant_id: 1,
new_value: Some(serde_json::json!({"name": "Carol"})),
old_value: None,
schema_version: 0,
field_diffs: None,
system_time_ms: Some(1_700_000_000_000),
valid_time_ms: Some(1_500_000_000_000),
};
let json = event.to_json_bytes();
let parsed_json: CdcEvent = serde_json::from_slice(&json).unwrap();
assert_eq!(parsed_json.system_time_ms, Some(1_700_000_000_000));
assert_eq!(parsed_json.valid_time_ms, Some(1_500_000_000_000));
let mp = event.to_msgpack_bytes();
let parsed_mp: CdcEvent = zerompk::from_msgpack(&mp).unwrap();
assert_eq!(parsed_mp.system_time_ms, Some(1_700_000_000_000));
assert_eq!(parsed_mp.valid_time_ms, Some(1_500_000_000_000));
}
}