Skip to main content

scry_protocol/database_event/
reader.rs

1//! Serialization and deserialization for database events using FlexBuffers.
2
3use super::types::*;
4use anyhow::{Context, Result};
5use serde::de::DeserializeOwned;
6
7/// Serialize a batch to FlexBuffers format.
8pub fn serialize_batch(batch: &DatabaseEventBatch) -> Result<Vec<u8>> {
9    let mut serializer = flexbuffers::FlexbufferSerializer::new();
10    serde::Serialize::serialize(batch, &mut serializer)
11        .context("Failed to serialize DatabaseEventBatch")?;
12    Ok(serializer.take_buffer())
13}
14
15/// Serialize a single event to FlexBuffers format.
16pub fn serialize_event(event: &DatabaseEvent) -> Result<Vec<u8>> {
17    let mut serializer = flexbuffers::FlexbufferSerializer::new();
18    serde::Serialize::serialize(event, &mut serializer)
19        .context("Failed to serialize DatabaseEvent")?;
20    Ok(serializer.take_buffer())
21}
22
23/// Deserialize from FlexBuffers format.
24fn deserialize_flexbuffers<T: DeserializeOwned>(data: &[u8]) -> Result<T> {
25    let reader = flexbuffers::Reader::get_root(data)
26        .context("Failed to read FlexBuffer root")?;
27    T::deserialize(reader).context("Failed to deserialize")
28}
29
30/// Deserialize a batch from FlexBuffers format.
31pub fn read_batch(data: &[u8]) -> Result<DatabaseEventBatch> {
32    deserialize_flexbuffers(data)
33}
34
35/// Deserialize a single event from FlexBuffers format.
36pub fn read_event(data: &[u8]) -> Result<DatabaseEvent> {
37    deserialize_flexbuffers(data)
38}
39
40#[cfg(test)]
41mod tests {
42    use super::*;
43    use crate::database_event::builder::DatabaseEventBuilder;
44
45    #[test]
46    fn test_serialize_deserialize_batch() {
47        let event = DatabaseEventBuilder::insert("public", "users")
48            .position(12345)
49            .transaction_id(100)
50            .columns(vec!["id".to_string(), "name".to_string()])
51            .new_row(Row::new(vec![
52                ColumnValue::from_pg_binary(TypeTag::Int32, 23, vec![0, 0, 0, 1]),
53                ColumnValue::from_pg_binary(TypeTag::Text, 25, b"Alice".to_vec()),
54            ]))
55            .build();
56
57        let batch = DatabaseEventBatch::with_events(vec![event])
58            .with_source_id("test-source")
59            .with_batch_seq(42);
60
61        // Serialize
62        let bytes = serialize_batch(&batch).expect("serialize failed");
63        assert!(!bytes.is_empty());
64
65        // Deserialize
66        let recovered = read_batch(&bytes).expect("deserialize failed");
67        assert_eq!(recovered.source_id, batch.source_id);
68        assert_eq!(recovered.batch_seq, batch.batch_seq);
69        assert_eq!(recovered.events.len(), 1);
70
71        let event = &recovered.events[0];
72        assert_eq!(event.schema, "public");
73        assert_eq!(event.table, "users");
74        assert_eq!(event.position, 12345);
75    }
76
77    #[test]
78    fn test_serialize_deserialize_event() {
79        let event = DatabaseEventBuilder::update("myschema", "orders")
80            .position(99999)
81            .new_row(Row::new(vec![
82                ColumnValue::from_pg_binary(TypeTag::Int64, 20, vec![0, 0, 0, 0, 0, 0, 0, 42]),
83            ]))
84            .old_row(Row::new(vec![
85                ColumnValue::from_pg_binary(TypeTag::Int64, 20, vec![0, 0, 0, 0, 0, 0, 0, 41]),
86            ]))
87            .build();
88
89        // Serialize
90        let bytes = serialize_event(&event).expect("serialize failed");
91        assert!(!bytes.is_empty());
92
93        // Deserialize
94        let recovered = read_event(&bytes).expect("deserialize failed");
95        assert_eq!(recovered.operation, OperationType::Update);
96        assert_eq!(recovered.schema, "myschema");
97        assert_eq!(recovered.table, "orders");
98        assert!(recovered.new_row.is_some());
99        assert!(recovered.old_row.is_some());
100    }
101
102    #[test]
103    fn test_null_values_roundtrip() {
104        let event = DatabaseEventBuilder::insert("public", "test")
105            .new_row(Row::new(vec![
106                ColumnValue::null(),
107                ColumnValue::from_pg_binary(TypeTag::Text, 25, b"not null".to_vec()),
108                ColumnValue::null(),
109            ]))
110            .build();
111
112        let bytes = serialize_event(&event).expect("serialize failed");
113        let recovered = read_event(&bytes).expect("deserialize failed");
114
115        let row = recovered.new_row.expect("new_row should exist");
116        assert!(row.values[0].is_null());
117        assert!(!row.values[1].is_null());
118        assert!(row.values[2].is_null());
119    }
120
121    #[test]
122    fn test_relation_meta_roundtrip() {
123        let meta = RelationMeta {
124            rel_id: 12345,
125            schema: "public".to_string(),
126            table: "users".to_string(),
127            columns: vec![
128                ColumnMeta {
129                    name: "id".to_string(),
130                    type_oid: 23,
131                    type_modifier: -1,
132                    is_key: true,
133                },
134                ColumnMeta {
135                    name: "name".to_string(),
136                    type_oid: 25,
137                    type_modifier: -1,
138                    is_key: false,
139                },
140            ],
141            replica_identity: ReplicaIdentity::Full,
142        };
143
144        let event = DatabaseEventBuilder::insert("public", "users")
145            .relation_meta(meta.clone())
146            .build();
147
148        let bytes = serialize_event(&event).expect("serialize failed");
149        let recovered = read_event(&bytes).expect("deserialize failed");
150
151        let recovered_meta = recovered.relation_meta.expect("relation_meta should exist");
152        assert_eq!(recovered_meta.rel_id, meta.rel_id);
153        assert_eq!(recovered_meta.schema, meta.schema);
154        assert_eq!(recovered_meta.columns.len(), 2);
155        assert_eq!(recovered_meta.replica_identity, ReplicaIdentity::Full);
156    }
157}