1use serde::{Deserialize, Serialize};
4
5#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
8#[repr(u8)]
9pub enum TypeTag {
10 Null = 0,
12
13 Bool = 1,
15 Int16 = 2,
16 Int32 = 3,
17 Int64 = 4,
18 Float32 = 5,
19 Float64 = 6,
20 Text = 7,
21 Bytea = 8,
22
23 Json = 9,
25 Jsonb = 10,
26
27 Uuid = 11,
29 Timestamp = 12,
30 TimestampTz = 13,
31 Date = 14,
32 Time = 15,
33 TimeTz = 16,
34 Interval = 17,
35 Numeric = 18,
36
37 Bit = 20,
39 Varbit = 21,
40
41 Point = 30,
43 Line = 31,
44 Lseg = 32,
45 Box = 33,
46 Path = 34,
47 Polygon = 35,
48 Circle = 36,
49
50 Inet = 40,
52 Cidr = 41,
53 MacAddr = 42,
54 MacAddr8 = 43,
55
56 Int4Range = 50,
58 Int8Range = 51,
59 NumRange = 52,
60 TsRange = 53,
61 TsTzRange = 54,
62 DateRange = 55,
63
64 Money = 60,
66
67 Xml = 61,
69
70 Array = 100,
72
73 ToastUnchanged = 126,
78
79 Custom = 127,
81}
82
83impl TypeTag {
84 pub fn from_byte(b: u8) -> Option<Self> {
86 match b {
87 0 => Some(Self::Null),
88 1 => Some(Self::Bool),
89 2 => Some(Self::Int16),
90 3 => Some(Self::Int32),
91 4 => Some(Self::Int64),
92 5 => Some(Self::Float32),
93 6 => Some(Self::Float64),
94 7 => Some(Self::Text),
95 8 => Some(Self::Bytea),
96 9 => Some(Self::Json),
97 10 => Some(Self::Jsonb),
98 11 => Some(Self::Uuid),
99 12 => Some(Self::Timestamp),
100 13 => Some(Self::TimestampTz),
101 14 => Some(Self::Date),
102 15 => Some(Self::Time),
103 16 => Some(Self::TimeTz),
104 17 => Some(Self::Interval),
105 18 => Some(Self::Numeric),
106 20 => Some(Self::Bit),
107 21 => Some(Self::Varbit),
108 30 => Some(Self::Point),
109 31 => Some(Self::Line),
110 32 => Some(Self::Lseg),
111 33 => Some(Self::Box),
112 34 => Some(Self::Path),
113 35 => Some(Self::Polygon),
114 36 => Some(Self::Circle),
115 40 => Some(Self::Inet),
116 41 => Some(Self::Cidr),
117 42 => Some(Self::MacAddr),
118 43 => Some(Self::MacAddr8),
119 50 => Some(Self::Int4Range),
120 51 => Some(Self::Int8Range),
121 52 => Some(Self::NumRange),
122 53 => Some(Self::TsRange),
123 54 => Some(Self::TsTzRange),
124 55 => Some(Self::DateRange),
125 60 => Some(Self::Money),
126 61 => Some(Self::Xml),
127 100 => Some(Self::Array),
128 126 => Some(Self::ToastUnchanged),
129 127 => Some(Self::Custom),
130 _ => None,
131 }
132 }
133}
134
135#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
137#[repr(u8)]
138pub enum OperationType {
139 Insert = 0,
141 Update = 1,
142 Delete = 2,
143 Truncate = 3,
144
145 Begin = 10,
147 Commit = 11,
148 Rollback = 12,
149
150 SnapshotRow = 20,
152 SnapshotBegin = 21,
153 SnapshotEnd = 22,
154
155 SequenceSync = 30,
157 DisableForeignKeys = 31,
158 EnableForeignKeys = 32,
159
160 Ddl = 33,
162
163 DdlComplete = 34,
165
166 BackfillComplete = 35,
169
170 BackfillStart = 36,
173}
174
175impl OperationType {
176 pub fn from_byte(b: u8) -> Option<Self> {
178 match b {
179 0 => Some(Self::Insert),
180 1 => Some(Self::Update),
181 2 => Some(Self::Delete),
182 3 => Some(Self::Truncate),
183 10 => Some(Self::Begin),
184 11 => Some(Self::Commit),
185 12 => Some(Self::Rollback),
186 20 => Some(Self::SnapshotRow),
187 21 => Some(Self::SnapshotBegin),
188 22 => Some(Self::SnapshotEnd),
189 30 => Some(Self::SequenceSync),
190 31 => Some(Self::DisableForeignKeys),
191 32 => Some(Self::EnableForeignKeys),
192 33 => Some(Self::Ddl),
193 34 => Some(Self::DdlComplete),
194 35 => Some(Self::BackfillComplete),
195 36 => Some(Self::BackfillStart),
196 _ => None,
197 }
198 }
199
200 pub fn is_control_directive(&self) -> bool {
202 matches!(
203 self,
204 Self::SequenceSync | Self::DisableForeignKeys | Self::EnableForeignKeys
205 | Self::BackfillComplete | Self::BackfillStart
206 )
207 }
208
209 pub fn is_ddl(&self) -> bool {
211 matches!(self, Self::Ddl)
212 }
213}
214
215impl std::fmt::Display for OperationType {
216 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
217 match self {
218 Self::Insert => write!(f, "INSERT"),
219 Self::Update => write!(f, "UPDATE"),
220 Self::Delete => write!(f, "DELETE"),
221 Self::Truncate => write!(f, "TRUNCATE"),
222 Self::Begin => write!(f, "BEGIN"),
223 Self::Commit => write!(f, "COMMIT"),
224 Self::Rollback => write!(f, "ROLLBACK"),
225 Self::SnapshotRow => write!(f, "SNAPSHOT_ROW"),
226 Self::SnapshotBegin => write!(f, "SNAPSHOT_BEGIN"),
227 Self::SnapshotEnd => write!(f, "SNAPSHOT_END"),
228 Self::SequenceSync => write!(f, "SEQUENCE_SYNC"),
229 Self::DisableForeignKeys => write!(f, "DISABLE_FOREIGN_KEYS"),
230 Self::EnableForeignKeys => write!(f, "ENABLE_FOREIGN_KEYS"),
231 Self::Ddl => write!(f, "DDL"),
232 Self::DdlComplete => write!(f, "DDL_COMPLETE"),
233 Self::BackfillComplete => write!(f, "BACKFILL_COMPLETE"),
234 Self::BackfillStart => write!(f, "BACKFILL_START"),
235 }
236 }
237}
238
239#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
241#[repr(u8)]
242pub enum ReplicaIdentity {
243 #[default]
244 Default = 0, Nothing = 1, Full = 2, Index = 3, }
249
250impl ReplicaIdentity {
251 pub fn from_pg_char(c: u8) -> Self {
253 match c {
254 b'd' => Self::Default,
255 b'n' => Self::Nothing,
256 b'f' => Self::Full,
257 b'i' => Self::Index,
258 _ => Self::Default,
259 }
260 }
261
262 pub fn to_pg_char(self) -> u8 {
264 match self {
265 Self::Default => b'd',
266 Self::Nothing => b'n',
267 Self::Full => b'f',
268 Self::Index => b'i',
269 }
270 }
271}
272
273#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
278pub struct SequenceValue {
279 pub schema: String,
281
282 pub name: String,
284
285 pub last_value: i64,
287
288 pub is_called: bool,
292
293 pub increment_by: i64,
295
296 pub min_value: i64,
298
299 pub max_value: i64,
301}
302
303impl SequenceValue {
304 pub fn new(
306 schema: impl Into<String>,
307 name: impl Into<String>,
308 last_value: i64,
309 ) -> Self {
310 Self {
311 schema: schema.into(),
312 name: name.into(),
313 last_value,
314 is_called: true,
315 increment_by: 1,
316 min_value: 1,
317 max_value: i64::MAX,
318 }
319 }
320
321 pub fn qualified_name(&self) -> String {
323 format!("{}.{}", self.schema, self.name)
324 }
325}
326
327#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
332pub enum ControlDirective {
333 DisableForeignKeys,
339
340 EnableForeignKeys,
346
347 SyncSequences,
350}
351
352impl std::fmt::Display for ControlDirective {
353 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
354 match self {
355 Self::DisableForeignKeys => write!(f, "DISABLE_FOREIGN_KEYS"),
356 Self::EnableForeignKeys => write!(f, "ENABLE_FOREIGN_KEYS"),
357 Self::SyncSequences => write!(f, "SYNC_SEQUENCES"),
358 }
359 }
360}
361
362#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
365pub struct ColumnValue {
366 pub type_tag: TypeTag,
368
369 pub type_oid: u32,
371
372 pub data: Option<Vec<u8>>,
375}
376
377impl ColumnValue {
378 pub fn null() -> Self {
380 Self {
381 type_tag: TypeTag::Null,
382 type_oid: 0,
383 data: None,
384 }
385 }
386
387 pub fn from_pg_binary(type_tag: TypeTag, type_oid: u32, data: Vec<u8>) -> Self {
389 Self {
390 type_tag,
391 type_oid,
392 data: Some(data),
393 }
394 }
395
396 pub fn unchanged(type_oid: u32) -> Self {
400 Self {
401 type_tag: TypeTag::ToastUnchanged,
402 type_oid,
403 data: None,
404 }
405 }
406
407 pub fn is_null(&self) -> bool {
409 self.data.is_none() && self.type_tag == TypeTag::Null
410 }
411
412 pub fn is_unchanged(&self) -> bool {
414 self.type_tag == TypeTag::ToastUnchanged
415 }
416
417 pub fn as_bytes(&self) -> Option<&[u8]> {
419 self.data.as_deref()
420 }
421
422 pub fn size_bytes(&self) -> usize {
424 5 + self.data.as_ref().map_or(0, |d| d.len())
426 }
427}
428
429#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
431pub struct Row {
432 pub values: Vec<ColumnValue>,
433}
434
435impl Row {
436 pub fn new(values: Vec<ColumnValue>) -> Self {
438 Self { values }
439 }
440
441 pub fn empty() -> Self {
443 Self { values: Vec::new() }
444 }
445
446 pub fn get(&self, index: usize) -> Option<&ColumnValue> {
448 self.values.get(index)
449 }
450
451 pub fn size_bytes(&self) -> usize {
453 self.values.iter().map(|v| v.size_bytes()).sum()
454 }
455}
456
457#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
459pub struct ColumnMeta {
460 pub name: String,
462
463 pub type_oid: u32,
465
466 pub type_modifier: i32,
468
469 pub is_key: bool,
471}
472
473#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
475pub struct RelationMeta {
476 pub rel_id: u32,
478
479 pub schema: String,
481
482 pub table: String,
484
485 pub columns: Vec<ColumnMeta>,
487
488 pub replica_identity: ReplicaIdentity,
490}
491
492impl RelationMeta {
493 pub fn qualified_name(&self) -> String {
495 format!("{}.{}", self.schema, self.table)
496 }
497
498 pub fn column_names(&self) -> Vec<&str> {
500 self.columns.iter().map(|c| c.name.as_str()).collect()
501 }
502}
503
504#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
508pub struct BackfillMetadata {
509 pub table_count: u32,
511
512 pub estimated_total_rows: u64,
514}
515
516#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
518pub struct DatabaseEvent {
519 pub event_id: Option<String>,
521
522 pub timestamp_us: u64,
524
525 pub operation: OperationType,
527
528 pub schema: String,
530
531 pub table: String,
533
534 pub position: u64,
536
537 pub transaction_id: u64,
539
540 pub new_row: Option<Row>,
542
543 pub old_row: Option<Row>,
545
546 pub columns: Vec<String>,
548
549 pub relation_meta: Option<RelationMeta>,
551
552 #[serde(default, skip_serializing_if = "Option::is_none")]
555 pub ddl_sql: Option<String>,
556
557 #[serde(default, skip_serializing_if = "Option::is_none")]
560 pub ddl_object_type: Option<String>,
561}
562
563impl DatabaseEvent {
564 pub fn qualified_name(&self) -> String {
566 format!("{}.{}", self.schema, self.table)
567 }
568
569 pub fn size_bytes(&self) -> usize {
571 let mut size = 64; size += self.schema.len();
573 size += self.table.len();
574 if let Some(ref row) = self.new_row {
575 size += row.size_bytes();
576 }
577 if let Some(ref row) = self.old_row {
578 size += row.size_bytes();
579 }
580 size += self.columns.iter().map(|c| c.len()).sum::<usize>();
581 size
582 }
583}
584
585#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
587pub struct DatabaseEventBatch {
588 pub events: Vec<DatabaseEvent>,
590
591 pub source_id: Option<String>,
593
594 pub batch_seq: u64,
596
597 pub relations: Vec<RelationMeta>,
599
600 #[serde(default, skip_serializing_if = "Option::is_none")]
604 pub control_directive: Option<ControlDirective>,
605
606 #[serde(default, skip_serializing_if = "Option::is_none")]
609 pub sequence_values: Option<Vec<SequenceValue>>,
610
611 #[serde(default, skip_serializing_if = "Option::is_none")]
614 pub backfill_metadata: Option<BackfillMetadata>,
615}
616
617impl DatabaseEventBatch {
618 pub fn new() -> Self {
620 Self {
621 events: Vec::new(),
622 source_id: None,
623 batch_seq: 0,
624 relations: Vec::new(),
625 control_directive: None,
626 sequence_values: None,
627 backfill_metadata: None,
628 }
629 }
630
631 pub fn with_events(events: Vec<DatabaseEvent>) -> Self {
633 Self {
634 events,
635 source_id: None,
636 batch_seq: 0,
637 relations: Vec::new(),
638 control_directive: None,
639 sequence_values: None,
640 backfill_metadata: None,
641 }
642 }
643
644 pub fn control(directive: ControlDirective) -> Self {
646 Self {
647 events: Vec::new(),
648 source_id: None,
649 batch_seq: 0,
650 relations: Vec::new(),
651 control_directive: Some(directive),
652 sequence_values: None,
653 backfill_metadata: None,
654 }
655 }
656
657 pub fn sequence_sync(sequences: Vec<SequenceValue>) -> Self {
659 Self {
660 events: Vec::new(),
661 source_id: None,
662 batch_seq: 0,
663 relations: Vec::new(),
664 control_directive: Some(ControlDirective::SyncSequences),
665 sequence_values: Some(sequences),
666 backfill_metadata: None,
667 }
668 }
669
670 pub fn with_source_id(mut self, source_id: impl Into<String>) -> Self {
672 self.source_id = Some(source_id.into());
673 self
674 }
675
676 pub fn with_batch_seq(mut self, seq: u64) -> Self {
678 self.batch_seq = seq;
679 self
680 }
681
682 pub fn with_control_directive(mut self, directive: ControlDirective) -> Self {
684 self.control_directive = Some(directive);
685 self
686 }
687
688 pub fn with_sequence_values(mut self, sequences: Vec<SequenceValue>) -> Self {
690 self.sequence_values = Some(sequences);
691 self
692 }
693
694 pub fn with_backfill_metadata(mut self, metadata: BackfillMetadata) -> Self {
696 self.backfill_metadata = Some(metadata);
697 self
698 }
699
700 pub fn is_control_batch(&self) -> bool {
702 self.control_directive.is_some()
703 }
704
705 pub fn size_bytes(&self) -> usize {
707 let mut size = 32; size += self.events.iter().map(|e| e.size_bytes()).sum::<usize>();
709 size
710 }
711}
712
713impl Default for DatabaseEventBatch {
714 fn default() -> Self {
715 Self::new()
716 }
717}
718
#[cfg(test)]
mod tests {
    use super::*;

    /// Every byte that decodes must encode back to itself, and the number of
    /// decodable bytes must match the number of variants.
    #[test]
    fn test_type_tag_roundtrip() {
        let mut known = 0;
        for byte in 0..=u8::MAX {
            if let Some(tag) = TypeTag::from_byte(byte) {
                assert_eq!(tag as u8, byte);
                known += 1;
            }
        }
        assert_eq!(known, 43);

        // Spot checks at the edges of the assigned ranges.
        assert_eq!(TypeTag::from_byte(0), Some(TypeTag::Null));
        assert_eq!(TypeTag::from_byte(18), Some(TypeTag::Numeric));
        assert_eq!(TypeTag::from_byte(100), Some(TypeTag::Array));
        assert_eq!(TypeTag::from_byte(126), Some(TypeTag::ToastUnchanged));
        assert_eq!(TypeTag::from_byte(127), Some(TypeTag::Custom));
    }

    /// Bytes in the gaps between groups are rejected.
    #[test]
    fn test_type_tag_unknown_bytes() {
        for byte in [19u8, 22, 29, 37, 44, 56, 62, 99, 101, 125, 128, 255] {
            assert_eq!(TypeTag::from_byte(byte), None);
        }
    }

    #[test]
    fn test_operation_type_roundtrip() {
        let mut known = 0;
        for byte in 0..=u8::MAX {
            if let Some(op) = OperationType::from_byte(byte) {
                assert_eq!(op as u8, byte);
                known += 1;
            }
        }
        assert_eq!(known, 17);

        for byte in [4u8, 9, 13, 23, 37, 255] {
            assert_eq!(OperationType::from_byte(byte), None);
        }
    }

    #[test]
    fn test_operation_type_display() {
        assert_eq!(OperationType::Insert.to_string(), "INSERT");
        assert_eq!(OperationType::Update.to_string(), "UPDATE");
        assert_eq!(OperationType::Delete.to_string(), "DELETE");
        assert_eq!(OperationType::Truncate.to_string(), "TRUNCATE");
        assert_eq!(OperationType::Begin.to_string(), "BEGIN");
        assert_eq!(OperationType::Commit.to_string(), "COMMIT");
        assert_eq!(OperationType::Rollback.to_string(), "ROLLBACK");
        assert_eq!(OperationType::SnapshotRow.to_string(), "SNAPSHOT_ROW");
        assert_eq!(OperationType::SnapshotBegin.to_string(), "SNAPSHOT_BEGIN");
        assert_eq!(OperationType::SnapshotEnd.to_string(), "SNAPSHOT_END");
        assert_eq!(OperationType::Ddl.to_string(), "DDL");
        assert_eq!(OperationType::DdlComplete.to_string(), "DDL_COMPLETE");
        assert_eq!(OperationType::BackfillComplete.to_string(), "BACKFILL_COMPLETE");
        assert_eq!(OperationType::BackfillStart.to_string(), "BACKFILL_START");
    }

    #[test]
    fn test_replica_identity_pg_char() {
        assert_eq!(ReplicaIdentity::from_pg_char(b'd'), ReplicaIdentity::Default);
        assert_eq!(ReplicaIdentity::from_pg_char(b'f'), ReplicaIdentity::Full);
        assert_eq!(ReplicaIdentity::Full.to_pg_char(), b'f');

        // All four codes survive a round trip.
        for identity in [
            ReplicaIdentity::Default,
            ReplicaIdentity::Nothing,
            ReplicaIdentity::Full,
            ReplicaIdentity::Index,
        ] {
            assert_eq!(ReplicaIdentity::from_pg_char(identity.to_pg_char()), identity);
        }

        // Unrecognised codes fall back to the default identity.
        assert_eq!(ReplicaIdentity::from_pg_char(b'x'), ReplicaIdentity::Default);
        assert_eq!(ReplicaIdentity::from_pg_char(0), ReplicaIdentity::Default);
        assert_eq!(ReplicaIdentity::default(), ReplicaIdentity::Default);
    }

    #[test]
    fn test_column_value_null() {
        let null = ColumnValue::null();
        assert!(null.is_null());
        assert!(!null.is_unchanged());
        assert_eq!(null.type_oid, 0);
        assert_eq!(null.as_bytes(), None);
    }

    #[test]
    fn test_column_value_unchanged() {
        let unchanged = ColumnValue::unchanged(25);
        assert!(!unchanged.is_null());
        assert!(unchanged.is_unchanged());
        assert_eq!(unchanged.type_oid, 25);
        assert_eq!(unchanged.as_bytes(), None);
    }

    #[test]
    fn test_column_value_with_data() {
        let data = vec![0x00, 0x01, 0x02, 0x03];
        let value = ColumnValue::from_pg_binary(TypeTag::Int32, 23, data.clone());
        assert!(!value.is_null());
        assert!(!value.is_unchanged());
        assert_eq!(value.as_bytes(), Some(data.as_slice()));
        assert_eq!(value.size_bytes(), 5 + data.len());
    }

    #[test]
    fn test_row_size_bytes() {
        let row = Row::new(vec![
            ColumnValue::null(),
            ColumnValue::from_pg_binary(TypeTag::Int32, 23, vec![0, 0, 0, 42]),
        ]);
        // 5-byte header for the null value, 5 + 4 for the Int32 value.
        assert_eq!(row.size_bytes(), 14);
        assert_eq!(Row::empty().size_bytes(), 0);
    }

    #[test]
    fn test_row_get() {
        let row = Row::new(vec![ColumnValue::null(), ColumnValue::unchanged(25)]);
        assert!(row.get(0).unwrap().is_null());
        assert!(row.get(1).unwrap().is_unchanged());
        assert_eq!(row.get(2), None);
        assert_eq!(Row::empty().get(0), None);
    }

    #[test]
    fn test_database_event_batch() {
        let batch = DatabaseEventBatch::new()
            .with_source_id("test-source")
            .with_batch_seq(42);

        assert_eq!(batch.source_id, Some("test-source".to_string()));
        assert_eq!(batch.batch_seq, 42);
        assert!(!batch.is_control_batch());
        assert_eq!(batch.size_bytes(), 32);
        assert_eq!(DatabaseEventBatch::default(), DatabaseEventBatch::new());
    }

    #[test]
    fn test_operation_type_control_directives() {
        assert_eq!(OperationType::SequenceSync.to_string(), "SEQUENCE_SYNC");
        assert_eq!(OperationType::DisableForeignKeys.to_string(), "DISABLE_FOREIGN_KEYS");
        assert_eq!(OperationType::EnableForeignKeys.to_string(), "ENABLE_FOREIGN_KEYS");

        assert_eq!(OperationType::from_byte(30), Some(OperationType::SequenceSync));
        assert_eq!(OperationType::from_byte(31), Some(OperationType::DisableForeignKeys));
        assert_eq!(OperationType::from_byte(32), Some(OperationType::EnableForeignKeys));

        assert!(OperationType::SequenceSync.is_control_directive());
        assert!(OperationType::DisableForeignKeys.is_control_directive());
        assert!(OperationType::EnableForeignKeys.is_control_directive());
        assert!(OperationType::BackfillStart.is_control_directive());
        assert!(OperationType::BackfillComplete.is_control_directive());
        assert!(!OperationType::Insert.is_control_directive());
        assert!(!OperationType::SnapshotRow.is_control_directive());
        assert!(!OperationType::Ddl.is_control_directive());
        assert!(!OperationType::DdlComplete.is_control_directive());

        assert!(OperationType::Ddl.is_ddl());
        assert!(!OperationType::Insert.is_ddl());
    }

    #[test]
    fn test_sequence_value() {
        let seq = SequenceValue::new("public", "users_id_seq", 100);
        assert_eq!(seq.schema, "public");
        assert_eq!(seq.name, "users_id_seq");
        assert_eq!(seq.last_value, 100);
        assert!(seq.is_called);
        assert_eq!(seq.increment_by, 1);
        assert_eq!(seq.min_value, 1);
        assert_eq!(seq.max_value, i64::MAX);
        assert_eq!(seq.qualified_name(), "public.users_id_seq");
    }

    #[test]
    fn test_control_directive_display() {
        assert_eq!(ControlDirective::DisableForeignKeys.to_string(), "DISABLE_FOREIGN_KEYS");
        assert_eq!(ControlDirective::EnableForeignKeys.to_string(), "ENABLE_FOREIGN_KEYS");
        assert_eq!(ControlDirective::SyncSequences.to_string(), "SYNC_SEQUENCES");
    }

    #[test]
    fn test_control_batch() {
        let batch = DatabaseEventBatch::control(ControlDirective::DisableForeignKeys)
            .with_source_id("backfill-001")
            .with_batch_seq(1);

        assert!(batch.is_control_batch());
        assert_eq!(batch.control_directive, Some(ControlDirective::DisableForeignKeys));
        assert!(batch.events.is_empty());
        assert!(batch.sequence_values.is_none());
    }

    #[test]
    fn test_sequence_sync_batch() {
        let sequences = vec![
            SequenceValue::new("public", "users_id_seq", 100),
            SequenceValue::new("public", "orders_id_seq", 500),
        ];

        let batch = DatabaseEventBatch::sequence_sync(sequences)
            .with_source_id("backfill-001")
            .with_batch_seq(99);

        assert!(batch.is_control_batch());
        assert_eq!(batch.control_directive, Some(ControlDirective::SyncSequences));
        assert!(batch.events.is_empty());

        let seq_values = batch.sequence_values.as_ref().unwrap();
        assert_eq!(seq_values.len(), 2);
        assert_eq!(seq_values[0].qualified_name(), "public.users_id_seq");
        assert_eq!(seq_values[1].last_value, 500);
    }
}