scry_protocol/database_event/
reader.rs1use super::types::*;
4use anyhow::{Context, Result};
5use serde::de::DeserializeOwned;
6
7pub fn serialize_batch(batch: &DatabaseEventBatch) -> Result<Vec<u8>> {
9 let mut serializer = flexbuffers::FlexbufferSerializer::new();
10 serde::Serialize::serialize(batch, &mut serializer)
11 .context("Failed to serialize DatabaseEventBatch")?;
12 Ok(serializer.take_buffer())
13}
14
15pub fn serialize_event(event: &DatabaseEvent) -> Result<Vec<u8>> {
17 let mut serializer = flexbuffers::FlexbufferSerializer::new();
18 serde::Serialize::serialize(event, &mut serializer)
19 .context("Failed to serialize DatabaseEvent")?;
20 Ok(serializer.take_buffer())
21}
22
23fn deserialize_flexbuffers<T: DeserializeOwned>(data: &[u8]) -> Result<T> {
25 let reader = flexbuffers::Reader::get_root(data)
26 .context("Failed to read FlexBuffer root")?;
27 T::deserialize(reader).context("Failed to deserialize")
28}
29
30pub fn read_batch(data: &[u8]) -> Result<DatabaseEventBatch> {
32 deserialize_flexbuffers(data)
33}
34
35pub fn read_event(data: &[u8]) -> Result<DatabaseEvent> {
37 deserialize_flexbuffers(data)
38}
39
#[cfg(test)]
mod tests {
    // Round-trip tests: each case builds a value, serializes it, reads it
    // back, and checks that the fields set on the way in are present on the
    // way out.
    use super::*;
    use crate::database_event::builder::DatabaseEventBuilder;

    /// A batch holding one insert event keeps its batch metadata, event
    /// identity, and row arity across serialize -> read.
    #[test]
    fn test_serialize_deserialize_batch() {
        let event = DatabaseEventBuilder::insert("public", "users")
            .position(12345)
            .transaction_id(100)
            .columns(vec!["id".to_string(), "name".to_string()])
            .new_row(Row::new(vec![
                ColumnValue::from_pg_binary(TypeTag::Int32, 23, vec![0, 0, 0, 1]),
                ColumnValue::from_pg_binary(TypeTag::Text, 25, b"Alice".to_vec()),
            ]))
            .build();

        let batch = DatabaseEventBatch::with_events(vec![event])
            .with_source_id("test-source")
            .with_batch_seq(42);

        let bytes = serialize_batch(&batch).expect("serialize failed");
        assert!(!bytes.is_empty());

        let recovered = read_batch(&bytes).expect("deserialize failed");
        assert_eq!(recovered.source_id, batch.source_id);
        assert_eq!(recovered.batch_seq, batch.batch_seq);
        assert_eq!(recovered.events.len(), 1);

        let event = &recovered.events[0];
        assert_eq!(event.schema, "public");
        assert_eq!(event.table, "users");
        assert_eq!(event.position, 12345);

        // The row payload must survive too, not just the event header.
        let row = event.new_row.as_ref().expect("new_row should exist");
        assert_eq!(row.values.len(), 2);
    }

    /// A standalone update event keeps its operation, identity, position,
    /// and both row images across serialize -> read.
    #[test]
    fn test_serialize_deserialize_event() {
        let event = DatabaseEventBuilder::update("myschema", "orders")
            .position(99999)
            .new_row(Row::new(vec![ColumnValue::from_pg_binary(
                TypeTag::Int64,
                20,
                vec![0, 0, 0, 0, 0, 0, 0, 42],
            )]))
            .old_row(Row::new(vec![ColumnValue::from_pg_binary(
                TypeTag::Int64,
                20,
                vec![0, 0, 0, 0, 0, 0, 0, 41],
            )]))
            .build();

        let bytes = serialize_event(&event).expect("serialize failed");
        assert!(!bytes.is_empty());

        let recovered = read_event(&bytes).expect("deserialize failed");
        assert_eq!(recovered.operation, OperationType::Update);
        assert_eq!(recovered.schema, "myschema");
        assert_eq!(recovered.table, "orders");
        // The position was set on the builder, so verify it round-trips.
        assert_eq!(recovered.position, 99999);
        assert!(recovered.new_row.is_some());
        assert!(recovered.old_row.is_some());
        // Each row image was built with exactly one value.
        assert_eq!(recovered.new_row.as_ref().map(|row| row.values.len()), Some(1));
        assert_eq!(recovered.old_row.as_ref().map(|row| row.values.len()), Some(1));
    }

    /// Null and non-null column values keep their nullness and order.
    #[test]
    fn test_null_values_roundtrip() {
        let event = DatabaseEventBuilder::insert("public", "test")
            .new_row(Row::new(vec![
                ColumnValue::null(),
                ColumnValue::from_pg_binary(TypeTag::Text, 25, b"not null".to_vec()),
                ColumnValue::null(),
            ]))
            .build();

        let bytes = serialize_event(&event).expect("serialize failed");
        let recovered = read_event(&bytes).expect("deserialize failed");

        let row = recovered.new_row.expect("new_row should exist");
        // Check the arity first so a dropped value fails with a clear message
        // instead of an out-of-bounds panic on the indexing below.
        assert_eq!(row.values.len(), 3);
        assert!(row.values[0].is_null());
        assert!(!row.values[1].is_null());
        assert!(row.values[2].is_null());
    }

    /// Relation metadata attached to an event round-trips, including the
    /// per-column metadata.
    #[test]
    fn test_relation_meta_roundtrip() {
        let meta = RelationMeta {
            rel_id: 12345,
            schema: "public".to_string(),
            table: "users".to_string(),
            columns: vec![
                ColumnMeta {
                    name: "id".to_string(),
                    type_oid: 23,
                    type_modifier: -1,
                    is_key: true,
                },
                ColumnMeta {
                    name: "name".to_string(),
                    type_oid: 25,
                    type_modifier: -1,
                    is_key: false,
                },
            ],
            replica_identity: ReplicaIdentity::Full,
        };

        let event = DatabaseEventBuilder::insert("public", "users")
            .relation_meta(meta.clone())
            .build();

        let bytes = serialize_event(&event).expect("serialize failed");
        let recovered = read_event(&bytes).expect("deserialize failed");

        let recovered_meta = recovered.relation_meta.expect("relation_meta should exist");
        assert_eq!(recovered_meta.rel_id, meta.rel_id);
        assert_eq!(recovered_meta.schema, meta.schema);
        assert_eq!(recovered_meta.table, meta.table);
        assert_eq!(recovered_meta.columns.len(), 2);
        assert_eq!(recovered_meta.replica_identity, ReplicaIdentity::Full);

        // Compare column metadata field by field; the length check above
        // guarantees the zip covers every column.
        for (got, want) in recovered_meta.columns.iter().zip(&meta.columns) {
            assert_eq!(got.name, want.name);
            assert_eq!(got.type_oid, want.type_oid);
            assert_eq!(got.type_modifier, want.type_modifier);
            assert_eq!(got.is_key, want.is_key);
        }
    }
}