use prost::Message;
use crate::codec::{EncodedOutput, EventEncoder};
use crate::core::{Error, Event, Operation, Result};
/// MIME type attached to every payload produced by [`ProtobufEncoder`].
const CONTENT_TYPE: &str = "application/x-protobuf";

/// Stateless encoder that turns events into protobuf-encoded `ProtoEvent` bytes.
///
/// Carries no configuration, so a single value can be shared or recreated freely.
#[derive(Default, Clone, Debug)]
pub struct ProtobufEncoder;
impl EventEncoder for ProtobufEncoder {
    /// Converts `event` into a [`ProtoEvent`] and wraps its wire bytes,
    /// tagged with the protobuf content type.
    ///
    /// # Errors
    ///
    /// Returns whatever error [`ProtoEvent::from_event`] reports (e.g. a row
    /// image that cannot be JSON-serialized).
    fn encode(&self, event: &Event) -> Result<EncodedOutput> {
        ProtoEvent::from_event(event)
            .map(|message| EncodedOutput::new(message.encode_to_vec(), CONTENT_TYPE))
    }

    /// The MIME type of every payload this encoder emits.
    fn content_type(&self) -> &'static str {
        CONTENT_TYPE
    }
}
/// Wire-level operation code, stored as a raw `i32` in `ProtoEvent::op`.
///
/// The discriminants are the protobuf enum values and are part of the wire
/// contract: never renumber or reuse them. `from_op` never yields
/// `Unspecified`. Because 0 is the proto3 default for an enum field, a decoded
/// 0 indicates that no operation was set by the sender.
#[derive(Clone, Copy, Debug, PartialEq, Eq, prost::Enumeration)]
#[repr(i32)]
pub enum ProtoOperation {
/// Proto3 zero value; not produced by `from_op`.
Unspecified = 0,
Insert = 1,
Update = 2,
Delete = 3,
Read = 4,
SchemaChange = 5,
Truncate = 6,
}
impl ProtoOperation {
    /// Maps a core [`Operation`] onto its wire-level protobuf counterpart.
    ///
    /// The match is deliberately exhaustive (no `_` arm). Adding a variant to
    /// `Operation` must fail to compile here until it is assigned a protobuf value.
    fn from_op(op: Operation) -> Self {
        match op {
            Operation::Insert => Self::Insert,
            Operation::Update => Self::Update,
            Operation::Delete => Self::Delete,
            Operation::Read => Self::Read,
            Operation::SchemaChange => Self::SchemaChange,
            Operation::Truncate => Self::Truncate,
        }
    }
}

/// Standard-trait form of [`ProtoOperation::from_op`], so callers can write
/// `ProtoOperation::from(op)` or `op.into()`.
impl From<Operation> for ProtoOperation {
    fn from(op: Operation) -> Self {
        Self::from_op(op)
    }
}
/// Origin of an event. `ProtoEvent::from_event` copies it field-for-field from
/// the core event's `source`.
///
/// Field tags are part of the wire contract: never renumber or reuse them.
#[derive(Clone, PartialEq, prost::Message)]
pub struct ProtoSourceMetadata {
/// Name of the source that produced the event.
#[prost(string, tag = "1")]
pub source_name: String,
/// Source-specific position, kept as an opaque string.
#[prost(string, tag = "2")]
pub offset: String,
/// Source timestamp. NOTE(review): the test fixtures suggest epoch
/// milliseconds; confirm the unit against the producer.
#[prost(uint64, tag = "3")]
pub timestamp: u64,
}
/// Snapshot information. `ProtoEvent::from_event` copies it from the core
/// event's `snapshot` when that is present.
///
/// Field tags are part of the wire contract: never renumber or reuse them.
#[derive(Clone, PartialEq, prost::Message)]
pub struct ProtoSnapshotMetadata {
/// Identifier of the snapshot this event belongs to.
#[prost(string, tag = "1")]
pub snapshot_id: String,
/// Index of the chunk within the snapshot.
#[prost(uint32, tag = "2")]
pub chunk_index: u32,
/// Whether this chunk is the final one of the snapshot.
#[prost(bool, tag = "3")]
pub is_last_chunk: bool,
}
/// Transaction information. `ProtoEvent::from_event` copies it from the core
/// event's `transaction` when that is present.
///
/// Field tags are part of the wire contract: never renumber or reuse them.
#[derive(Clone, PartialEq, prost::Message)]
pub struct ProtoTransactionMetadata {
/// Transaction identifier.
#[prost(uint64, tag = "1")]
pub tx_id: u64,
/// Number of events in the transaction. This is a plain (non-`optional`)
/// proto3 field and `from_event` writes 0 when the total is unknown, so an
/// unknown total cannot be distinguished from an explicit 0 on the wire.
#[prost(uint32, tag = "2")]
pub total_events: u32,
/// Position of this event within the transaction.
#[prost(uint32, tag = "3")]
pub event_index: u32,
}
/// Protobuf envelope for a single change event, built by `ProtoEvent::from_event`.
///
/// Field tags are part of the wire contract: never renumber or reuse them.
#[derive(Clone, PartialEq, prost::Message)]
pub struct ProtoEvent {
/// The event's `before` row image as JSON bytes (`serde_json::to_vec`).
/// Absent when the event has no `before` image.
#[prost(bytes = "vec", optional, tag = "1")]
pub before: Option<Vec<u8>>,
/// The event's `after` row image as JSON bytes (`serde_json::to_vec`).
/// Absent when the event has no `after` image.
#[prost(bytes = "vec", optional, tag = "2")]
pub after: Option<Vec<u8>>,
/// Raw `ProtoOperation` value. prost represents enumeration fields as `i32`.
#[prost(enumeration = "ProtoOperation", tag = "3")]
pub op: i32,
/// Source metadata; `from_event` always sets this.
#[prost(message, optional, tag = "4")]
pub source: Option<ProtoSourceMetadata>,
/// Event timestamp, copied from the core event's `ts`.
#[prost(uint64, tag = "5")]
pub ts: u64,
/// Schema name, if the event carries one.
#[prost(string, optional, tag = "6")]
pub schema: Option<String>,
/// Table name.
#[prost(string, tag = "7")]
pub table: String,
/// Primary-key column names; empty when the event carries none.
#[prost(string, repeated, tag = "8")]
pub primary_key: Vec<String>,
/// Snapshot metadata, present only if the event has it.
#[prost(message, optional, tag = "9")]
pub snapshot: Option<ProtoSnapshotMetadata>,
/// Transaction metadata, present only if the event has it.
#[prost(message, optional, tag = "10")]
pub transaction: Option<ProtoTransactionMetadata>,
/// The core event's envelope version, cast to `u32`.
#[prost(uint32, tag = "11")]
pub envelope_version: u32,
/// Copied from the core event's `before_is_key_only` flag. Presumably it
/// marks a `before` image that holds only key columns; confirm with the producer.
#[prost(bool, tag = "12")]
pub before_is_key_only: bool,
}
impl ProtoEvent {
pub fn from_event(event: &Event) -> Result<Self> {
let before = event
.before
.as_ref()
.map(serde_json::to_vec)
.transpose()
.map_err(|e| Error::SerializationError(format!("protobuf before encode: {e}")))?;
let after = event
.after
.as_ref()
.map(serde_json::to_vec)
.transpose()
.map_err(|e| Error::SerializationError(format!("protobuf after encode: {e}")))?;
Ok(Self {
before,
after,
op: ProtoOperation::from_op(event.op) as i32,
source: Some(ProtoSourceMetadata {
source_name: event.source.source_name.clone(),
offset: event.source.offset.clone(),
timestamp: event.source.timestamp,
}),
ts: event.ts,
schema: event.schema.clone(),
table: event.table.clone(),
primary_key: event.primary_key.clone().unwrap_or_default(),
snapshot: event.snapshot.as_ref().map(|s| ProtoSnapshotMetadata {
snapshot_id: s.snapshot_id.clone(),
chunk_index: s.chunk_index,
is_last_chunk: s.is_last_chunk,
}),
transaction: event
.transaction
.as_ref()
.map(|t| ProtoTransactionMetadata {
tx_id: t.tx_id,
total_events: t.total_events.unwrap_or(0),
event_index: t.event_index,
}),
envelope_version: event.envelope_version as u32,
before_is_key_only: event.before_is_key_only,
})
}
pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
Self::decode(bytes).map_err(|e| Error::SerializationError(format!("protobuf decode: {e}")))
}
}
#[cfg(test)]
mod tests {
    //! Round-trip and mapping tests for the protobuf encoder.

    use super::*;
    use crate::core::{
        Event, Operation, SnapshotMetadata, SourceMetadata, TransactionMetadata,
        EVENT_ENVELOPE_VERSION,
    };

    /// An update event with every optional field populated. Scalar values are
    /// non-default where possible, so a dropped field cannot hide behind the
    /// proto3 default.
    fn full_event() -> Event {
        Event {
            before: Some(serde_json::json!({"id": 1, "name": "alice"})),
            after: Some(serde_json::json!({"id": 1, "name": "alice-v2"})),
            op: Operation::Update,
            source: SourceMetadata {
                source_name: "postgres".into(),
                offset: "0/16B6A70".into(),
                timestamp: 1716595200000,
            },
            ts: 1716595200000,
            schema: Some("public".into()),
            table: "users".into(),
            primary_key: Some(vec!["id".into()]),
            snapshot: Some(SnapshotMetadata {
                snapshot_id: "snap-1".into(),
                chunk_index: 2,
                is_last_chunk: false,
            }),
            transaction: Some(TransactionMetadata {
                tx_id: 42,
                total_events: Some(3),
                event_index: 1,
            }),
            envelope_version: EVENT_ENVELOPE_VERSION,
            before_is_key_only: false,
        }
    }

    /// A minimal insert event with all optional metadata absent.
    fn insert_event() -> Event {
        Event {
            before: None,
            after: Some(serde_json::json!({"id": 1})),
            op: Operation::Insert,
            source: SourceMetadata {
                source_name: "mysql".into(),
                offset: "gtid:abc".into(),
                timestamp: 1,
            },
            ts: 1,
            schema: None,
            table: "orders".into(),
            primary_key: None,
            snapshot: None,
            transaction: None,
            envelope_version: EVENT_ENVELOPE_VERSION,
            before_is_key_only: false,
        }
    }

    /// Converts `event` to protobuf wire bytes and decodes them back.
    fn roundtrip(event: &Event) -> ProtoEvent {
        let bytes = ProtoEvent::from_event(event).unwrap().encode_to_vec();
        ProtoEvent::from_bytes(&bytes).unwrap()
    }

    #[test]
    fn encode_produces_non_empty_bytes() {
        let enc = ProtobufEncoder;
        let out = enc.encode(&insert_event()).unwrap();
        assert!(!out.bytes.is_empty());
        assert_eq!(out.content_type, "application/x-protobuf");
    }

    #[test]
    fn encoder_output_decodes_to_same_message() {
        let event = full_event();
        let out = ProtobufEncoder.encode(&event).unwrap();
        let decoded = ProtoEvent::from_bytes(&out.bytes).unwrap();
        assert_eq!(decoded, ProtoEvent::from_event(&event).unwrap());
    }

    #[test]
    fn proto_roundtrip_preserves_all_fields() {
        let decoded = roundtrip(&full_event());
        assert_eq!(decoded.table, "users");
        assert_eq!(decoded.op, ProtoOperation::Update as i32);
        assert_eq!(decoded.schema, Some("public".into()));
        assert_eq!(decoded.primary_key, vec!["id"]);
        assert_eq!(decoded.ts, 1716595200000);
        assert_eq!(decoded.envelope_version, EVENT_ENVELOPE_VERSION as u32);
        let before: serde_json::Value =
            serde_json::from_slice(decoded.before.as_ref().unwrap()).unwrap();
        assert_eq!(before["name"], "alice");
        let after: serde_json::Value =
            serde_json::from_slice(decoded.after.as_ref().unwrap()).unwrap();
        assert_eq!(after["name"], "alice-v2");
        let src = decoded.source.unwrap();
        assert_eq!(src.source_name, "postgres");
        assert_eq!(src.offset, "0/16B6A70");
        assert_eq!(src.timestamp, 1716595200000);
        let snap = decoded.snapshot.unwrap();
        assert_eq!(snap.snapshot_id, "snap-1");
        assert_eq!(snap.chunk_index, 2);
        assert!(!snap.is_last_chunk);
        let tx = decoded.transaction.unwrap();
        assert_eq!(tx.tx_id, 42);
        assert_eq!(tx.total_events, 3);
        assert_eq!(tx.event_index, 1);
        assert!(!decoded.before_is_key_only);
    }

    #[test]
    fn absent_optional_metadata_roundtrips_as_empty() {
        let decoded = roundtrip(&insert_event());
        assert!(decoded.before.is_none());
        assert!(decoded.after.is_some());
        assert!(decoded.schema.is_none());
        assert!(decoded.primary_key.is_empty());
        assert!(decoded.snapshot.is_none());
        assert!(decoded.transaction.is_none());
        // Source metadata is always emitted, even for minimal events.
        assert_eq!(decoded.source.unwrap().source_name, "mysql");
    }

    #[test]
    fn missing_total_events_is_encoded_as_zero() {
        let mut event = full_event();
        event
            .transaction
            .as_mut()
            .expect("full_event fixture always has a transaction")
            .total_events = None;
        let tx = roundtrip(&event).transaction.unwrap();
        assert_eq!(tx.total_events, 0);
        assert_eq!(tx.tx_id, 42);
        assert_eq!(tx.event_index, 1);
    }

    #[test]
    fn insert_event_has_no_before_field() {
        let event = insert_event();
        let proto = ProtoEvent::from_event(&event).unwrap();
        assert!(proto.before.is_none());
        assert!(proto.after.is_some());
        assert_eq!(proto.op, ProtoOperation::Insert as i32);
    }

    #[test]
    fn all_operations_encode_correctly() {
        let ops = [
            (Operation::Insert, ProtoOperation::Insert),
            (Operation::Update, ProtoOperation::Update),
            (Operation::Delete, ProtoOperation::Delete),
            (Operation::Read, ProtoOperation::Read),
            (Operation::SchemaChange, ProtoOperation::SchemaChange),
            (Operation::Truncate, ProtoOperation::Truncate),
        ];
        for (op, expected) in ops {
            let mut ev = insert_event();
            ev.op = op;
            let proto = ProtoEvent::from_event(&ev).unwrap();
            assert_eq!(proto.op, expected as i32, "op mismatch for {op:?}");
        }
    }

    #[test]
    fn before_is_key_only_round_trips_through_proto() {
        let mut event = full_event();
        event.before_is_key_only = true;
        assert!(roundtrip(&event).before_is_key_only);
    }

    #[test]
    fn content_type_is_x_protobuf() {
        assert_eq!(ProtobufEncoder.content_type(), "application/x-protobuf");
    }
}