1use serde::{Deserialize, Serialize};
4
5#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
8#[repr(u8)]
9pub enum TypeTag {
10 Null = 0,
12
13 Bool = 1,
15 Int16 = 2,
16 Int32 = 3,
17 Int64 = 4,
18 Float32 = 5,
19 Float64 = 6,
20 Text = 7,
21 Bytea = 8,
22
23 Json = 9,
25 Jsonb = 10,
26
27 Uuid = 11,
29 Timestamp = 12,
30 TimestampTz = 13,
31 Date = 14,
32 Time = 15,
33 TimeTz = 16,
34 Interval = 17,
35 Numeric = 18,
36
37 Bit = 20,
39 Varbit = 21,
40
41 Point = 30,
43 Line = 31,
44 Lseg = 32,
45 Box = 33,
46 Path = 34,
47 Polygon = 35,
48 Circle = 36,
49
50 Inet = 40,
52 Cidr = 41,
53 MacAddr = 42,
54 MacAddr8 = 43,
55
56 Int4Range = 50,
58 Int8Range = 51,
59 NumRange = 52,
60 TsRange = 53,
61 TsTzRange = 54,
62 DateRange = 55,
63
64 Money = 60,
66
67 Xml = 61,
69
70 Array = 100,
72
73 ToastUnchanged = 126,
78
79 Custom = 127,
81}
82
83impl TypeTag {
84 pub fn from_byte(b: u8) -> Option<Self> {
86 match b {
87 0 => Some(Self::Null),
88 1 => Some(Self::Bool),
89 2 => Some(Self::Int16),
90 3 => Some(Self::Int32),
91 4 => Some(Self::Int64),
92 5 => Some(Self::Float32),
93 6 => Some(Self::Float64),
94 7 => Some(Self::Text),
95 8 => Some(Self::Bytea),
96 9 => Some(Self::Json),
97 10 => Some(Self::Jsonb),
98 11 => Some(Self::Uuid),
99 12 => Some(Self::Timestamp),
100 13 => Some(Self::TimestampTz),
101 14 => Some(Self::Date),
102 15 => Some(Self::Time),
103 16 => Some(Self::TimeTz),
104 17 => Some(Self::Interval),
105 18 => Some(Self::Numeric),
106 20 => Some(Self::Bit),
107 21 => Some(Self::Varbit),
108 30 => Some(Self::Point),
109 31 => Some(Self::Line),
110 32 => Some(Self::Lseg),
111 33 => Some(Self::Box),
112 34 => Some(Self::Path),
113 35 => Some(Self::Polygon),
114 36 => Some(Self::Circle),
115 40 => Some(Self::Inet),
116 41 => Some(Self::Cidr),
117 42 => Some(Self::MacAddr),
118 43 => Some(Self::MacAddr8),
119 50 => Some(Self::Int4Range),
120 51 => Some(Self::Int8Range),
121 52 => Some(Self::NumRange),
122 53 => Some(Self::TsRange),
123 54 => Some(Self::TsTzRange),
124 55 => Some(Self::DateRange),
125 60 => Some(Self::Money),
126 61 => Some(Self::Xml),
127 100 => Some(Self::Array),
128 126 => Some(Self::ToastUnchanged),
129 127 => Some(Self::Custom),
130 _ => None,
131 }
132 }
133}
134
135#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
137#[repr(u8)]
138pub enum OperationType {
139 Insert = 0,
141 Update = 1,
142 Delete = 2,
143 Truncate = 3,
144
145 Begin = 10,
147 Commit = 11,
148 Rollback = 12,
149
150 SnapshotRow = 20,
152 SnapshotBegin = 21,
153 SnapshotEnd = 22,
154
155 SequenceSync = 30,
157 DisableForeignKeys = 31,
158 EnableForeignKeys = 32,
159
160 Ddl = 33,
162
163 DdlComplete = 34,
165
166 BackfillComplete = 35,
169}
170
171impl OperationType {
172 pub fn from_byte(b: u8) -> Option<Self> {
174 match b {
175 0 => Some(Self::Insert),
176 1 => Some(Self::Update),
177 2 => Some(Self::Delete),
178 3 => Some(Self::Truncate),
179 10 => Some(Self::Begin),
180 11 => Some(Self::Commit),
181 12 => Some(Self::Rollback),
182 20 => Some(Self::SnapshotRow),
183 21 => Some(Self::SnapshotBegin),
184 22 => Some(Self::SnapshotEnd),
185 30 => Some(Self::SequenceSync),
186 31 => Some(Self::DisableForeignKeys),
187 32 => Some(Self::EnableForeignKeys),
188 33 => Some(Self::Ddl),
189 34 => Some(Self::DdlComplete),
190 35 => Some(Self::BackfillComplete),
191 _ => None,
192 }
193 }
194
195 pub fn is_control_directive(&self) -> bool {
197 matches!(
198 self,
199 Self::SequenceSync | Self::DisableForeignKeys | Self::EnableForeignKeys | Self::BackfillComplete
200 )
201 }
202
203 pub fn is_ddl(&self) -> bool {
205 matches!(self, Self::Ddl)
206 }
207}
208
209impl std::fmt::Display for OperationType {
210 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211 match self {
212 Self::Insert => write!(f, "INSERT"),
213 Self::Update => write!(f, "UPDATE"),
214 Self::Delete => write!(f, "DELETE"),
215 Self::Truncate => write!(f, "TRUNCATE"),
216 Self::Begin => write!(f, "BEGIN"),
217 Self::Commit => write!(f, "COMMIT"),
218 Self::Rollback => write!(f, "ROLLBACK"),
219 Self::SnapshotRow => write!(f, "SNAPSHOT_ROW"),
220 Self::SnapshotBegin => write!(f, "SNAPSHOT_BEGIN"),
221 Self::SnapshotEnd => write!(f, "SNAPSHOT_END"),
222 Self::SequenceSync => write!(f, "SEQUENCE_SYNC"),
223 Self::DisableForeignKeys => write!(f, "DISABLE_FOREIGN_KEYS"),
224 Self::EnableForeignKeys => write!(f, "ENABLE_FOREIGN_KEYS"),
225 Self::Ddl => write!(f, "DDL"),
226 Self::DdlComplete => write!(f, "DDL_COMPLETE"),
227 Self::BackfillComplete => write!(f, "BACKFILL_COMPLETE"),
228 }
229 }
230}
231
232#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
234#[repr(u8)]
235pub enum ReplicaIdentity {
236 #[default]
237 Default = 0, Nothing = 1, Full = 2, Index = 3, }
242
243impl ReplicaIdentity {
244 pub fn from_pg_char(c: u8) -> Self {
246 match c {
247 b'd' => Self::Default,
248 b'n' => Self::Nothing,
249 b'f' => Self::Full,
250 b'i' => Self::Index,
251 _ => Self::Default,
252 }
253 }
254
255 pub fn to_pg_char(self) -> u8 {
257 match self {
258 Self::Default => b'd',
259 Self::Nothing => b'n',
260 Self::Full => b'f',
261 Self::Index => b'i',
262 }
263 }
264}
265
266#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
271pub struct SequenceValue {
272 pub schema: String,
274
275 pub name: String,
277
278 pub last_value: i64,
280
281 pub is_called: bool,
285
286 pub increment_by: i64,
288
289 pub min_value: i64,
291
292 pub max_value: i64,
294}
295
296impl SequenceValue {
297 pub fn new(
299 schema: impl Into<String>,
300 name: impl Into<String>,
301 last_value: i64,
302 ) -> Self {
303 Self {
304 schema: schema.into(),
305 name: name.into(),
306 last_value,
307 is_called: true,
308 increment_by: 1,
309 min_value: 1,
310 max_value: i64::MAX,
311 }
312 }
313
314 pub fn qualified_name(&self) -> String {
316 format!("{}.{}", self.schema, self.name)
317 }
318}
319
320#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
325pub enum ControlDirective {
326 DisableForeignKeys,
332
333 EnableForeignKeys,
339
340 SyncSequences,
343}
344
345impl std::fmt::Display for ControlDirective {
346 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
347 match self {
348 Self::DisableForeignKeys => write!(f, "DISABLE_FOREIGN_KEYS"),
349 Self::EnableForeignKeys => write!(f, "ENABLE_FOREIGN_KEYS"),
350 Self::SyncSequences => write!(f, "SYNC_SEQUENCES"),
351 }
352 }
353}
354
355#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
358pub struct ColumnValue {
359 pub type_tag: TypeTag,
361
362 pub type_oid: u32,
364
365 pub data: Option<Vec<u8>>,
368}
369
370impl ColumnValue {
371 pub fn null() -> Self {
373 Self {
374 type_tag: TypeTag::Null,
375 type_oid: 0,
376 data: None,
377 }
378 }
379
380 pub fn from_pg_binary(type_tag: TypeTag, type_oid: u32, data: Vec<u8>) -> Self {
382 Self {
383 type_tag,
384 type_oid,
385 data: Some(data),
386 }
387 }
388
389 pub fn unchanged(type_oid: u32) -> Self {
393 Self {
394 type_tag: TypeTag::ToastUnchanged,
395 type_oid,
396 data: None,
397 }
398 }
399
400 pub fn is_null(&self) -> bool {
402 self.data.is_none() && self.type_tag == TypeTag::Null
403 }
404
405 pub fn is_unchanged(&self) -> bool {
407 self.type_tag == TypeTag::ToastUnchanged
408 }
409
410 pub fn as_bytes(&self) -> Option<&[u8]> {
412 self.data.as_deref()
413 }
414
415 pub fn size_bytes(&self) -> usize {
417 5 + self.data.as_ref().map_or(0, |d| d.len())
419 }
420}
421
422#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
424pub struct Row {
425 pub values: Vec<ColumnValue>,
426}
427
428impl Row {
429 pub fn new(values: Vec<ColumnValue>) -> Self {
431 Self { values }
432 }
433
434 pub fn empty() -> Self {
436 Self { values: Vec::new() }
437 }
438
439 pub fn get(&self, index: usize) -> Option<&ColumnValue> {
441 self.values.get(index)
442 }
443
444 pub fn size_bytes(&self) -> usize {
446 self.values.iter().map(|v| v.size_bytes()).sum()
447 }
448}
449
450#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
452pub struct ColumnMeta {
453 pub name: String,
455
456 pub type_oid: u32,
458
459 pub type_modifier: i32,
461
462 pub is_key: bool,
464}
465
466#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
468pub struct RelationMeta {
469 pub rel_id: u32,
471
472 pub schema: String,
474
475 pub table: String,
477
478 pub columns: Vec<ColumnMeta>,
480
481 pub replica_identity: ReplicaIdentity,
483}
484
485impl RelationMeta {
486 pub fn qualified_name(&self) -> String {
488 format!("{}.{}", self.schema, self.table)
489 }
490
491 pub fn column_names(&self) -> Vec<&str> {
493 self.columns.iter().map(|c| c.name.as_str()).collect()
494 }
495}
496
497#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
499pub struct DatabaseEvent {
500 pub event_id: Option<String>,
502
503 pub timestamp_us: u64,
505
506 pub operation: OperationType,
508
509 pub schema: String,
511
512 pub table: String,
514
515 pub position: u64,
517
518 pub transaction_id: u64,
520
521 pub new_row: Option<Row>,
523
524 pub old_row: Option<Row>,
526
527 pub columns: Vec<String>,
529
530 pub relation_meta: Option<RelationMeta>,
532
533 #[serde(default, skip_serializing_if = "Option::is_none")]
536 pub ddl_sql: Option<String>,
537
538 #[serde(default, skip_serializing_if = "Option::is_none")]
541 pub ddl_object_type: Option<String>,
542}
543
544impl DatabaseEvent {
545 pub fn qualified_name(&self) -> String {
547 format!("{}.{}", self.schema, self.table)
548 }
549
550 pub fn size_bytes(&self) -> usize {
552 let mut size = 64; size += self.schema.len();
554 size += self.table.len();
555 if let Some(ref row) = self.new_row {
556 size += row.size_bytes();
557 }
558 if let Some(ref row) = self.old_row {
559 size += row.size_bytes();
560 }
561 size += self.columns.iter().map(|c| c.len()).sum::<usize>();
562 size
563 }
564}
565
566#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
568pub struct DatabaseEventBatch {
569 pub events: Vec<DatabaseEvent>,
571
572 pub source_id: Option<String>,
574
575 pub batch_seq: u64,
577
578 pub relations: Vec<RelationMeta>,
580
581 #[serde(default, skip_serializing_if = "Option::is_none")]
585 pub control_directive: Option<ControlDirective>,
586
587 #[serde(default, skip_serializing_if = "Option::is_none")]
590 pub sequence_values: Option<Vec<SequenceValue>>,
591}
592
593impl DatabaseEventBatch {
594 pub fn new() -> Self {
596 Self {
597 events: Vec::new(),
598 source_id: None,
599 batch_seq: 0,
600 relations: Vec::new(),
601 control_directive: None,
602 sequence_values: None,
603 }
604 }
605
606 pub fn with_events(events: Vec<DatabaseEvent>) -> Self {
608 Self {
609 events,
610 source_id: None,
611 batch_seq: 0,
612 relations: Vec::new(),
613 control_directive: None,
614 sequence_values: None,
615 }
616 }
617
618 pub fn control(directive: ControlDirective) -> Self {
620 Self {
621 events: Vec::new(),
622 source_id: None,
623 batch_seq: 0,
624 relations: Vec::new(),
625 control_directive: Some(directive),
626 sequence_values: None,
627 }
628 }
629
630 pub fn sequence_sync(sequences: Vec<SequenceValue>) -> Self {
632 Self {
633 events: Vec::new(),
634 source_id: None,
635 batch_seq: 0,
636 relations: Vec::new(),
637 control_directive: Some(ControlDirective::SyncSequences),
638 sequence_values: Some(sequences),
639 }
640 }
641
642 pub fn with_source_id(mut self, source_id: impl Into<String>) -> Self {
644 self.source_id = Some(source_id.into());
645 self
646 }
647
648 pub fn with_batch_seq(mut self, seq: u64) -> Self {
650 self.batch_seq = seq;
651 self
652 }
653
654 pub fn with_control_directive(mut self, directive: ControlDirective) -> Self {
656 self.control_directive = Some(directive);
657 self
658 }
659
660 pub fn with_sequence_values(mut self, sequences: Vec<SequenceValue>) -> Self {
662 self.sequence_values = Some(sequences);
663 self
664 }
665
666 pub fn is_control_batch(&self) -> bool {
668 self.control_directive.is_some()
669 }
670
671 pub fn size_bytes(&self) -> usize {
673 let mut size = 32; size += self.events.iter().map(|e| e.size_bytes()).sum::<usize>();
675 size
676 }
677}
678
679impl Default for DatabaseEventBatch {
680 fn default() -> Self {
681 Self::new()
682 }
683}
684
685#[cfg(test)]
686mod tests {
687 use super::*;
688
689 #[test]
690 fn test_type_tag_roundtrip() {
691 for tag in [
692 TypeTag::Null,
693 TypeTag::Bool,
694 TypeTag::Int32,
695 TypeTag::Text,
696 TypeTag::Uuid,
697 TypeTag::Array,
698 TypeTag::ToastUnchanged,
699 TypeTag::Custom,
700 ] {
701 let byte = tag as u8;
702 let recovered = TypeTag::from_byte(byte).unwrap();
703 assert_eq!(tag, recovered);
704 }
705 }
706
707 #[test]
708 fn test_operation_type_display() {
709 assert_eq!(OperationType::Insert.to_string(), "INSERT");
710 assert_eq!(OperationType::Update.to_string(), "UPDATE");
711 assert_eq!(OperationType::Delete.to_string(), "DELETE");
712 assert_eq!(OperationType::Begin.to_string(), "BEGIN");
713 assert_eq!(OperationType::Commit.to_string(), "COMMIT");
714 assert_eq!(OperationType::SnapshotRow.to_string(), "SNAPSHOT_ROW");
715 }
716
717 #[test]
718 fn test_replica_identity_pg_char() {
719 assert_eq!(ReplicaIdentity::from_pg_char(b'd'), ReplicaIdentity::Default);
720 assert_eq!(ReplicaIdentity::from_pg_char(b'f'), ReplicaIdentity::Full);
721 assert_eq!(ReplicaIdentity::Full.to_pg_char(), b'f');
722 }
723
724 #[test]
725 fn test_column_value_null() {
726 let null = ColumnValue::null();
727 assert!(null.is_null());
728 assert!(!null.is_unchanged());
729 assert_eq!(null.as_bytes(), None);
730 }
731
732 #[test]
733 fn test_column_value_unchanged() {
734 let unchanged = ColumnValue::unchanged(25); assert!(!unchanged.is_null());
736 assert!(unchanged.is_unchanged());
737 assert_eq!(unchanged.type_oid, 25);
738 assert_eq!(unchanged.as_bytes(), None);
739 }
740
741 #[test]
742 fn test_column_value_with_data() {
743 let data = vec![0x00, 0x01, 0x02, 0x03];
744 let value = ColumnValue::from_pg_binary(TypeTag::Int32, 23, data.clone());
745 assert!(!value.is_null());
746 assert_eq!(value.as_bytes(), Some(data.as_slice()));
747 }
748
749 #[test]
750 fn test_row_size_bytes() {
751 let row = Row::new(vec![
752 ColumnValue::null(),
753 ColumnValue::from_pg_binary(TypeTag::Int32, 23, vec![0, 0, 0, 42]),
754 ]);
755 assert!(row.size_bytes() > 0);
756 }
757
758 #[test]
759 fn test_database_event_batch() {
760 let batch = DatabaseEventBatch::new()
761 .with_source_id("test-source")
762 .with_batch_seq(42);
763
764 assert_eq!(batch.source_id, Some("test-source".to_string()));
765 assert_eq!(batch.batch_seq, 42);
766 assert!(!batch.is_control_batch());
767 }
768
769 #[test]
770 fn test_operation_type_control_directives() {
771 assert_eq!(OperationType::SequenceSync.to_string(), "SEQUENCE_SYNC");
773 assert_eq!(OperationType::DisableForeignKeys.to_string(), "DISABLE_FOREIGN_KEYS");
774 assert_eq!(OperationType::EnableForeignKeys.to_string(), "ENABLE_FOREIGN_KEYS");
775
776 assert_eq!(OperationType::from_byte(30), Some(OperationType::SequenceSync));
778 assert_eq!(OperationType::from_byte(31), Some(OperationType::DisableForeignKeys));
779 assert_eq!(OperationType::from_byte(32), Some(OperationType::EnableForeignKeys));
780
781 assert!(OperationType::SequenceSync.is_control_directive());
783 assert!(OperationType::DisableForeignKeys.is_control_directive());
784 assert!(OperationType::EnableForeignKeys.is_control_directive());
785 assert!(!OperationType::Insert.is_control_directive());
786 assert!(!OperationType::SnapshotRow.is_control_directive());
787 }
788
789 #[test]
790 fn test_sequence_value() {
791 let seq = SequenceValue::new("public", "users_id_seq", 100);
792 assert_eq!(seq.schema, "public");
793 assert_eq!(seq.name, "users_id_seq");
794 assert_eq!(seq.last_value, 100);
795 assert!(seq.is_called);
796 assert_eq!(seq.increment_by, 1);
797 assert_eq!(seq.qualified_name(), "public.users_id_seq");
798 }
799
800 #[test]
801 fn test_control_directive_display() {
802 assert_eq!(ControlDirective::DisableForeignKeys.to_string(), "DISABLE_FOREIGN_KEYS");
803 assert_eq!(ControlDirective::EnableForeignKeys.to_string(), "ENABLE_FOREIGN_KEYS");
804 assert_eq!(ControlDirective::SyncSequences.to_string(), "SYNC_SEQUENCES");
805 }
806
807 #[test]
808 fn test_control_batch() {
809 let batch = DatabaseEventBatch::control(ControlDirective::DisableForeignKeys)
810 .with_source_id("backfill-001")
811 .with_batch_seq(1);
812
813 assert!(batch.is_control_batch());
814 assert_eq!(batch.control_directive, Some(ControlDirective::DisableForeignKeys));
815 assert!(batch.events.is_empty());
816 assert!(batch.sequence_values.is_none());
817 }
818
819 #[test]
820 fn test_sequence_sync_batch() {
821 let sequences = vec![
822 SequenceValue::new("public", "users_id_seq", 100),
823 SequenceValue::new("public", "orders_id_seq", 500),
824 ];
825
826 let batch = DatabaseEventBatch::sequence_sync(sequences)
827 .with_source_id("backfill-001")
828 .with_batch_seq(99);
829
830 assert!(batch.is_control_batch());
831 assert_eq!(batch.control_directive, Some(ControlDirective::SyncSequences));
832 assert!(batch.events.is_empty());
833
834 let seq_values = batch.sequence_values.as_ref().unwrap();
835 assert_eq!(seq_values.len(), 2);
836 assert_eq!(seq_values[0].qualified_name(), "public.users_id_seq");
837 assert_eq!(seq_values[1].last_value, 500);
838 }
839}