rustcdc 0.1.5

Embeddable Rust CDC library focused on correctness-first capture primitives
Documentation
//! Protocol Buffer encoding for CDC events.
//!
//! Uses [prost](https://crates.io/crates/prost) for efficient protobuf
//! serialization.  No `protoc` installation or `build.rs` code generation is
//! required — the Rust structs are defined directly with `prost` derive macros.
//!
//! The canonical `.proto` schema is documented in `proto/event.proto` in this
//! repository and can be used to generate bindings for any protobuf-compatible
//! language (Go, Python, Java, TypeScript, …).
//!
//! # Wire format
//!
//! The `before` and `after` row-image fields carry **UTF-8 JSON** encoded as
//! protobuf `bytes`.  This preserves the schemaless nature of the row payload
//! while remaining self-describing.  Consumers decode the bytes as a JSON
//! object and can optionally re-validate against a schema registry.
//!
//! # Schema version
//!
//! The generated bytes match schema version `1` (the current
//! `EVENT_ENVELOPE_VERSION`).  Field numbers are stable; new optional fields
//! will be added with new tag numbers in future minor versions.

use prost::Message;

use crate::codec::{EncodedOutput, EventEncoder};
use crate::core::{Error, Event, Operation, Result};

const CONTENT_TYPE: &str = "application/x-protobuf";

// ─── ProtobufEncoder ──────────────────────────────────────────────────────────

/// Encodes CDC events as Protocol Buffer binary bytes.
///
/// The encoder is a stateless unit struct: its [`EventEncoder::encode`]
/// implementation converts the event with [`ProtoEvent::from_event`] and
/// serializes the resulting message with `prost`.
///
/// The encoding corresponds to the schema in `proto/event.proto`.  Use that
/// file with `protoc` to generate deserialization bindings in any
/// protobuf-compatible language.
///
/// Encoding fails with [`Error::SerializationError`] when the `before` or
/// `after` row image cannot be serialized to JSON.
///
/// # Example
///
/// ```rust
/// # use rustcdc::codec::{EventEncoder, ProtobufEncoder};
/// # use rustcdc::{Event, Operation, SourceMetadata, EVENT_ENVELOPE_VERSION};
/// let encoder = ProtobufEncoder;
/// let event = Event {
///     before: None,
///     after: Some(serde_json::json!({"id": 1})),
///     op: Operation::Insert,
///     source: SourceMetadata {
///         source_name: "postgres".into(),
///         offset: "0/16B6A70".into(),
///         timestamp: 1,
///     },
///     ts: 1,
///     schema: None,
///     table: "users".into(),
///     primary_key: None,
///     snapshot: None,
///     transaction: None,
///     envelope_version: EVENT_ENVELOPE_VERSION,
/// };
/// let out = encoder.encode(&event).unwrap();
/// assert_eq!(out.content_type, "application/x-protobuf");
/// ```
#[derive(Debug, Clone, Default)]
pub struct ProtobufEncoder;

impl EventEncoder for ProtobufEncoder {
    /// Serializes `event` to protobuf bytes labelled `application/x-protobuf`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`ProtoEvent::from_event`], i.e. a
    /// failure to serialize the `before` or `after` row image as JSON.
    fn encode(&self, event: &Event) -> Result<EncodedOutput> {
        // Conversion is the only fallible step; prost's `encode_to_vec` is infallible.
        let payload = ProtoEvent::from_event(event)?.encode_to_vec();
        Ok(EncodedOutput::new(payload, CONTENT_TYPE))
    }

    /// Returns the content type attached to every payload produced by `encode`.
    fn content_type(&self) -> &'static str {
        CONTENT_TYPE
    }
}

// ─── Proto message types ──────────────────────────────────────────────────────
//
// These structs mirror `proto/event.proto` exactly (same field names, numbers,
// and types).  If you update these structs you MUST update the proto file.

/// Protobuf representation of [`crate::core::Operation`].
///
/// Mirrors `enum Operation` in `proto/event.proto`.
///
/// The explicit discriminants are the values stored in [`ProtoEvent::op`]
/// (as `i32`), so changing a number changes the encoded value.
#[derive(Clone, Copy, Debug, PartialEq, Eq, prost::Enumeration)]
#[repr(i32)]
pub enum ProtoOperation {
    /// Default/unspecified — never emitted by a well-formed encoder.
    Unspecified = 0,
    /// Counterpart of `Operation::Insert`.
    Insert = 1,
    /// Counterpart of `Operation::Update`.
    Update = 2,
    /// Counterpart of `Operation::Delete`.
    Delete = 3,
    /// Counterpart of `Operation::Read`.
    Read = 4,
    /// Counterpart of `Operation::SchemaChange`.
    SchemaChange = 5,
    /// Counterpart of `Operation::Truncate`.
    Truncate = 6,
}

impl ProtoOperation {
    fn from_op(op: Operation) -> Self {
        match op {
            Operation::Insert => Self::Insert,
            Operation::Update => Self::Update,
            Operation::Delete => Self::Delete,
            Operation::Read => Self::Read,
            Operation::SchemaChange => Self::SchemaChange,
            Operation::Truncate => Self::Truncate,
        }
    }
}

/// Protobuf representation of [`crate::core::SourceMetadata`].
///
/// Mirrors `message SourceMetadata` in `proto/event.proto`.
///
/// [`ProtoEvent::from_event`] always populates this message, copying all
/// three fields unchanged from the event's `source`.
#[derive(Clone, PartialEq, prost::Message)]
pub struct ProtoSourceMetadata {
    /// Logical connector name (e.g. `"postgres"`, `"mysql"`).
    #[prost(string, tag = "1")]
    pub source_name: String,
    /// Source-specific durable position (LSN, GTID, …), carried as an opaque
    /// string.
    #[prost(string, tag = "2")]
    pub offset: String,
    /// Source timestamp in milliseconds since Unix epoch.
    #[prost(uint64, tag = "3")]
    pub timestamp: u64,
}

/// Protobuf representation of [`crate::core::SnapshotMetadata`].
///
/// Mirrors `message SnapshotMetadata` in `proto/event.proto`.
///
/// [`ProtoEvent::from_event`] emits this message only when the source event's
/// `snapshot` is `Some`; the fields are copied unchanged.
#[derive(Clone, PartialEq, prost::Message)]
pub struct ProtoSnapshotMetadata {
    /// Snapshot session identifier.
    #[prost(string, tag = "1")]
    pub snapshot_id: String,
    /// Zero-based chunk index.
    #[prost(uint32, tag = "2")]
    pub chunk_index: u32,
    /// Whether this is the final chunk.
    #[prost(bool, tag = "3")]
    pub is_last_chunk: bool,
}

/// Protobuf representation of [`crate::core::TransactionMetadata`].
///
/// Mirrors `message TransactionMetadata` in `proto/event.proto`.
///
/// [`ProtoEvent::from_event`] emits this message only when the source event's
/// `transaction` is `Some`; the fields are copied unchanged.
#[derive(Clone, PartialEq, prost::Message)]
pub struct ProtoTransactionMetadata {
    /// Source transaction identifier.
    #[prost(uint64, tag = "1")]
    pub tx_id: u64,
    /// Total events in this transaction.
    #[prost(uint32, tag = "2")]
    pub total_events: u32,
    /// Zero-based position of this event within the transaction.
    #[prost(uint32, tag = "3")]
    pub event_index: u32,
}

/// Protobuf representation of [`crate::core::Event`].
///
/// Mirrors `message Event` in `proto/event.proto`.
///
/// `before` and `after` are UTF-8 JSON bytes (`None` → absent field).
///
/// Build one with [`ProtoEvent::from_event`] and read one back with
/// [`ProtoEvent::from_bytes`].
#[derive(Clone, PartialEq, prost::Message)]
pub struct ProtoEvent {
    /// JSON-encoded before-image (absent when `None`).
    #[prost(bytes = "vec", optional, tag = "1")]
    pub before: Option<Vec<u8>>,
    /// JSON-encoded after-image (absent when `None`).
    #[prost(bytes = "vec", optional, tag = "2")]
    pub after: Option<Vec<u8>>,
    /// Operation, stored as the `i32` value of a [`ProtoOperation`] variant.
    #[prost(enumeration = "ProtoOperation", tag = "3")]
    pub op: i32,
    /// Source identity and durable offset.
    ///
    /// Always `Some` when the message was built by [`ProtoEvent::from_event`].
    #[prost(message, optional, tag = "4")]
    pub source: Option<ProtoSourceMetadata>,
    /// Event timestamp in milliseconds since Unix epoch.
    #[prost(uint64, tag = "5")]
    pub ts: u64,
    /// Schema name (absent when unknown).
    #[prost(string, optional, tag = "6")]
    pub schema: Option<String>,
    /// Table name.
    #[prost(string, tag = "7")]
    pub table: String,
    /// Primary key column names (empty when unknown).
    ///
    /// [`ProtoEvent::from_event`] maps a `None` primary key to an empty list,
    /// so "unknown" and "known to be empty" are indistinguishable here.
    #[prost(string, repeated, tag = "8")]
    pub primary_key: Vec<String>,
    /// Snapshot metadata (absent outside snapshot phase).
    ///
    /// Set only when the source event's `snapshot` is `Some`.
    #[prost(message, optional, tag = "9")]
    pub snapshot: Option<ProtoSnapshotMetadata>,
    /// Transaction metadata (absent for single-event transactions).
    ///
    /// Set only when the source event's `transaction` is `Some`.
    #[prost(message, optional, tag = "10")]
    pub transaction: Option<ProtoTransactionMetadata>,
    /// Canonical envelope schema version.
    #[prost(uint32, tag = "11")]
    pub envelope_version: u32,
}

impl ProtoEvent {
    /// Convert a [`crate::core::Event`] into its protobuf representation.
    pub fn from_event(event: &Event) -> Result<Self> {
        let before = event
            .before
            .as_ref()
            .map(serde_json::to_vec)
            .transpose()
            .map_err(|e| Error::SerializationError(format!("protobuf before encode: {e}")))?;

        let after = event
            .after
            .as_ref()
            .map(serde_json::to_vec)
            .transpose()
            .map_err(|e| Error::SerializationError(format!("protobuf after encode: {e}")))?;

        Ok(Self {
            before,
            after,
            op: ProtoOperation::from_op(event.op) as i32,
            source: Some(ProtoSourceMetadata {
                source_name: event.source.source_name.clone(),
                offset: event.source.offset.clone(),
                timestamp: event.source.timestamp,
            }),
            ts: event.ts,
            schema: event.schema.clone(),
            table: event.table.clone(),
            primary_key: event.primary_key.clone().unwrap_or_default(),
            snapshot: event.snapshot.as_ref().map(|s| ProtoSnapshotMetadata {
                snapshot_id: s.snapshot_id.clone(),
                chunk_index: s.chunk_index,
                is_last_chunk: s.is_last_chunk,
            }),
            transaction: event
                .transaction
                .as_ref()
                .map(|t| ProtoTransactionMetadata {
                    tx_id: t.tx_id,
                    total_events: t.total_events,
                    event_index: t.event_index,
                }),
            envelope_version: event.envelope_version as u32,
        })
    }

    /// Decode a `ProtoEvent` from raw protobuf bytes.
    pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
        Self::decode(bytes).map_err(|e| Error::SerializationError(format!("protobuf decode: {e}")))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::{
        Event, Operation, SnapshotMetadata, SourceMetadata, TransactionMetadata,
        EVENT_ENVELOPE_VERSION,
    };

    /// An update event with every optional field populated.
    fn full_event() -> Event {
        Event {
            before: Some(serde_json::json!({"id": 1, "name": "alice"})),
            after: Some(serde_json::json!({"id": 1, "name": "alice-v2"})),
            op: Operation::Update,
            source: SourceMetadata {
                source_name: "postgres".into(),
                offset: "0/16B6A70".into(),
                timestamp: 1716595200000,
            },
            ts: 1716595200000,
            schema: Some("public".into()),
            table: "users".into(),
            primary_key: Some(vec!["id".into()]),
            snapshot: Some(SnapshotMetadata {
                snapshot_id: "snap-1".into(),
                chunk_index: 0,
                is_last_chunk: false,
            }),
            transaction: Some(TransactionMetadata {
                tx_id: 42,
                total_events: 3,
                event_index: 1,
            }),
            envelope_version: EVENT_ENVELOPE_VERSION,
        }
    }

    /// An insert event with every optional field left as `None`.
    fn insert_event() -> Event {
        Event {
            before: None,
            after: Some(serde_json::json!({"id": 1})),
            op: Operation::Insert,
            source: SourceMetadata {
                source_name: "mysql".into(),
                offset: "gtid:abc".into(),
                timestamp: 1,
            },
            ts: 1,
            schema: None,
            table: "orders".into(),
            primary_key: None,
            snapshot: None,
            transaction: None,
            envelope_version: EVENT_ENVELOPE_VERSION,
        }
    }

    #[test]
    fn encode_produces_non_empty_bytes() {
        let enc = ProtobufEncoder;
        let out = enc.encode(&insert_event()).unwrap();
        assert!(!out.bytes.is_empty());
        assert_eq!(out.content_type, "application/x-protobuf");
    }

    #[test]
    fn proto_roundtrip_preserves_all_fields() {
        let event = full_event();
        let proto = ProtoEvent::from_event(&event).unwrap();
        let bytes = proto.encode_to_vec();
        let decoded = ProtoEvent::from_bytes(&bytes).unwrap();

        // Whole-message equality catches any field the spot checks below miss.
        assert_eq!(decoded, proto);

        assert_eq!(decoded.table, "users");
        assert_eq!(decoded.op, ProtoOperation::Update as i32);
        assert_eq!(decoded.schema, Some("public".into()));
        assert_eq!(decoded.primary_key, vec!["id"]);
        assert_eq!(decoded.ts, 1716595200000);
        assert_eq!(decoded.envelope_version, EVENT_ENVELOPE_VERSION as u32);

        // before/after are JSON bytes
        let before: serde_json::Value =
            serde_json::from_slice(decoded.before.as_ref().unwrap()).unwrap();
        assert_eq!(before["name"], "alice");
        let after: serde_json::Value =
            serde_json::from_slice(decoded.after.as_ref().unwrap()).unwrap();
        assert_eq!(after["name"], "alice-v2");

        // source
        let src = decoded.source.unwrap();
        assert_eq!(src.source_name, "postgres");
        assert_eq!(src.offset, "0/16B6A70");
        assert_eq!(src.timestamp, 1716595200000);

        // snapshot
        let snap = decoded.snapshot.unwrap();
        assert_eq!(snap.snapshot_id, "snap-1");
        assert_eq!(snap.chunk_index, 0);
        assert!(!snap.is_last_chunk);

        // transaction
        let tx = decoded.transaction.unwrap();
        assert_eq!(tx.tx_id, 42);
        assert_eq!(tx.total_events, 3);
        assert_eq!(tx.event_index, 1);
    }

    #[test]
    fn absent_optional_fields_survive_roundtrip() {
        let proto = ProtoEvent::from_event(&insert_event()).unwrap();
        let decoded = ProtoEvent::from_bytes(&proto.encode_to_vec()).unwrap();

        assert_eq!(decoded, proto);

        // Optional fields that were `None` on the event must decode as absent,
        // not as empty/default values.
        assert!(decoded.before.is_none());
        assert!(decoded.schema.is_none());
        assert!(decoded.snapshot.is_none());
        assert!(decoded.transaction.is_none());
        // A `None` primary key is encoded as an empty repeated field.
        assert!(decoded.primary_key.is_empty());
        // `source` is always populated by `from_event`.
        assert!(decoded.source.is_some());
    }

    #[test]
    fn insert_event_has_no_before_field() {
        let event = insert_event();
        let proto = ProtoEvent::from_event(&event).unwrap();
        assert!(proto.before.is_none());
        assert!(proto.after.is_some());
        assert_eq!(proto.op, ProtoOperation::Insert as i32);
    }

    #[test]
    fn all_operations_encode_correctly() {
        let ops = [
            (Operation::Insert, ProtoOperation::Insert),
            (Operation::Update, ProtoOperation::Update),
            (Operation::Delete, ProtoOperation::Delete),
            (Operation::Read, ProtoOperation::Read),
            (Operation::SchemaChange, ProtoOperation::SchemaChange),
            (Operation::Truncate, ProtoOperation::Truncate),
        ];
        for (op, expected) in ops {
            let mut ev = insert_event();
            ev.op = op;
            let proto = ProtoEvent::from_event(&ev).unwrap();
            assert_eq!(proto.op, expected as i32, "op mismatch for {op:?}");
        }
    }

    #[test]
    fn content_type_is_x_protobuf() {
        assert_eq!(ProtobufEncoder.content_type(), "application/x-protobuf");
    }
}