Skip to main content

scry_protocol/database_event/
builder.rs

1//! Builders for constructing database events.
2
3use super::types::*;
4use std::time::{SystemTime, UNIX_EPOCH};
5
6/// Builder for constructing a single DatabaseEvent.
7pub struct DatabaseEventBuilder {
8    event: DatabaseEvent,
9}
10
11impl DatabaseEventBuilder {
12    /// Create a new builder for a DML operation.
13    pub fn new(operation: OperationType, schema: impl Into<String>, table: impl Into<String>) -> Self {
14        Self {
15            event: DatabaseEvent {
16                event_id: None,
17                timestamp_us: current_timestamp_us(),
18                operation,
19                schema: schema.into(),
20                table: table.into(),
21                position: 0,
22                transaction_id: 0,
23                new_row: None,
24                old_row: None,
25                columns: Vec::new(),
26                relation_meta: None,
27                ddl_sql: None,
28                ddl_object_type: None,
29            },
30        }
31    }
32
33    /// Create a builder for an INSERT event.
34    pub fn insert(schema: impl Into<String>, table: impl Into<String>) -> Self {
35        Self::new(OperationType::Insert, schema, table)
36    }
37
38    /// Create a builder for an UPDATE event.
39    pub fn update(schema: impl Into<String>, table: impl Into<String>) -> Self {
40        Self::new(OperationType::Update, schema, table)
41    }
42
43    /// Create a builder for a DELETE event.
44    pub fn delete(schema: impl Into<String>, table: impl Into<String>) -> Self {
45        Self::new(OperationType::Delete, schema, table)
46    }
47
48    /// Create a builder for a snapshot row.
49    pub fn snapshot_row(schema: impl Into<String>, table: impl Into<String>) -> Self {
50        Self::new(OperationType::SnapshotRow, schema, table)
51    }
52
53    /// Create a builder for BEGIN transaction.
54    pub fn begin() -> Self {
55        Self::new(OperationType::Begin, "", "")
56    }
57
58    /// Create a builder for COMMIT transaction.
59    pub fn commit() -> Self {
60        Self::new(OperationType::Commit, "", "")
61    }
62
63    /// Create a builder for a DDL event.
64    pub fn ddl(sql: impl Into<String>, object_type: impl Into<String>) -> Self {
65        let mut builder = Self::new(OperationType::Ddl, "", "");
66        builder.event.ddl_sql = Some(sql.into());
67        builder.event.ddl_object_type = Some(object_type.into());
68        builder
69    }
70
71    /// Set the event ID.
72    pub fn event_id(mut self, id: impl Into<String>) -> Self {
73        self.event.event_id = Some(id.into());
74        self
75    }
76
77    /// Set the timestamp in microseconds.
78    pub fn timestamp_us(mut self, ts: u64) -> Self {
79        self.event.timestamp_us = ts;
80        self
81    }
82
83    /// Set the timestamp from SystemTime.
84    pub fn timestamp(mut self, ts: SystemTime) -> Self {
85        self.event.timestamp_us = systemtime_to_us(ts);
86        self
87    }
88
89    /// Set the replication position (LSN).
90    pub fn position(mut self, pos: u64) -> Self {
91        self.event.position = pos;
92        self
93    }
94
95    /// Set the transaction ID.
96    pub fn transaction_id(mut self, xid: u64) -> Self {
97        self.event.transaction_id = xid;
98        self
99    }
100
101    /// Set the new row data.
102    pub fn new_row(mut self, row: Row) -> Self {
103        self.event.new_row = Some(row);
104        self
105    }
106
107    /// Set the old row data.
108    pub fn old_row(mut self, row: Row) -> Self {
109        self.event.old_row = Some(row);
110        self
111    }
112
113    /// Set column names.
114    pub fn columns(mut self, columns: Vec<String>) -> Self {
115        self.event.columns = columns;
116        self
117    }
118
119    /// Set relation metadata.
120    pub fn relation_meta(mut self, meta: RelationMeta) -> Self {
121        self.event.relation_meta = Some(meta);
122        self
123    }
124
125    /// Build the event.
126    pub fn build(self) -> DatabaseEvent {
127        self.event
128    }
129}
130
131/// Builder for constructing batches of events with efficient serialization.
132pub struct BatchBuilder {
133    events: Vec<DatabaseEvent>,
134    relations: Vec<RelationMeta>,
135    source_id: Option<String>,
136    batch_seq: u64,
137    current_bytes: usize,
138    max_events: usize,
139    max_bytes: usize,
140}
141
142impl BatchBuilder {
143    /// Default maximum events per batch.
144    pub const DEFAULT_MAX_EVENTS: usize = 1000;
145    /// Default maximum bytes per batch (1MB).
146    pub const DEFAULT_MAX_BYTES: usize = 1_000_000;
147
148    /// Create a new batch builder with default limits.
149    pub fn new() -> Self {
150        Self {
151            events: Vec::new(),
152            relations: Vec::new(),
153            source_id: None,
154            batch_seq: 0,
155            current_bytes: 0,
156            max_events: Self::DEFAULT_MAX_EVENTS,
157            max_bytes: Self::DEFAULT_MAX_BYTES,
158        }
159    }
160
161    /// Set the source ID.
162    pub fn source_id(mut self, id: impl Into<String>) -> Self {
163        self.source_id = Some(id.into());
164        self
165    }
166
167    /// Set the batch sequence number.
168    pub fn batch_seq(mut self, seq: u64) -> Self {
169        self.batch_seq = seq;
170        self
171    }
172
173    /// Set the maximum number of events per batch.
174    pub fn max_events(mut self, max: usize) -> Self {
175        self.max_events = max;
176        self
177    }
178
179    /// Set the maximum bytes per batch.
180    pub fn max_bytes(mut self, max: usize) -> Self {
181        self.max_bytes = max;
182        self
183    }
184
185    /// Add an event to the batch.
186    /// Returns Some(batch) if the batch is full and was flushed.
187    pub fn add_event(&mut self, event: DatabaseEvent) -> Option<DatabaseEventBatch> {
188        let event_size = event.size_bytes();
189
190        // Check if adding this event would exceed limits
191        if !self.events.is_empty()
192            && (self.events.len() >= self.max_events
193                || self.current_bytes + event_size > self.max_bytes)
194        {
195            // Flush current batch
196            let batch = self.flush();
197            // Add the event to the new batch
198            self.current_bytes = event_size;
199            self.events.push(event);
200            Some(batch)
201        } else {
202            self.current_bytes += event_size;
203            self.events.push(event);
204            None
205        }
206    }
207
208    /// Add relation metadata to the batch.
209    pub fn add_relation(&mut self, meta: RelationMeta) {
210        // Only add if not already present
211        if !self.relations.iter().any(|r| r.rel_id == meta.rel_id) {
212            self.relations.push(meta);
213        }
214    }
215
216    /// Check if the batch is empty.
217    pub fn is_empty(&self) -> bool {
218        self.events.is_empty()
219    }
220
221    /// Get the current number of events.
222    pub fn len(&self) -> usize {
223        self.events.len()
224    }
225
226    /// Flush the current batch and return it.
227    pub fn flush(&mut self) -> DatabaseEventBatch {
228        let batch = DatabaseEventBatch {
229            events: std::mem::take(&mut self.events),
230            source_id: self.source_id.clone(),
231            batch_seq: self.batch_seq,
232            relations: std::mem::take(&mut self.relations),
233            control_directive: None,
234            sequence_values: None,
235            backfill_metadata: None,
236        };
237
238        self.batch_seq += 1;
239        self.current_bytes = 0;
240
241        batch
242    }
243
244    /// Finish building and return any remaining events as a batch.
245    pub fn finish(mut self) -> Option<DatabaseEventBatch> {
246        if self.events.is_empty() {
247            None
248        } else {
249            Some(self.flush())
250        }
251    }
252}
253
254impl Default for BatchBuilder {
255    fn default() -> Self {
256        Self::new()
257    }
258}
259
260// Utility functions
261
262fn current_timestamp_us() -> u64 {
263    systemtime_to_us(SystemTime::now())
264}
265
/// Convert a `SystemTime` to microseconds since the Unix epoch.
///
/// Times before the epoch map to 0. Times whose microsecond count does not
/// fit in a `u64` saturate to `u64::MAX` instead of wrapping.
fn systemtime_to_us(time: SystemTime) -> u64 {
    time.duration_since(UNIX_EPOCH)
        .map(|d| {
            // `as_micros` returns u128; clamp first so the narrowing cast
            // below is lossless (a bare `as u64` would silently truncate).
            let micros = d.as_micros().min(u128::from(u64::MAX));
            micros as u64
        })
        .unwrap_or(0)
}
271
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{Duration, UNIX_EPOCH};

    #[test]
    fn test_event_builder() {
        let event = DatabaseEventBuilder::insert("public", "users")
            .position(12345)
            .transaction_id(100)
            .columns(vec!["id".to_string(), "name".to_string()])
            .new_row(Row::new(vec![
                ColumnValue::from_pg_binary(TypeTag::Int32, 23, vec![0, 0, 0, 1]),
                ColumnValue::from_pg_binary(TypeTag::Text, 25, b"Alice".to_vec()),
            ]))
            .build();

        assert_eq!(event.operation, OperationType::Insert);
        assert_eq!(event.schema, "public");
        assert_eq!(event.table, "users");
        assert_eq!(event.position, 12345);
        assert_eq!(event.transaction_id, 100);
        assert_eq!(event.columns, vec!["id".to_string(), "name".to_string()]);
        assert!(event.new_row.is_some());
        assert!(event.old_row.is_none());
        assert!(event.event_id.is_none());
    }

    #[test]
    fn test_event_builder_explicit_timestamp() {
        let ts = UNIX_EPOCH + Duration::from_micros(1_500_000);
        let event = DatabaseEventBuilder::delete("public", "users")
            .timestamp(ts)
            .build();
        assert_eq!(event.timestamp_us, 1_500_000);

        let event = DatabaseEventBuilder::delete("public", "users")
            .timestamp_us(42)
            .build();
        assert_eq!(event.timestamp_us, 42);
    }

    #[test]
    fn test_ddl_builder() {
        let event = DatabaseEventBuilder::ddl("CREATE TABLE t (id int)", "table").build();
        assert_eq!(event.operation, OperationType::Ddl);
        assert_eq!(event.ddl_sql.as_deref(), Some("CREATE TABLE t (id int)"));
        assert_eq!(event.ddl_object_type.as_deref(), Some("table"));
        assert_eq!(event.schema, "");
        assert_eq!(event.table, "");
    }

    #[test]
    fn test_batch_builder() {
        let mut batch = BatchBuilder::new()
            .source_id("test")
            .max_events(2);
        assert!(batch.is_empty());

        let event1 = DatabaseEventBuilder::insert("public", "users").build();
        let event2 = DatabaseEventBuilder::insert("public", "users").build();
        let event3 = DatabaseEventBuilder::insert("public", "users").build();

        // First two events should not trigger flush
        assert!(batch.add_event(event1).is_none());
        assert!(batch.add_event(event2).is_none());
        assert_eq!(batch.len(), 2);

        // Third event should trigger flush of the first two only
        let flushed = batch
            .add_event(event3)
            .expect("third event should flush the full batch");
        assert_eq!(flushed.events.len(), 2);
        assert_eq!(flushed.batch_seq, 0);
        assert_eq!(flushed.source_id.as_deref(), Some("test"));
        assert_eq!(batch.len(), 1);

        // Finish should return remaining event with the next sequence number
        let remaining = batch.finish().expect("one event should remain");
        assert_eq!(remaining.events.len(), 1);
        assert_eq!(remaining.batch_seq, 1);
        assert_eq!(remaining.source_id.as_deref(), Some("test"));
    }

    #[test]
    fn test_finish_empty_returns_none() {
        assert!(BatchBuilder::new().finish().is_none());
    }
}