scry_protocol/database_event/
builder.rs1use super::types::*;
4use std::time::{SystemTime, UNIX_EPOCH};
5
6pub struct DatabaseEventBuilder {
8 event: DatabaseEvent,
9}
10
11impl DatabaseEventBuilder {
12 pub fn new(operation: OperationType, schema: impl Into<String>, table: impl Into<String>) -> Self {
14 Self {
15 event: DatabaseEvent {
16 event_id: None,
17 timestamp_us: current_timestamp_us(),
18 operation,
19 schema: schema.into(),
20 table: table.into(),
21 position: 0,
22 transaction_id: 0,
23 new_row: None,
24 old_row: None,
25 columns: Vec::new(),
26 relation_meta: None,
27 ddl_sql: None,
28 ddl_object_type: None,
29 },
30 }
31 }
32
33 pub fn insert(schema: impl Into<String>, table: impl Into<String>) -> Self {
35 Self::new(OperationType::Insert, schema, table)
36 }
37
38 pub fn update(schema: impl Into<String>, table: impl Into<String>) -> Self {
40 Self::new(OperationType::Update, schema, table)
41 }
42
43 pub fn delete(schema: impl Into<String>, table: impl Into<String>) -> Self {
45 Self::new(OperationType::Delete, schema, table)
46 }
47
48 pub fn snapshot_row(schema: impl Into<String>, table: impl Into<String>) -> Self {
50 Self::new(OperationType::SnapshotRow, schema, table)
51 }
52
53 pub fn begin() -> Self {
55 Self::new(OperationType::Begin, "", "")
56 }
57
58 pub fn commit() -> Self {
60 Self::new(OperationType::Commit, "", "")
61 }
62
63 pub fn ddl(sql: impl Into<String>, object_type: impl Into<String>) -> Self {
65 let mut builder = Self::new(OperationType::Ddl, "", "");
66 builder.event.ddl_sql = Some(sql.into());
67 builder.event.ddl_object_type = Some(object_type.into());
68 builder
69 }
70
71 pub fn event_id(mut self, id: impl Into<String>) -> Self {
73 self.event.event_id = Some(id.into());
74 self
75 }
76
77 pub fn timestamp_us(mut self, ts: u64) -> Self {
79 self.event.timestamp_us = ts;
80 self
81 }
82
83 pub fn timestamp(mut self, ts: SystemTime) -> Self {
85 self.event.timestamp_us = systemtime_to_us(ts);
86 self
87 }
88
89 pub fn position(mut self, pos: u64) -> Self {
91 self.event.position = pos;
92 self
93 }
94
95 pub fn transaction_id(mut self, xid: u64) -> Self {
97 self.event.transaction_id = xid;
98 self
99 }
100
101 pub fn new_row(mut self, row: Row) -> Self {
103 self.event.new_row = Some(row);
104 self
105 }
106
107 pub fn old_row(mut self, row: Row) -> Self {
109 self.event.old_row = Some(row);
110 self
111 }
112
113 pub fn columns(mut self, columns: Vec<String>) -> Self {
115 self.event.columns = columns;
116 self
117 }
118
119 pub fn relation_meta(mut self, meta: RelationMeta) -> Self {
121 self.event.relation_meta = Some(meta);
122 self
123 }
124
125 pub fn build(self) -> DatabaseEvent {
127 self.event
128 }
129}
130
131pub struct BatchBuilder {
133 events: Vec<DatabaseEvent>,
134 relations: Vec<RelationMeta>,
135 source_id: Option<String>,
136 batch_seq: u64,
137 current_bytes: usize,
138 max_events: usize,
139 max_bytes: usize,
140}
141
142impl BatchBuilder {
143 pub const DEFAULT_MAX_EVENTS: usize = 1000;
145 pub const DEFAULT_MAX_BYTES: usize = 1_000_000;
147
148 pub fn new() -> Self {
150 Self {
151 events: Vec::new(),
152 relations: Vec::new(),
153 source_id: None,
154 batch_seq: 0,
155 current_bytes: 0,
156 max_events: Self::DEFAULT_MAX_EVENTS,
157 max_bytes: Self::DEFAULT_MAX_BYTES,
158 }
159 }
160
161 pub fn source_id(mut self, id: impl Into<String>) -> Self {
163 self.source_id = Some(id.into());
164 self
165 }
166
167 pub fn batch_seq(mut self, seq: u64) -> Self {
169 self.batch_seq = seq;
170 self
171 }
172
173 pub fn max_events(mut self, max: usize) -> Self {
175 self.max_events = max;
176 self
177 }
178
179 pub fn max_bytes(mut self, max: usize) -> Self {
181 self.max_bytes = max;
182 self
183 }
184
185 pub fn add_event(&mut self, event: DatabaseEvent) -> Option<DatabaseEventBatch> {
188 let event_size = event.size_bytes();
189
190 if !self.events.is_empty()
192 && (self.events.len() >= self.max_events
193 || self.current_bytes + event_size > self.max_bytes)
194 {
195 let batch = self.flush();
197 self.current_bytes = event_size;
199 self.events.push(event);
200 Some(batch)
201 } else {
202 self.current_bytes += event_size;
203 self.events.push(event);
204 None
205 }
206 }
207
208 pub fn add_relation(&mut self, meta: RelationMeta) {
210 if !self.relations.iter().any(|r| r.rel_id == meta.rel_id) {
212 self.relations.push(meta);
213 }
214 }
215
216 pub fn is_empty(&self) -> bool {
218 self.events.is_empty()
219 }
220
221 pub fn len(&self) -> usize {
223 self.events.len()
224 }
225
226 pub fn flush(&mut self) -> DatabaseEventBatch {
228 let batch = DatabaseEventBatch {
229 events: std::mem::take(&mut self.events),
230 source_id: self.source_id.clone(),
231 batch_seq: self.batch_seq,
232 relations: std::mem::take(&mut self.relations),
233 control_directive: None,
234 sequence_values: None,
235 backfill_metadata: None,
236 };
237
238 self.batch_seq += 1;
239 self.current_bytes = 0;
240
241 batch
242 }
243
244 pub fn finish(mut self) -> Option<DatabaseEventBatch> {
246 if self.events.is_empty() {
247 None
248 } else {
249 Some(self.flush())
250 }
251 }
252}
253
254impl Default for BatchBuilder {
255 fn default() -> Self {
256 Self::new()
257 }
258}
259
260fn current_timestamp_us() -> u64 {
263 systemtime_to_us(SystemTime::now())
264}
265
266fn systemtime_to_us(time: SystemTime) -> u64 {
267 time.duration_since(UNIX_EPOCH)
268 .map(|d| d.as_micros() as u64)
269 .unwrap_or(0)
270}
271
272#[cfg(test)]
273mod tests {
274 use super::*;
275
276 #[test]
277 fn test_event_builder() {
278 let event = DatabaseEventBuilder::insert("public", "users")
279 .position(12345)
280 .transaction_id(100)
281 .columns(vec!["id".to_string(), "name".to_string()])
282 .new_row(Row::new(vec![
283 ColumnValue::from_pg_binary(TypeTag::Int32, 23, vec![0, 0, 0, 1]),
284 ColumnValue::from_pg_binary(TypeTag::Text, 25, b"Alice".to_vec()),
285 ]))
286 .build();
287
288 assert_eq!(event.operation, OperationType::Insert);
289 assert_eq!(event.schema, "public");
290 assert_eq!(event.table, "users");
291 assert_eq!(event.position, 12345);
292 }
293
294 #[test]
295 fn test_batch_builder() {
296 let mut batch = BatchBuilder::new()
297 .source_id("test")
298 .max_events(2);
299
300 let event1 = DatabaseEventBuilder::insert("public", "users").build();
301 let event2 = DatabaseEventBuilder::insert("public", "users").build();
302 let event3 = DatabaseEventBuilder::insert("public", "users").build();
303
304 assert!(batch.add_event(event1).is_none());
306 assert!(batch.add_event(event2).is_none());
307
308 let flushed = batch.add_event(event3);
310 assert!(flushed.is_some());
311 assert_eq!(flushed.unwrap().events.len(), 2);
312
313 let remaining = batch.finish();
315 assert!(remaining.is_some());
316 assert_eq!(remaining.unwrap().events.len(), 1);
317 }
318}