Skip to main content

drasi_source_mssql/
decoder.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Decoding helpers for MS SQL CDC change rows: operation codes and metadata column names.
16
17use anyhow::{anyhow, Result};
18
/// Kind of row-level change recorded by MS SQL CDC.
///
/// Each discriminant is the integer found in the `__$operation` column of a
/// CDC change table, so casting a variant to `i32` yields the raw code.
#[repr(i32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CdcOperation {
    /// A delete; the change row holds the before image.
    Delete = 1,
    /// An insert; the change row holds the after image.
    Insert = 2,
    /// The before image of an update (rarely used).
    UpdateBefore = 3,
    /// The after image of an update.
    UpdateAfter = 4,
}
34
35impl CdcOperation {
36    /// Parse operation from MS SQL CDC __$operation value
37    pub fn from_i32(value: i32) -> Result<Self> {
38        match value {
39            1 => Ok(Self::Delete),
40            2 => Ok(Self::Insert),
41            3 => Ok(Self::UpdateBefore),
42            4 => Ok(Self::UpdateAfter),
43            _ => Err(anyhow!("Invalid CDC operation value: {value}")),
44        }
45    }
46
47    /// Check if this is an update operation (before or after)
48    pub fn is_update(&self) -> bool {
49        matches!(self, Self::UpdateBefore | Self::UpdateAfter)
50    }
51
52    /// Check if this is the after image of an update
53    pub fn is_update_after(&self) -> bool {
54        matches!(self, Self::UpdateAfter)
55    }
56
57    /// Get human-readable name
58    pub fn name(&self) -> &'static str {
59        match self {
60            Self::Delete => "DELETE",
61            Self::Insert => "INSERT",
62            Self::UpdateBefore => "UPDATE_BEFORE",
63            Self::UpdateAfter => "UPDATE_AFTER",
64        }
65    }
66}
67
68impl std::fmt::Display for CdcOperation {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        write!(f, "{}", self.name())
71    }
72}
73
/// Names of the CDC metadata columns, all of which carry the `__$` prefix.
pub mod cdc_columns {
    /// Column holding the LSN of the change.
    pub const START_LSN: &str = "__$start_lsn";

    /// Column holding the end LSN (always NULL for captured changes).
    pub const END_LSN: &str = "__$end_lsn";

    /// Column holding the sequence value of the change within its transaction.
    pub const SEQVAL: &str = "__$seqval";

    /// Column holding the operation code
    /// (1 = delete, 2 = insert, 3 = update before, 4 = update after).
    pub const OPERATION: &str = "__$operation";

    /// Column holding the bit mask of the columns that were updated.
    pub const UPDATE_MASK: &str = "__$update_mask";

    /// Returns `true` when `name` begins with the `__$` metadata prefix.
    ///
    /// Only the prefix is inspected; the name does not have to be one of the
    /// constants declared in this module.
    pub fn is_metadata_column(name: &str) -> bool {
        name.strip_prefix("__$").is_some()
    }
}
96
#[cfg(test)]
mod tests {
    use super::*;

    /// Every variant, in ascending order of its CDC code.
    const ALL_OPERATIONS: [CdcOperation; 4] = [
        CdcOperation::Delete,
        CdcOperation::Insert,
        CdcOperation::UpdateBefore,
        CdcOperation::UpdateAfter,
    ];

    /// Valid codes map to their variant; out-of-range codes are rejected.
    #[test]
    fn test_operation_from_i32() {
        for (code, expected) in (1..=4).zip(ALL_OPERATIONS) {
            assert_eq!(CdcOperation::from_i32(code).unwrap(), expected);
        }

        for invalid in [5, 0] {
            assert!(CdcOperation::from_i32(invalid).is_err());
        }
    }

    /// Only the two update variants count as updates.
    #[test]
    fn test_is_update() {
        let expectations = [false, false, true, true];
        for (operation, expected) in ALL_OPERATIONS.into_iter().zip(expectations) {
            assert_eq!(operation.is_update(), expected);
        }
    }

    /// Only `UpdateAfter` is an update after image.
    #[test]
    fn test_is_update_after() {
        let expectations = [false, false, false, true];
        for (operation, expected) in ALL_OPERATIONS.into_iter().zip(expectations) {
            assert_eq!(operation.is_update_after(), expected);
        }
    }

    /// Each variant reports its upper-case label.
    #[test]
    fn test_operation_name() {
        let labels = ["DELETE", "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER"];
        for (operation, label) in ALL_OPERATIONS.into_iter().zip(labels) {
            assert_eq!(operation.name(), label);
        }
    }

    /// Names with the `__$` prefix are metadata; ordinary column names are not.
    #[test]
    fn test_is_metadata_column() {
        for metadata in ["__$operation", "__$start_lsn", "__$seqval"] {
            assert!(cdc_columns::is_metadata_column(metadata));
        }
        for regular in ["customer_id", "name"] {
            assert!(!cdc_columns::is_metadata_column(regular));
        }
    }
}