drasi_source_mssql/
decoder.rs1use anyhow::{anyhow, Result};
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq)]
23#[repr(i32)]
24pub enum CdcOperation {
25 Delete = 1,
27 Insert = 2,
29 UpdateBefore = 3,
31 UpdateAfter = 4,
33}
34
35impl CdcOperation {
36 pub fn from_i32(value: i32) -> Result<Self> {
38 match value {
39 1 => Ok(Self::Delete),
40 2 => Ok(Self::Insert),
41 3 => Ok(Self::UpdateBefore),
42 4 => Ok(Self::UpdateAfter),
43 _ => Err(anyhow!("Invalid CDC operation value: {value}")),
44 }
45 }
46
47 pub fn is_update(&self) -> bool {
49 matches!(self, Self::UpdateBefore | Self::UpdateAfter)
50 }
51
52 pub fn is_update_after(&self) -> bool {
54 matches!(self, Self::UpdateAfter)
55 }
56
57 pub fn name(&self) -> &'static str {
59 match self {
60 Self::Delete => "DELETE",
61 Self::Insert => "INSERT",
62 Self::UpdateBefore => "UPDATE_BEFORE",
63 Self::UpdateAfter => "UPDATE_AFTER",
64 }
65 }
66}
67
68impl std::fmt::Display for CdcOperation {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 write!(f, "{}", self.name())
71 }
72}
73
74pub mod cdc_columns {
76 pub const START_LSN: &str = "__$start_lsn";
78
79 pub const END_LSN: &str = "__$end_lsn";
81
82 pub const SEQVAL: &str = "__$seqval";
84
85 pub const OPERATION: &str = "__$operation";
87
88 pub const UPDATE_MASK: &str = "__$update_mask";
90
91 pub fn is_metadata_column(name: &str) -> bool {
93 name.starts_with("__$")
94 }
95}
96
97#[cfg(test)]
98mod tests {
99 use super::*;
100
101 #[test]
102 fn test_operation_from_i32() {
103 assert_eq!(CdcOperation::from_i32(1).unwrap(), CdcOperation::Delete);
104 assert_eq!(CdcOperation::from_i32(2).unwrap(), CdcOperation::Insert);
105 assert_eq!(
106 CdcOperation::from_i32(3).unwrap(),
107 CdcOperation::UpdateBefore
108 );
109 assert_eq!(
110 CdcOperation::from_i32(4).unwrap(),
111 CdcOperation::UpdateAfter
112 );
113
114 assert!(CdcOperation::from_i32(5).is_err());
115 assert!(CdcOperation::from_i32(0).is_err());
116 }
117
118 #[test]
119 fn test_is_update() {
120 assert!(!CdcOperation::Delete.is_update());
121 assert!(!CdcOperation::Insert.is_update());
122 assert!(CdcOperation::UpdateBefore.is_update());
123 assert!(CdcOperation::UpdateAfter.is_update());
124 }
125
126 #[test]
127 fn test_is_update_after() {
128 assert!(!CdcOperation::Delete.is_update_after());
129 assert!(!CdcOperation::Insert.is_update_after());
130 assert!(!CdcOperation::UpdateBefore.is_update_after());
131 assert!(CdcOperation::UpdateAfter.is_update_after());
132 }
133
134 #[test]
135 fn test_operation_name() {
136 assert_eq!(CdcOperation::Delete.name(), "DELETE");
137 assert_eq!(CdcOperation::Insert.name(), "INSERT");
138 assert_eq!(CdcOperation::UpdateBefore.name(), "UPDATE_BEFORE");
139 assert_eq!(CdcOperation::UpdateAfter.name(), "UPDATE_AFTER");
140 }
141
142 #[test]
143 fn test_is_metadata_column() {
144 assert!(cdc_columns::is_metadata_column("__$operation"));
145 assert!(cdc_columns::is_metadata_column("__$start_lsn"));
146 assert!(cdc_columns::is_metadata_column("__$seqval"));
147 assert!(!cdc_columns::is_metadata_column("customer_id"));
148 assert!(!cdc_columns::is_metadata_column("name"));
149 }
150}