Skip to main content

scry_protocol/database_event/
types.rs

1//! Database event types matching the FlatBuffers schema.
2
3use serde::{Deserialize, Serialize};
4
5/// Type tag for fast dispatch without parsing OID.
6/// Maps to PostgreSQL type categories.
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
8#[repr(u8)]
9pub enum TypeTag {
10    // Special
11    Null = 0,
12
13    // Basic types
14    Bool = 1,
15    Int16 = 2,
16    Int32 = 3,
17    Int64 = 4,
18    Float32 = 5,
19    Float64 = 6,
20    Text = 7,
21    Bytea = 8,
22
23    // JSON types
24    Json = 9,
25    Jsonb = 10,
26
27    // Other common types
28    Uuid = 11,
29    Timestamp = 12,
30    TimestampTz = 13,
31    Date = 14,
32    Time = 15,
33    TimeTz = 16,
34    Interval = 17,
35    Numeric = 18,
36
37    // Bit types
38    Bit = 20,
39    Varbit = 21,
40
41    // Geometric types
42    Point = 30,
43    Line = 31,
44    Lseg = 32,
45    Box = 33,
46    Path = 34,
47    Polygon = 35,
48    Circle = 36,
49
50    // Network types
51    Inet = 40,
52    Cidr = 41,
53    MacAddr = 42,
54    MacAddr8 = 43,
55
56    // Range types
57    Int4Range = 50,
58    Int8Range = 51,
59    NumRange = 52,
60    TsRange = 53,
61    TsTzRange = 54,
62    DateRange = 55,
63
64    // Money
65    Money = 60,
66
67    // XML
68    Xml = 61,
69
70    // Arrays (element type encoded in type_oid)
71    Array = 100,
72
73    // Special marker for unchanged TOAST values in UPDATE operations.
74    // When a column has a large TOAST value that wasn't modified, PostgreSQL
75    // sends this marker instead of the value. The target should exclude these
76    // columns from the UPDATE SET clause.
77    ToastUnchanged = 126,
78
79    // Extension point for custom/unknown types
80    Custom = 127,
81}
82
83impl TypeTag {
84    /// Convert from byte value.
85    pub fn from_byte(b: u8) -> Option<Self> {
86        match b {
87            0 => Some(Self::Null),
88            1 => Some(Self::Bool),
89            2 => Some(Self::Int16),
90            3 => Some(Self::Int32),
91            4 => Some(Self::Int64),
92            5 => Some(Self::Float32),
93            6 => Some(Self::Float64),
94            7 => Some(Self::Text),
95            8 => Some(Self::Bytea),
96            9 => Some(Self::Json),
97            10 => Some(Self::Jsonb),
98            11 => Some(Self::Uuid),
99            12 => Some(Self::Timestamp),
100            13 => Some(Self::TimestampTz),
101            14 => Some(Self::Date),
102            15 => Some(Self::Time),
103            16 => Some(Self::TimeTz),
104            17 => Some(Self::Interval),
105            18 => Some(Self::Numeric),
106            20 => Some(Self::Bit),
107            21 => Some(Self::Varbit),
108            30 => Some(Self::Point),
109            31 => Some(Self::Line),
110            32 => Some(Self::Lseg),
111            33 => Some(Self::Box),
112            34 => Some(Self::Path),
113            35 => Some(Self::Polygon),
114            36 => Some(Self::Circle),
115            40 => Some(Self::Inet),
116            41 => Some(Self::Cidr),
117            42 => Some(Self::MacAddr),
118            43 => Some(Self::MacAddr8),
119            50 => Some(Self::Int4Range),
120            51 => Some(Self::Int8Range),
121            52 => Some(Self::NumRange),
122            53 => Some(Self::TsRange),
123            54 => Some(Self::TsTzRange),
124            55 => Some(Self::DateRange),
125            60 => Some(Self::Money),
126            61 => Some(Self::Xml),
127            100 => Some(Self::Array),
128            126 => Some(Self::ToastUnchanged),
129            127 => Some(Self::Custom),
130            _ => None,
131        }
132    }
133}
134
135/// Database operation type.
136#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
137#[repr(u8)]
138pub enum OperationType {
139    // DML operations
140    Insert = 0,
141    Update = 1,
142    Delete = 2,
143    Truncate = 3,
144
145    // Transaction boundaries
146    Begin = 10,
147    Commit = 11,
148    Rollback = 12,
149
150    // Snapshot markers (for COPY rows)
151    SnapshotRow = 20,
152    SnapshotBegin = 21,
153    SnapshotEnd = 22,
154
155    // Control directives (for coordinating producer/receiver behavior)
156    SequenceSync = 30,
157    DisableForeignKeys = 31,
158    EnableForeignKeys = 32,
159
160    // DDL operations
161    Ddl = 33,
162
163    // DDL phase completed (control event from scry-backfill)
164    DdlComplete = 34,
165
166    // Backfill phase completed (control event from scry-backfill)
167    // Contains table statistics for verification
168    BackfillComplete = 35,
169
170    // Backfill phase starting (control event from scry-backfill)
171    // Contains estimated table/row counts for progress tracking
172    BackfillStart = 36,
173}
174
175impl OperationType {
176    /// Convert from byte value.
177    pub fn from_byte(b: u8) -> Option<Self> {
178        match b {
179            0 => Some(Self::Insert),
180            1 => Some(Self::Update),
181            2 => Some(Self::Delete),
182            3 => Some(Self::Truncate),
183            10 => Some(Self::Begin),
184            11 => Some(Self::Commit),
185            12 => Some(Self::Rollback),
186            20 => Some(Self::SnapshotRow),
187            21 => Some(Self::SnapshotBegin),
188            22 => Some(Self::SnapshotEnd),
189            30 => Some(Self::SequenceSync),
190            31 => Some(Self::DisableForeignKeys),
191            32 => Some(Self::EnableForeignKeys),
192            33 => Some(Self::Ddl),
193            34 => Some(Self::DdlComplete),
194            35 => Some(Self::BackfillComplete),
195            36 => Some(Self::BackfillStart),
196            _ => None,
197        }
198    }
199
200    /// Check if this is a control directive (not a data operation).
201    pub fn is_control_directive(&self) -> bool {
202        matches!(
203            self,
204            Self::SequenceSync | Self::DisableForeignKeys | Self::EnableForeignKeys
205                | Self::BackfillComplete | Self::BackfillStart
206        )
207    }
208
209    /// Check if this is a DDL operation.
210    pub fn is_ddl(&self) -> bool {
211        matches!(self, Self::Ddl)
212    }
213}
214
215impl std::fmt::Display for OperationType {
216    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
217        match self {
218            Self::Insert => write!(f, "INSERT"),
219            Self::Update => write!(f, "UPDATE"),
220            Self::Delete => write!(f, "DELETE"),
221            Self::Truncate => write!(f, "TRUNCATE"),
222            Self::Begin => write!(f, "BEGIN"),
223            Self::Commit => write!(f, "COMMIT"),
224            Self::Rollback => write!(f, "ROLLBACK"),
225            Self::SnapshotRow => write!(f, "SNAPSHOT_ROW"),
226            Self::SnapshotBegin => write!(f, "SNAPSHOT_BEGIN"),
227            Self::SnapshotEnd => write!(f, "SNAPSHOT_END"),
228            Self::SequenceSync => write!(f, "SEQUENCE_SYNC"),
229            Self::DisableForeignKeys => write!(f, "DISABLE_FOREIGN_KEYS"),
230            Self::EnableForeignKeys => write!(f, "ENABLE_FOREIGN_KEYS"),
231            Self::Ddl => write!(f, "DDL"),
232            Self::DdlComplete => write!(f, "DDL_COMPLETE"),
233            Self::BackfillComplete => write!(f, "BACKFILL_COMPLETE"),
234            Self::BackfillStart => write!(f, "BACKFILL_START"),
235        }
236    }
237}
238
239/// PostgreSQL replica identity setting.
240#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
241#[repr(u8)]
242pub enum ReplicaIdentity {
243    #[default]
244    Default = 0, // 'd' - Use primary key
245    Nothing = 1, // 'n' - No old row data
246    Full = 2,    // 'f' - Full old row
247    Index = 3,   // 'i' - Use specific index
248}
249
250impl ReplicaIdentity {
251    /// Convert from PostgreSQL replica identity character.
252    pub fn from_pg_char(c: u8) -> Self {
253        match c {
254            b'd' => Self::Default,
255            b'n' => Self::Nothing,
256            b'f' => Self::Full,
257            b'i' => Self::Index,
258            _ => Self::Default,
259        }
260    }
261
262    /// Convert to PostgreSQL replica identity character.
263    pub fn to_pg_char(self) -> u8 {
264        match self {
265            Self::Default => b'd',
266            Self::Nothing => b'n',
267            Self::Full => b'f',
268            Self::Index => b'i',
269        }
270    }
271}
272
273/// A PostgreSQL sequence value for synchronization.
274///
275/// Used to transmit current sequence state from source to target
276/// so that sequences can be synchronized after snapshot completion.
277#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
278pub struct SequenceValue {
279    /// Schema name containing the sequence.
280    pub schema: String,
281
282    /// Sequence name.
283    pub name: String,
284
285    /// Current last_value from pg_sequences.
286    pub last_value: i64,
287
288    /// Whether setval's is_called parameter should be true.
289    /// If true: next nextval() returns last_value + increment_by
290    /// If false: next nextval() returns last_value
291    pub is_called: bool,
292
293    /// Increment value for this sequence.
294    pub increment_by: i64,
295
296    /// Minimum value for this sequence.
297    pub min_value: i64,
298
299    /// Maximum value for this sequence.
300    pub max_value: i64,
301}
302
303impl SequenceValue {
304    /// Create a new sequence value.
305    pub fn new(
306        schema: impl Into<String>,
307        name: impl Into<String>,
308        last_value: i64,
309    ) -> Self {
310        Self {
311            schema: schema.into(),
312            name: name.into(),
313            last_value,
314            is_called: true,
315            increment_by: 1,
316            min_value: 1,
317            max_value: i64::MAX,
318        }
319    }
320
321    /// Get the fully qualified sequence name.
322    pub fn qualified_name(&self) -> String {
323        format!("{}.{}", self.schema, self.name)
324    }
325}
326
327/// Control directive for coordinating producer/receiver behavior.
328///
329/// Control directives are sent as special batches to signal the receiver
330/// to perform administrative operations like disabling/enabling constraints.
331#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
332pub enum ControlDirective {
333    /// Disable foreign key constraints on the target.
334    /// The receiver should:
335    /// 1. Query all FK constraints
336    /// 2. Store definitions in _scry_admin.foreign_keys
337    /// 3. Drop all FK constraints
338    DisableForeignKeys,
339
340    /// Enable foreign key constraints on the target.
341    /// The receiver should:
342    /// 1. Read FK definitions from _scry_admin.foreign_keys
343    /// 2. Recreate all FK constraints
344    /// 3. Clean up the admin table
345    EnableForeignKeys,
346
347    /// Synchronize sequence values.
348    /// The batch will contain sequence_values with the current state.
349    SyncSequences,
350}
351
352impl std::fmt::Display for ControlDirective {
353    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
354        match self {
355            Self::DisableForeignKeys => write!(f, "DISABLE_FOREIGN_KEYS"),
356            Self::EnableForeignKeys => write!(f, "ENABLE_FOREIGN_KEYS"),
357            Self::SyncSequences => write!(f, "SYNC_SEQUENCES"),
358        }
359    }
360}
361
362/// A single column value.
363/// Uses raw PostgreSQL binary format bytes for zero-copy efficiency.
364#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
365pub struct ColumnValue {
366    /// Type tag for fast dispatch.
367    pub type_tag: TypeTag,
368
369    /// PostgreSQL type OID (for arrays, custom types, and disambiguation).
370    pub type_oid: u32,
371
372    /// Raw PostgreSQL binary format bytes.
373    /// None represents NULL.
374    pub data: Option<Vec<u8>>,
375}
376
377impl ColumnValue {
378    /// Create a NULL column value.
379    pub fn null() -> Self {
380        Self {
381            type_tag: TypeTag::Null,
382            type_oid: 0,
383            data: None,
384        }
385    }
386
387    /// Create a column value from raw PostgreSQL binary data.
388    pub fn from_pg_binary(type_tag: TypeTag, type_oid: u32, data: Vec<u8>) -> Self {
389        Self {
390            type_tag,
391            type_oid,
392            data: Some(data),
393        }
394    }
395
396    /// Create a marker for an unchanged TOAST value.
397    /// Used in UPDATE operations when a large column value was not modified.
398    /// The type_oid is preserved so the target knows the column's type.
399    pub fn unchanged(type_oid: u32) -> Self {
400        Self {
401            type_tag: TypeTag::ToastUnchanged,
402            type_oid,
403            data: None,
404        }
405    }
406
407    /// Check if this value is NULL.
408    pub fn is_null(&self) -> bool {
409        self.data.is_none() && self.type_tag == TypeTag::Null
410    }
411
412    /// Check if this value is an unchanged TOAST marker.
413    pub fn is_unchanged(&self) -> bool {
414        self.type_tag == TypeTag::ToastUnchanged
415    }
416
417    /// Get the raw data bytes.
418    pub fn as_bytes(&self) -> Option<&[u8]> {
419        self.data.as_deref()
420    }
421
422    /// Estimate the size of this value in bytes.
423    pub fn size_bytes(&self) -> usize {
424        // 1 (tag) + 4 (oid) + data length
425        5 + self.data.as_ref().map_or(0, |d| d.len())
426    }
427}
428
429/// A row of column values.
430#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
431pub struct Row {
432    pub values: Vec<ColumnValue>,
433}
434
435impl Row {
436    /// Create a new row from values.
437    pub fn new(values: Vec<ColumnValue>) -> Self {
438        Self { values }
439    }
440
441    /// Create an empty row.
442    pub fn empty() -> Self {
443        Self { values: Vec::new() }
444    }
445
446    /// Get a value by index.
447    pub fn get(&self, index: usize) -> Option<&ColumnValue> {
448        self.values.get(index)
449    }
450
451    /// Estimate the size of this row in bytes.
452    pub fn size_bytes(&self) -> usize {
453        self.values.iter().map(|v| v.size_bytes()).sum()
454    }
455}
456
457/// Column metadata.
458#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
459pub struct ColumnMeta {
460    /// Column name.
461    pub name: String,
462
463    /// PostgreSQL type OID.
464    pub type_oid: u32,
465
466    /// Type modifier (e.g., varchar length, numeric precision).
467    pub type_modifier: i32,
468
469    /// Whether this column is part of the key.
470    pub is_key: bool,
471}
472
473/// Relation (table) metadata.
474#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
475pub struct RelationMeta {
476    /// Relation ID (PostgreSQL OID).
477    pub rel_id: u32,
478
479    /// Schema name.
480    pub schema: String,
481
482    /// Table name.
483    pub table: String,
484
485    /// Column definitions.
486    pub columns: Vec<ColumnMeta>,
487
488    /// Replica identity setting.
489    pub replica_identity: ReplicaIdentity,
490}
491
492impl RelationMeta {
493    /// Get the fully qualified table name.
494    pub fn qualified_name(&self) -> String {
495        format!("{}.{}", self.schema, self.table)
496    }
497
498    /// Get column names in order.
499    pub fn column_names(&self) -> Vec<&str> {
500        self.columns.iter().map(|c| c.name.as_str()).collect()
501    }
502}
503
504/// Metadata about a backfill operation, sent with `BackfillStart` events.
505///
506/// Contains estimated table and row counts for progress tracking.
507#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
508pub struct BackfillMetadata {
509    /// Number of tables being backfilled.
510    pub table_count: u32,
511
512    /// Estimated total rows across all tables (from pg_class.reltuples).
513    pub estimated_total_rows: u64,
514}
515
516/// A single database event.
517#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
518pub struct DatabaseEvent {
519    /// Unique event identifier (UUID v4).
520    pub event_id: Option<String>,
521
522    /// Event timestamp (Unix timestamp in microseconds).
523    pub timestamp_us: u64,
524
525    /// Operation type.
526    pub operation: OperationType,
527
528    /// Schema name.
529    pub schema: String,
530
531    /// Table name.
532    pub table: String,
533
534    /// Replication position (PostgreSQL LSN as u64).
535    pub position: u64,
536
537    /// Transaction ID.
538    pub transaction_id: u64,
539
540    /// New row data (for INSERT, UPDATE, SnapshotRow).
541    pub new_row: Option<Row>,
542
543    /// Old row data (for UPDATE with REPLICA IDENTITY FULL, DELETE).
544    pub old_row: Option<Row>,
545
546    /// Column names in order.
547    pub columns: Vec<String>,
548
549    /// Relation metadata (sent once per table per stream).
550    pub relation_meta: Option<RelationMeta>,
551
552    /// DDL SQL statement (when operation == Ddl).
553    /// Contains the full CREATE/ALTER/DROP statement.
554    #[serde(default, skip_serializing_if = "Option::is_none")]
555    pub ddl_sql: Option<String>,
556
557    /// DDL object type for filtering/logging.
558    /// e.g., "extension", "table", "index", "constraint", "function", "trigger"
559    #[serde(default, skip_serializing_if = "Option::is_none")]
560    pub ddl_object_type: Option<String>,
561}
562
563impl DatabaseEvent {
564    /// Get the fully qualified table name.
565    pub fn qualified_name(&self) -> String {
566        format!("{}.{}", self.schema, self.table)
567    }
568
569    /// Estimate the size of this event in bytes.
570    pub fn size_bytes(&self) -> usize {
571        let mut size = 64; // Base overhead
572        size += self.schema.len();
573        size += self.table.len();
574        if let Some(ref row) = self.new_row {
575            size += row.size_bytes();
576        }
577        if let Some(ref row) = self.old_row {
578            size += row.size_bytes();
579        }
580        size += self.columns.iter().map(|c| c.len()).sum::<usize>();
581        size
582    }
583}
584
585/// Batch of events for efficient transport.
586#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
587pub struct DatabaseEventBatch {
588    /// Events in this batch.
589    pub events: Vec<DatabaseEvent>,
590
591    /// Source identifier (proxy/connector ID).
592    pub source_id: Option<String>,
593
594    /// Batch sequence number (for ordering/deduplication).
595    pub batch_seq: u64,
596
597    /// Cached relation metadata for this batch.
598    pub relations: Vec<RelationMeta>,
599
600    /// Control directive for this batch (if a control batch).
601    /// When present, this batch signals the receiver to perform
602    /// an administrative operation rather than applying data changes.
603    #[serde(default, skip_serializing_if = "Option::is_none")]
604    pub control_directive: Option<ControlDirective>,
605
606    /// Sequence values for synchronization.
607    /// Present when control_directive is SyncSequences.
608    #[serde(default, skip_serializing_if = "Option::is_none")]
609    pub sequence_values: Option<Vec<SequenceValue>>,
610
611    /// Backfill metadata for progress tracking.
612    /// Present when the batch contains a BackfillStart event.
613    #[serde(default, skip_serializing_if = "Option::is_none")]
614    pub backfill_metadata: Option<BackfillMetadata>,
615}
616
617impl DatabaseEventBatch {
618    /// Create a new empty batch.
619    pub fn new() -> Self {
620        Self {
621            events: Vec::new(),
622            source_id: None,
623            batch_seq: 0,
624            relations: Vec::new(),
625            control_directive: None,
626            sequence_values: None,
627            backfill_metadata: None,
628        }
629    }
630
631    /// Create a batch with events.
632    pub fn with_events(events: Vec<DatabaseEvent>) -> Self {
633        Self {
634            events,
635            source_id: None,
636            batch_seq: 0,
637            relations: Vec::new(),
638            control_directive: None,
639            sequence_values: None,
640            backfill_metadata: None,
641        }
642    }
643
644    /// Create a control batch that signals an administrative operation.
645    pub fn control(directive: ControlDirective) -> Self {
646        Self {
647            events: Vec::new(),
648            source_id: None,
649            batch_seq: 0,
650            relations: Vec::new(),
651            control_directive: Some(directive),
652            sequence_values: None,
653            backfill_metadata: None,
654        }
655    }
656
657    /// Create a sequence sync control batch with sequence values.
658    pub fn sequence_sync(sequences: Vec<SequenceValue>) -> Self {
659        Self {
660            events: Vec::new(),
661            source_id: None,
662            batch_seq: 0,
663            relations: Vec::new(),
664            control_directive: Some(ControlDirective::SyncSequences),
665            sequence_values: Some(sequences),
666            backfill_metadata: None,
667        }
668    }
669
670    /// Set the source ID.
671    pub fn with_source_id(mut self, source_id: impl Into<String>) -> Self {
672        self.source_id = Some(source_id.into());
673        self
674    }
675
676    /// Set the batch sequence number.
677    pub fn with_batch_seq(mut self, seq: u64) -> Self {
678        self.batch_seq = seq;
679        self
680    }
681
682    /// Set the control directive.
683    pub fn with_control_directive(mut self, directive: ControlDirective) -> Self {
684        self.control_directive = Some(directive);
685        self
686    }
687
688    /// Set sequence values.
689    pub fn with_sequence_values(mut self, sequences: Vec<SequenceValue>) -> Self {
690        self.sequence_values = Some(sequences);
691        self
692    }
693
694    /// Set backfill metadata for progress tracking.
695    pub fn with_backfill_metadata(mut self, metadata: BackfillMetadata) -> Self {
696        self.backfill_metadata = Some(metadata);
697        self
698    }
699
700    /// Check if this is a control batch (no data events, just a directive).
701    pub fn is_control_batch(&self) -> bool {
702        self.control_directive.is_some()
703    }
704
705    /// Estimate the size of this batch in bytes.
706    pub fn size_bytes(&self) -> usize {
707        let mut size = 32; // Base overhead
708        size += self.events.iter().map(|e| e.size_bytes()).sum::<usize>();
709        size
710    }
711}
712
713impl Default for DatabaseEventBatch {
714    fn default() -> Self {
715        Self::new()
716    }
717}
718
#[cfg(test)]
mod tests {
    use super::*;

    /// A sample of tags must survive conversion to a byte and back.
    #[test]
    fn test_type_tag_roundtrip() {
        let samples = [
            TypeTag::Null,
            TypeTag::Bool,
            TypeTag::Int32,
            TypeTag::Text,
            TypeTag::Uuid,
            TypeTag::Array,
            TypeTag::ToastUnchanged,
            TypeTag::Custom,
        ];
        for original in samples {
            let decoded = TypeTag::from_byte(original as u8).unwrap();
            assert_eq!(original, decoded);
        }
    }

    /// Display output of the common data and transaction operations.
    #[test]
    fn test_operation_type_display() {
        let expectations = [
            (OperationType::Insert, "INSERT"),
            (OperationType::Update, "UPDATE"),
            (OperationType::Delete, "DELETE"),
            (OperationType::Begin, "BEGIN"),
            (OperationType::Commit, "COMMIT"),
            (OperationType::SnapshotRow, "SNAPSHOT_ROW"),
        ];
        for (operation, label) in expectations {
            assert_eq!(operation.to_string(), label);
        }
    }

    /// Conversion between replica identity variants and PostgreSQL codes.
    #[test]
    fn test_replica_identity_pg_char() {
        assert_eq!(ReplicaIdentity::from_pg_char(b'd'), ReplicaIdentity::Default);
        assert_eq!(ReplicaIdentity::from_pg_char(b'f'), ReplicaIdentity::Full);
        assert_eq!(ReplicaIdentity::Full.to_pg_char(), b'f');
    }

    /// `ColumnValue::null()` is NULL, not a TOAST marker, and has no bytes.
    #[test]
    fn test_column_value_null() {
        let value = ColumnValue::null();
        assert!(value.is_null());
        assert!(!value.is_unchanged());
        assert_eq!(value.as_bytes(), None);
    }

    /// The unchanged-TOAST marker keeps its OID and is not reported as NULL.
    #[test]
    fn test_column_value_unchanged() {
        let text_oid = 25; // TEXT type OID
        let marker = ColumnValue::unchanged(text_oid);
        assert!(!marker.is_null());
        assert!(marker.is_unchanged());
        assert_eq!(marker.type_oid, 25);
        assert_eq!(marker.as_bytes(), None);
    }

    /// A value built from binary data exposes exactly those bytes.
    #[test]
    fn test_column_value_with_data() {
        let payload = vec![0x00, 0x01, 0x02, 0x03];
        let value = ColumnValue::from_pg_binary(TypeTag::Int32, 23, payload.clone());
        assert!(!value.is_null());
        assert_eq!(value.as_bytes(), Some(payload.as_slice()));
    }

    /// A non-empty row reports a positive size estimate.
    #[test]
    fn test_row_size_bytes() {
        let values = vec![
            ColumnValue::null(),
            ColumnValue::from_pg_binary(TypeTag::Int32, 23, vec![0, 0, 0, 42]),
        ];
        let row = Row::new(values);
        assert!(row.size_bytes() > 0);
    }

    /// Builder setters on a plain batch store their arguments.
    #[test]
    fn test_database_event_batch() {
        let batch = DatabaseEventBatch::new()
            .with_source_id("test-source")
            .with_batch_seq(42);

        assert_eq!(batch.source_id, Some("test-source".to_string()));
        assert_eq!(batch.batch_seq, 42);
        assert!(!batch.is_control_batch());
    }

    /// Display, byte decoding and classification of control operations.
    #[test]
    fn test_operation_type_control_directives() {
        let control_ops = [
            (OperationType::SequenceSync, 30u8, "SEQUENCE_SYNC"),
            (OperationType::DisableForeignKeys, 31u8, "DISABLE_FOREIGN_KEYS"),
            (OperationType::EnableForeignKeys, 32u8, "ENABLE_FOREIGN_KEYS"),
        ];
        for (operation, byte, label) in control_ops {
            // Display output
            assert_eq!(operation.to_string(), label);
            // from_byte roundtrip
            assert_eq!(OperationType::from_byte(byte), Some(operation));
            // is_control_directive
            assert!(operation.is_control_directive());
        }

        assert!(!OperationType::Insert.is_control_directive());
        assert!(!OperationType::SnapshotRow.is_control_directive());
    }

    /// `SequenceValue::new` stores its arguments and applies defaults.
    #[test]
    fn test_sequence_value() {
        let sequence = SequenceValue::new("public", "users_id_seq", 100);
        assert_eq!(sequence.schema, "public");
        assert_eq!(sequence.name, "users_id_seq");
        assert_eq!(sequence.last_value, 100);
        assert!(sequence.is_called);
        assert_eq!(sequence.increment_by, 1);
        assert_eq!(sequence.qualified_name(), "public.users_id_seq");
    }

    /// Display output of every control directive.
    #[test]
    fn test_control_directive_display() {
        let expectations = [
            (ControlDirective::DisableForeignKeys, "DISABLE_FOREIGN_KEYS"),
            (ControlDirective::EnableForeignKeys, "ENABLE_FOREIGN_KEYS"),
            (ControlDirective::SyncSequences, "SYNC_SEQUENCES"),
        ];
        for (directive, label) in expectations {
            assert_eq!(directive.to_string(), label);
        }
    }

    /// A control batch carries its directive and no events or sequences.
    #[test]
    fn test_control_batch() {
        let batch = DatabaseEventBatch::control(ControlDirective::DisableForeignKeys)
            .with_source_id("backfill-001")
            .with_batch_seq(1);

        assert!(batch.is_control_batch());
        assert_eq!(batch.control_directive, Some(ControlDirective::DisableForeignKeys));
        assert!(batch.events.is_empty());
        assert!(batch.sequence_values.is_none());
    }

    /// A sequence sync batch is a control batch that carries the sequences.
    #[test]
    fn test_sequence_sync_batch() {
        let users = SequenceValue::new("public", "users_id_seq", 100);
        let orders = SequenceValue::new("public", "orders_id_seq", 500);

        let batch = DatabaseEventBatch::sequence_sync(vec![users, orders])
            .with_source_id("backfill-001")
            .with_batch_seq(99);

        assert!(batch.is_control_batch());
        assert_eq!(batch.control_directive, Some(ControlDirective::SyncSequences));
        assert!(batch.events.is_empty());

        let carried = batch.sequence_values.as_ref().unwrap();
        assert_eq!(carried.len(), 2);
        assert_eq!(carried[0].qualified_name(), "public.users_id_seq");
        assert_eq!(carried[1].last_value, 500);
    }
}
869        assert_eq!(seq_values.len(), 2);
870        assert_eq!(seq_values[0].qualified_name(), "public.users_id_seq");
871        assert_eq!(seq_values[1].last_value, 500);
872    }
873}