// scry_protocol/database_event/types.rs

//! Database event types matching the FlatBuffers schema.
2
3use serde::{Deserialize, Serialize};
4
/// Type tag for fast dispatch without parsing OID.
/// Maps to PostgreSQL type categories.
///
/// The enum is `#[repr(u8)]` with explicit discriminants. Values are grouped by
/// category and the groups are not contiguous, so not every byte is a valid tag;
/// `TypeTag::from_byte` decodes a raw byte and returns `None` for the gaps.
// NOTE(review): the discriminants presumably mirror the FlatBuffers schema named in the
// module docs — confirm before renumbering anything.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[repr(u8)]
pub enum TypeTag {
    // Special
    Null = 0,

    // Basic types (1-8)
    Bool = 1,
    Int16 = 2,
    Int32 = 3,
    Int64 = 4,
    Float32 = 5,
    Float64 = 6,
    Text = 7,
    Bytea = 8,

    // JSON types (9-10)
    Json = 9,
    Jsonb = 10,

    // Other common types (11-18)
    Uuid = 11,
    Timestamp = 12,
    TimestampTz = 13,
    Date = 14,
    Time = 15,
    TimeTz = 16,
    Interval = 17,
    Numeric = 18,

    // Bit types (20-21; 19 is unassigned)
    Bit = 20,
    Varbit = 21,

    // Geometric types (30-36)
    Point = 30,
    Line = 31,
    Lseg = 32,
    Box = 33,
    Path = 34,
    Polygon = 35,
    Circle = 36,

    // Network types (40-43)
    Inet = 40,
    Cidr = 41,
    MacAddr = 42,
    MacAddr8 = 43,

    // Range types (50-55)
    Int4Range = 50,
    Int8Range = 51,
    NumRange = 52,
    TsRange = 53,
    TsTzRange = 54,
    DateRange = 55,

    // Money
    Money = 60,

    // XML
    Xml = 61,

    // Arrays (element type encoded in type_oid)
    Array = 100,

    // Special marker for unchanged TOAST values in UPDATE operations.
    // When a column has a large TOAST value that wasn't modified, PostgreSQL
    // sends this marker instead of the value. The target should exclude these
    // columns from the UPDATE SET clause.
    ToastUnchanged = 126,

    // Extension point for custom/unknown types
    Custom = 127,
}
82
83impl TypeTag {
84    /// Convert from byte value.
85    pub fn from_byte(b: u8) -> Option<Self> {
86        match b {
87            0 => Some(Self::Null),
88            1 => Some(Self::Bool),
89            2 => Some(Self::Int16),
90            3 => Some(Self::Int32),
91            4 => Some(Self::Int64),
92            5 => Some(Self::Float32),
93            6 => Some(Self::Float64),
94            7 => Some(Self::Text),
95            8 => Some(Self::Bytea),
96            9 => Some(Self::Json),
97            10 => Some(Self::Jsonb),
98            11 => Some(Self::Uuid),
99            12 => Some(Self::Timestamp),
100            13 => Some(Self::TimestampTz),
101            14 => Some(Self::Date),
102            15 => Some(Self::Time),
103            16 => Some(Self::TimeTz),
104            17 => Some(Self::Interval),
105            18 => Some(Self::Numeric),
106            20 => Some(Self::Bit),
107            21 => Some(Self::Varbit),
108            30 => Some(Self::Point),
109            31 => Some(Self::Line),
110            32 => Some(Self::Lseg),
111            33 => Some(Self::Box),
112            34 => Some(Self::Path),
113            35 => Some(Self::Polygon),
114            36 => Some(Self::Circle),
115            40 => Some(Self::Inet),
116            41 => Some(Self::Cidr),
117            42 => Some(Self::MacAddr),
118            43 => Some(Self::MacAddr8),
119            50 => Some(Self::Int4Range),
120            51 => Some(Self::Int8Range),
121            52 => Some(Self::NumRange),
122            53 => Some(Self::TsRange),
123            54 => Some(Self::TsTzRange),
124            55 => Some(Self::DateRange),
125            60 => Some(Self::Money),
126            61 => Some(Self::Xml),
127            100 => Some(Self::Array),
128            126 => Some(Self::ToastUnchanged),
129            127 => Some(Self::Custom),
130            _ => None,
131        }
132    }
133}
134
/// Database operation type.
///
/// `#[repr(u8)]` with explicit discriminants grouped by purpose (DML, transaction
/// boundaries, snapshot markers, control directives / DDL). The groups leave gaps,
/// so `OperationType::from_byte` returns `None` for unassigned bytes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[repr(u8)]
pub enum OperationType {
    // DML operations
    Insert = 0,
    Update = 1,
    Delete = 2,
    Truncate = 3,

    // Transaction boundaries
    Begin = 10,
    Commit = 11,
    Rollback = 12,

    // Snapshot markers (for COPY rows)
    SnapshotRow = 20,
    SnapshotBegin = 21,
    SnapshotEnd = 22,

    // Control directives (for coordinating producer/receiver behavior)
    SequenceSync = 30,
    DisableForeignKeys = 31,
    EnableForeignKeys = 32,

    // DDL operations
    Ddl = 33,

    // DDL phase completed (control event from scry-backfill)
    // NOTE(review): `is_control_directive` does not currently report this variant,
    // although `BackfillComplete` (also described as a control event) is reported.
    DdlComplete = 34,

    // Backfill phase completed (control event from scry-backfill)
    // Contains table statistics for verification
    BackfillComplete = 35,
}
170
171impl OperationType {
172    /// Convert from byte value.
173    pub fn from_byte(b: u8) -> Option<Self> {
174        match b {
175            0 => Some(Self::Insert),
176            1 => Some(Self::Update),
177            2 => Some(Self::Delete),
178            3 => Some(Self::Truncate),
179            10 => Some(Self::Begin),
180            11 => Some(Self::Commit),
181            12 => Some(Self::Rollback),
182            20 => Some(Self::SnapshotRow),
183            21 => Some(Self::SnapshotBegin),
184            22 => Some(Self::SnapshotEnd),
185            30 => Some(Self::SequenceSync),
186            31 => Some(Self::DisableForeignKeys),
187            32 => Some(Self::EnableForeignKeys),
188            33 => Some(Self::Ddl),
189            34 => Some(Self::DdlComplete),
190            35 => Some(Self::BackfillComplete),
191            _ => None,
192        }
193    }
194
195    /// Check if this is a control directive (not a data operation).
196    pub fn is_control_directive(&self) -> bool {
197        matches!(
198            self,
199            Self::SequenceSync | Self::DisableForeignKeys | Self::EnableForeignKeys | Self::BackfillComplete
200        )
201    }
202
203    /// Check if this is a DDL operation.
204    pub fn is_ddl(&self) -> bool {
205        matches!(self, Self::Ddl)
206    }
207}
208
209impl std::fmt::Display for OperationType {
210    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211        match self {
212            Self::Insert => write!(f, "INSERT"),
213            Self::Update => write!(f, "UPDATE"),
214            Self::Delete => write!(f, "DELETE"),
215            Self::Truncate => write!(f, "TRUNCATE"),
216            Self::Begin => write!(f, "BEGIN"),
217            Self::Commit => write!(f, "COMMIT"),
218            Self::Rollback => write!(f, "ROLLBACK"),
219            Self::SnapshotRow => write!(f, "SNAPSHOT_ROW"),
220            Self::SnapshotBegin => write!(f, "SNAPSHOT_BEGIN"),
221            Self::SnapshotEnd => write!(f, "SNAPSHOT_END"),
222            Self::SequenceSync => write!(f, "SEQUENCE_SYNC"),
223            Self::DisableForeignKeys => write!(f, "DISABLE_FOREIGN_KEYS"),
224            Self::EnableForeignKeys => write!(f, "ENABLE_FOREIGN_KEYS"),
225            Self::Ddl => write!(f, "DDL"),
226            Self::DdlComplete => write!(f, "DDL_COMPLETE"),
227            Self::BackfillComplete => write!(f, "BACKFILL_COMPLETE"),
228        }
229    }
230}
231
/// PostgreSQL replica identity setting.
///
/// Each variant corresponds to one single-character code; see
/// `ReplicaIdentity::from_pg_char` / `ReplicaIdentity::to_pg_char` for the mapping.
/// `Default` is also the value produced by the derived `Default` impl.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[repr(u8)]
pub enum ReplicaIdentity {
    #[default]
    Default = 0, // 'd' - Use primary key
    Nothing = 1, // 'n' - No old row data
    Full = 2,    // 'f' - Full old row
    Index = 3,   // 'i' - Use specific index
}
242
243impl ReplicaIdentity {
244    /// Convert from PostgreSQL replica identity character.
245    pub fn from_pg_char(c: u8) -> Self {
246        match c {
247            b'd' => Self::Default,
248            b'n' => Self::Nothing,
249            b'f' => Self::Full,
250            b'i' => Self::Index,
251            _ => Self::Default,
252        }
253    }
254
255    /// Convert to PostgreSQL replica identity character.
256    pub fn to_pg_char(self) -> u8 {
257        match self {
258            Self::Default => b'd',
259            Self::Nothing => b'n',
260            Self::Full => b'f',
261            Self::Index => b'i',
262        }
263    }
264}
265
/// A PostgreSQL sequence value for synchronization.
///
/// Used to transmit current sequence state from source to target
/// so that sequences can be synchronized after snapshot completion.
///
/// `SequenceValue::new` fills every field except `schema`, `name` and
/// `last_value` with fixed defaults; set the remaining public fields directly
/// when the sequence differs from them.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SequenceValue {
    /// Schema name containing the sequence.
    pub schema: String,

    /// Sequence name.
    pub name: String,

    /// Current last_value from pg_sequences.
    pub last_value: i64,

    /// Whether setval's is_called parameter should be true.
    /// If true: next nextval() returns last_value + increment_by
    /// If false: next nextval() returns last_value
    pub is_called: bool,

    /// Increment value for this sequence.
    pub increment_by: i64,

    /// Minimum value for this sequence.
    pub min_value: i64,

    /// Maximum value for this sequence.
    pub max_value: i64,
}
295
296impl SequenceValue {
297    /// Create a new sequence value.
298    pub fn new(
299        schema: impl Into<String>,
300        name: impl Into<String>,
301        last_value: i64,
302    ) -> Self {
303        Self {
304            schema: schema.into(),
305            name: name.into(),
306            last_value,
307            is_called: true,
308            increment_by: 1,
309            min_value: 1,
310            max_value: i64::MAX,
311        }
312    }
313
314    /// Get the fully qualified sequence name.
315    pub fn qualified_name(&self) -> String {
316        format!("{}.{}", self.schema, self.name)
317    }
318}
319
/// Control directive for coordinating producer/receiver behavior.
///
/// Control directives are sent as special batches to signal the receiver
/// to perform administrative operations like disabling/enabling constraints.
///
/// The `Display` impl renders each variant as an upper-case, underscore-separated name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ControlDirective {
    /// Disable foreign key constraints on the target.
    /// The receiver should:
    /// 1. Query all FK constraints
    /// 2. Store definitions in _scry_admin.foreign_keys
    /// 3. Drop all FK constraints
    DisableForeignKeys,

    /// Enable foreign key constraints on the target.
    /// The receiver should:
    /// 1. Read FK definitions from _scry_admin.foreign_keys
    /// 2. Recreate all FK constraints
    /// 3. Clean up the admin table
    EnableForeignKeys,

    /// Synchronize sequence values.
    /// The batch will contain sequence_values with the current state.
    SyncSequences,
}
344
345impl std::fmt::Display for ControlDirective {
346    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
347        match self {
348            Self::DisableForeignKeys => write!(f, "DISABLE_FOREIGN_KEYS"),
349            Self::EnableForeignKeys => write!(f, "ENABLE_FOREIGN_KEYS"),
350            Self::SyncSequences => write!(f, "SYNC_SEQUENCES"),
351        }
352    }
353}
354
/// A single column value.
/// Uses raw PostgreSQL binary format bytes for zero-copy efficiency.
///
/// Three constructors exist: `ColumnValue::null` (no data, `TypeTag::Null`),
/// `ColumnValue::from_pg_binary` (data present) and `ColumnValue::unchanged`
/// (no data, `TypeTag::ToastUnchanged`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ColumnValue {
    /// Type tag for fast dispatch.
    pub type_tag: TypeTag,

    /// PostgreSQL type OID (for arrays, custom types, and disambiguation).
    pub type_oid: u32,

    /// Raw PostgreSQL binary format bytes.
    /// None represents NULL.
    /// Unchanged-TOAST markers also carry no data; they are told apart from NULL
    /// by `type_tag`.
    pub data: Option<Vec<u8>>,
}
369
370impl ColumnValue {
371    /// Create a NULL column value.
372    pub fn null() -> Self {
373        Self {
374            type_tag: TypeTag::Null,
375            type_oid: 0,
376            data: None,
377        }
378    }
379
380    /// Create a column value from raw PostgreSQL binary data.
381    pub fn from_pg_binary(type_tag: TypeTag, type_oid: u32, data: Vec<u8>) -> Self {
382        Self {
383            type_tag,
384            type_oid,
385            data: Some(data),
386        }
387    }
388
389    /// Create a marker for an unchanged TOAST value.
390    /// Used in UPDATE operations when a large column value was not modified.
391    /// The type_oid is preserved so the target knows the column's type.
392    pub fn unchanged(type_oid: u32) -> Self {
393        Self {
394            type_tag: TypeTag::ToastUnchanged,
395            type_oid,
396            data: None,
397        }
398    }
399
400    /// Check if this value is NULL.
401    pub fn is_null(&self) -> bool {
402        self.data.is_none() && self.type_tag == TypeTag::Null
403    }
404
405    /// Check if this value is an unchanged TOAST marker.
406    pub fn is_unchanged(&self) -> bool {
407        self.type_tag == TypeTag::ToastUnchanged
408    }
409
410    /// Get the raw data bytes.
411    pub fn as_bytes(&self) -> Option<&[u8]> {
412        self.data.as_deref()
413    }
414
415    /// Estimate the size of this value in bytes.
416    pub fn size_bytes(&self) -> usize {
417        // 1 (tag) + 4 (oid) + data length
418        5 + self.data.as_ref().map_or(0, |d| d.len())
419    }
420}
421
/// A row of column values.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Row {
    /// Column values in positional order; `Row::get` indexes into this vector.
    pub values: Vec<ColumnValue>,
}
427
428impl Row {
429    /// Create a new row from values.
430    pub fn new(values: Vec<ColumnValue>) -> Self {
431        Self { values }
432    }
433
434    /// Create an empty row.
435    pub fn empty() -> Self {
436        Self { values: Vec::new() }
437    }
438
439    /// Get a value by index.
440    pub fn get(&self, index: usize) -> Option<&ColumnValue> {
441        self.values.get(index)
442    }
443
444    /// Estimate the size of this row in bytes.
445    pub fn size_bytes(&self) -> usize {
446        self.values.iter().map(|v| v.size_bytes()).sum()
447    }
448}
449
/// Column metadata.
///
/// Describes one column of a relation; instances are held in `RelationMeta::columns`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ColumnMeta {
    /// Column name.
    pub name: String,

    /// PostgreSQL type OID.
    pub type_oid: u32,

    /// Type modifier (e.g., varchar length, numeric precision).
    pub type_modifier: i32,

    /// Whether this column is part of the key.
    pub is_key: bool,
}
465
/// Relation (table) metadata.
///
/// Carries the identity of a table (`rel_id`, `schema`, `table`), its column
/// definitions in order, and its replica identity setting.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RelationMeta {
    /// Relation ID (PostgreSQL OID).
    pub rel_id: u32,

    /// Schema name.
    pub schema: String,

    /// Table name.
    pub table: String,

    /// Column definitions.
    pub columns: Vec<ColumnMeta>,

    /// Replica identity setting.
    pub replica_identity: ReplicaIdentity,
}
484
485impl RelationMeta {
486    /// Get the fully qualified table name.
487    pub fn qualified_name(&self) -> String {
488        format!("{}.{}", self.schema, self.table)
489    }
490
491    /// Get column names in order.
492    pub fn column_names(&self) -> Vec<&str> {
493        self.columns.iter().map(|c| c.name.as_str()).collect()
494    }
495}
496
/// A single database event.
///
/// One struct covers every `OperationType`; which optional fields are populated
/// depends on the operation, as noted on each field.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DatabaseEvent {
    /// Unique event identifier (UUID v4).
    pub event_id: Option<String>,

    /// Event timestamp (Unix timestamp in microseconds).
    pub timestamp_us: u64,

    /// Operation type.
    pub operation: OperationType,

    /// Schema name.
    pub schema: String,

    /// Table name.
    pub table: String,

    /// Replication position (PostgreSQL LSN as u64).
    pub position: u64,

    /// Transaction ID.
    pub transaction_id: u64,

    /// New row data (for INSERT, UPDATE, SnapshotRow).
    pub new_row: Option<Row>,

    /// Old row data (for UPDATE with REPLICA IDENTITY FULL, DELETE).
    pub old_row: Option<Row>,

    /// Column names in order.
    pub columns: Vec<String>,

    /// Relation metadata (sent once per table per stream).
    pub relation_meta: Option<RelationMeta>,

    /// DDL SQL statement (when operation == Ddl).
    /// Contains the full CREATE/ALTER/DROP statement.
    /// Omitted from serialized output when `None`; defaults to `None` when absent on input.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub ddl_sql: Option<String>,

    /// DDL object type for filtering/logging.
    /// e.g., "extension", "table", "index", "constraint", "function", "trigger"
    /// Omitted from serialized output when `None`; defaults to `None` when absent on input.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub ddl_object_type: Option<String>,
}
543
544impl DatabaseEvent {
545    /// Get the fully qualified table name.
546    pub fn qualified_name(&self) -> String {
547        format!("{}.{}", self.schema, self.table)
548    }
549
550    /// Estimate the size of this event in bytes.
551    pub fn size_bytes(&self) -> usize {
552        let mut size = 64; // Base overhead
553        size += self.schema.len();
554        size += self.table.len();
555        if let Some(ref row) = self.new_row {
556            size += row.size_bytes();
557        }
558        if let Some(ref row) = self.old_row {
559            size += row.size_bytes();
560        }
561        size += self.columns.iter().map(|c| c.len()).sum::<usize>();
562        size
563    }
564}
565
/// Batch of events for efficient transport.
///
/// A batch either carries data events or acts as a control batch
/// (`control_directive` set); see `DatabaseEventBatch::is_control_batch`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DatabaseEventBatch {
    /// Events in this batch.
    pub events: Vec<DatabaseEvent>,

    /// Source identifier (proxy/connector ID).
    pub source_id: Option<String>,

    /// Batch sequence number (for ordering/deduplication).
    pub batch_seq: u64,

    /// Cached relation metadata for this batch.
    pub relations: Vec<RelationMeta>,

    /// Control directive for this batch (if a control batch).
    /// When present, this batch signals the receiver to perform
    /// an administrative operation rather than applying data changes.
    /// Omitted from serialized output when `None`; defaults to `None` when absent on input.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub control_directive: Option<ControlDirective>,

    /// Sequence values for synchronization.
    /// Present when control_directive is SyncSequences.
    /// Omitted from serialized output when `None`; defaults to `None` when absent on input.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub sequence_values: Option<Vec<SequenceValue>>,
}
592
593impl DatabaseEventBatch {
594    /// Create a new empty batch.
595    pub fn new() -> Self {
596        Self {
597            events: Vec::new(),
598            source_id: None,
599            batch_seq: 0,
600            relations: Vec::new(),
601            control_directive: None,
602            sequence_values: None,
603        }
604    }
605
606    /// Create a batch with events.
607    pub fn with_events(events: Vec<DatabaseEvent>) -> Self {
608        Self {
609            events,
610            source_id: None,
611            batch_seq: 0,
612            relations: Vec::new(),
613            control_directive: None,
614            sequence_values: None,
615        }
616    }
617
618    /// Create a control batch that signals an administrative operation.
619    pub fn control(directive: ControlDirective) -> Self {
620        Self {
621            events: Vec::new(),
622            source_id: None,
623            batch_seq: 0,
624            relations: Vec::new(),
625            control_directive: Some(directive),
626            sequence_values: None,
627        }
628    }
629
630    /// Create a sequence sync control batch with sequence values.
631    pub fn sequence_sync(sequences: Vec<SequenceValue>) -> Self {
632        Self {
633            events: Vec::new(),
634            source_id: None,
635            batch_seq: 0,
636            relations: Vec::new(),
637            control_directive: Some(ControlDirective::SyncSequences),
638            sequence_values: Some(sequences),
639        }
640    }
641
642    /// Set the source ID.
643    pub fn with_source_id(mut self, source_id: impl Into<String>) -> Self {
644        self.source_id = Some(source_id.into());
645        self
646    }
647
648    /// Set the batch sequence number.
649    pub fn with_batch_seq(mut self, seq: u64) -> Self {
650        self.batch_seq = seq;
651        self
652    }
653
654    /// Set the control directive.
655    pub fn with_control_directive(mut self, directive: ControlDirective) -> Self {
656        self.control_directive = Some(directive);
657        self
658    }
659
660    /// Set sequence values.
661    pub fn with_sequence_values(mut self, sequences: Vec<SequenceValue>) -> Self {
662        self.sequence_values = Some(sequences);
663        self
664    }
665
666    /// Check if this is a control batch (no data events, just a directive).
667    pub fn is_control_batch(&self) -> bool {
668        self.control_directive.is_some()
669    }
670
671    /// Estimate the size of this batch in bytes.
672    pub fn size_bytes(&self) -> usize {
673        let mut size = 32; // Base overhead
674        size += self.events.iter().map(|e| e.size_bytes()).sum::<usize>();
675        size
676    }
677}
678
679impl Default for DatabaseEventBatch {
680    fn default() -> Self {
681        Self::new()
682    }
683}
684
#[cfg(test)]
mod tests {
    use super::*;

    /// Casting a tag to `u8` and decoding it again yields the same tag.
    #[test]
    fn test_type_tag_roundtrip() {
        let samples = [
            TypeTag::Null,
            TypeTag::Bool,
            TypeTag::Int32,
            TypeTag::Text,
            TypeTag::Uuid,
            TypeTag::Array,
            TypeTag::ToastUnchanged,
            TypeTag::Custom,
        ];
        for original in samples {
            let decoded = TypeTag::from_byte(original as u8).unwrap();
            assert_eq!(original, decoded);
        }
    }

    /// `Display` renders the expected upper-case names for data operations.
    #[test]
    fn test_operation_type_display() {
        let expected = [
            (OperationType::Insert, "INSERT"),
            (OperationType::Update, "UPDATE"),
            (OperationType::Delete, "DELETE"),
            (OperationType::Begin, "BEGIN"),
            (OperationType::Commit, "COMMIT"),
            (OperationType::SnapshotRow, "SNAPSHOT_ROW"),
        ];
        for (op, text) in expected {
            assert_eq!(op.to_string(), text);
        }
    }

    /// Replica identity converts to and from its PostgreSQL character code.
    #[test]
    fn test_replica_identity_pg_char() {
        let default_identity = ReplicaIdentity::from_pg_char(b'd');
        let full_identity = ReplicaIdentity::from_pg_char(b'f');
        assert_eq!(default_identity, ReplicaIdentity::Default);
        assert_eq!(full_identity, ReplicaIdentity::Full);
        assert_eq!(ReplicaIdentity::Full.to_pg_char(), b'f');
    }

    /// A value built with `null()` is NULL, is not a TOAST marker and has no bytes.
    #[test]
    fn test_column_value_null() {
        let value = ColumnValue::null();
        assert!(value.is_null());
        assert!(!value.is_unchanged());
        assert_eq!(value.as_bytes(), None);
    }

    /// A TOAST-unchanged marker keeps its OID, is not NULL and has no bytes.
    #[test]
    fn test_column_value_unchanged() {
        let text_oid = 25; // TEXT type OID
        let marker = ColumnValue::unchanged(text_oid);
        assert!(!marker.is_null());
        assert!(marker.is_unchanged());
        assert_eq!(marker.type_oid, 25);
        assert_eq!(marker.as_bytes(), None);
    }

    /// A value built from binary data exposes exactly those bytes.
    #[test]
    fn test_column_value_with_data() {
        let payload = vec![0x00, 0x01, 0x02, 0x03];
        let value = ColumnValue::from_pg_binary(TypeTag::Int32, 23, payload.clone());
        assert!(!value.is_null());
        assert_eq!(value.as_bytes(), Some(payload.as_slice()));
    }

    /// A non-empty row reports a non-zero size estimate.
    #[test]
    fn test_row_size_bytes() {
        let values = vec![
            ColumnValue::null(),
            ColumnValue::from_pg_binary(TypeTag::Int32, 23, vec![0, 0, 0, 42]),
        ];
        let row = Row::new(values);
        assert!(row.size_bytes() > 0);
    }

    /// Builder methods set the source id and sequence number on a data batch.
    #[test]
    fn test_database_event_batch() {
        let batch = DatabaseEventBatch::new()
            .with_source_id("test-source")
            .with_batch_seq(42);

        assert_eq!(batch.source_id, Some("test-source".to_string()));
        assert_eq!(batch.batch_seq, 42);
        assert!(!batch.is_control_batch());
    }

    /// Control-directive operation types: display names, byte decoding and classification.
    #[test]
    fn test_operation_type_control_directives() {
        let directives = [
            (30u8, OperationType::SequenceSync, "SEQUENCE_SYNC"),
            (31u8, OperationType::DisableForeignKeys, "DISABLE_FOREIGN_KEYS"),
            (32u8, OperationType::EnableForeignKeys, "ENABLE_FOREIGN_KEYS"),
        ];
        for (byte, op, text) in directives {
            // Display name
            assert_eq!(op.to_string(), text);
            // Decoding from the raw byte
            assert_eq!(OperationType::from_byte(byte), Some(op));
            // Classification
            assert!(op.is_control_directive());
        }

        // Data operations are not control directives.
        for op in [OperationType::Insert, OperationType::SnapshotRow] {
            assert!(!op.is_control_directive());
        }
    }

    /// `SequenceValue::new` stores its arguments and applies the documented defaults.
    #[test]
    fn test_sequence_value() {
        let sequence = SequenceValue::new("public", "users_id_seq", 100);
        assert_eq!(sequence.schema, "public");
        assert_eq!(sequence.name, "users_id_seq");
        assert_eq!(sequence.last_value, 100);
        assert!(sequence.is_called);
        assert_eq!(sequence.increment_by, 1);
        assert_eq!(sequence.qualified_name(), "public.users_id_seq");
    }

    /// `Display` renders the expected name for every control directive.
    #[test]
    fn test_control_directive_display() {
        let expected = [
            (ControlDirective::DisableForeignKeys, "DISABLE_FOREIGN_KEYS"),
            (ControlDirective::EnableForeignKeys, "ENABLE_FOREIGN_KEYS"),
            (ControlDirective::SyncSequences, "SYNC_SEQUENCES"),
        ];
        for (directive, text) in expected {
            assert_eq!(directive.to_string(), text);
        }
    }

    /// A control batch carries its directive and nothing else.
    #[test]
    fn test_control_batch() {
        let directive = ControlDirective::DisableForeignKeys;
        let batch = DatabaseEventBatch::control(directive)
            .with_source_id("backfill-001")
            .with_batch_seq(1);

        assert!(batch.is_control_batch());
        assert_eq!(batch.control_directive, Some(ControlDirective::DisableForeignKeys));
        assert!(batch.events.is_empty());
        assert!(batch.sequence_values.is_none());
    }

    /// A sequence-sync batch is a control batch that carries the given sequences in order.
    #[test]
    fn test_sequence_sync_batch() {
        let users = SequenceValue::new("public", "users_id_seq", 100);
        let orders = SequenceValue::new("public", "orders_id_seq", 500);

        let batch = DatabaseEventBatch::sequence_sync(vec![users, orders])
            .with_source_id("backfill-001")
            .with_batch_seq(99);

        assert!(batch.is_control_batch());
        assert_eq!(batch.control_directive, Some(ControlDirective::SyncSequences));
        assert!(batch.events.is_empty());

        let carried = batch.sequence_values.as_ref().unwrap();
        assert_eq!(carried.len(), 2);
        assert_eq!(carried[0].qualified_name(), "public.users_id_seq");
        assert_eq!(carried[1].last_value, 500);
    }
}