use anyhow::{anyhow, Result};
/// Kind of row change carried by a CDC (change data capture) record.
///
/// Every variant has an explicit `i32` discriminant (the enum is
/// `#[repr(i32)]`), so `op as i32` yields the numeric code of the operation.
///
/// NOTE(review): the codes 1-4 look like the values of the `__$operation`
/// column in SQL Server CDC change tables; confirm against the upstream source.
#[repr(i32)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CdcOperation {
    /// A row was deleted (numeric code `1`).
    Delete = 1,
    /// A row was inserted (numeric code `2`).
    Insert = 2,
    /// The "before" side of an update (numeric code `3`).
    UpdateBefore = 3,
    /// The "after" side of an update (numeric code `4`).
    UpdateAfter = 4,
}
impl CdcOperation {
    /// Converts a raw numeric operation code into a `CdcOperation`.
    ///
    /// Accepted codes are `1` (delete), `2` (insert), `3` (update-before) and
    /// `4` (update-after).
    ///
    /// # Errors
    ///
    /// Returns an error whose message contains the offending value when
    /// `value` is not one of the four accepted codes.
    pub fn from_i32(value: i32) -> Result<Self> {
        let operation = match value {
            1 => Self::Delete,
            2 => Self::Insert,
            3 => Self::UpdateBefore,
            4 => Self::UpdateAfter,
            // Anything outside 1..=4 is rejected rather than mapped to a default.
            _ => return Err(anyhow!("Invalid CDC operation value: {value}")),
        };
        Ok(operation)
    }

    /// Returns `true` for either side of an update (`UpdateBefore` or
    /// `UpdateAfter`) and `false` for `Delete` and `Insert`.
    pub fn is_update(&self) -> bool {
        match *self {
            Self::UpdateBefore | Self::UpdateAfter => true,
            Self::Delete | Self::Insert => false,
        }
    }

    /// Returns `true` only for `UpdateAfter`.
    pub fn is_update_after(&self) -> bool {
        *self == Self::UpdateAfter
    }

    /// Returns the upper-case, underscore-separated name of the operation,
    /// e.g. `"UPDATE_BEFORE"`.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::Delete => "DELETE",
            Self::Insert => "INSERT",
            Self::UpdateBefore => "UPDATE_BEFORE",
            Self::UpdateAfter => "UPDATE_AFTER",
        }
    }
}
impl std::fmt::Display for CdcOperation {
    /// Writes the operation's upper-case name, as returned by `name()`.
    ///
    /// The text is emitted through `Formatter::pad`, so format-spec options
    /// supplied by the caller (width, fill, alignment, precision) are applied.
    /// For example `format!("{:<14}|", op)` produces a left-aligned,
    /// space-padded column. Going through `write!(f, "{}", ...)` instead would
    /// silently discard those options, because the nested `write!` formats
    /// with a fresh default formatter.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad(self.name())
    }
}
/// Names of the `__$`-prefixed metadata columns that accompany CDC rows.
///
/// NOTE(review): these look like the metadata columns of SQL Server CDC
/// change tables; confirm against the source database in use.
pub mod cdc_columns {
    /// Prefix shared by every metadata column name in this module.
    const METADATA_PREFIX: &str = "__$";

    /// Name of the start-LSN metadata column.
    pub const START_LSN: &str = "__$start_lsn";
    /// Name of the end-LSN metadata column.
    pub const END_LSN: &str = "__$end_lsn";
    /// Name of the sequence-value metadata column.
    pub const SEQVAL: &str = "__$seqval";
    /// Name of the operation-code metadata column.
    pub const OPERATION: &str = "__$operation";
    /// Name of the update-mask metadata column.
    pub const UPDATE_MASK: &str = "__$update_mask";

    /// Returns `true` when `name` begins with the `__$` prefix.
    ///
    /// The check is purely prefix-based: any name starting with `__$` counts,
    /// not only the constants declared in this module.
    pub fn is_metadata_column(name: &str) -> bool {
        name.starts_with(METADATA_PREFIX)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Every valid code maps to its variant; codes outside 1..=4 are rejected.
    #[test]
    fn test_operation_from_i32() {
        let valid = [
            (1, CdcOperation::Delete),
            (2, CdcOperation::Insert),
            (3, CdcOperation::UpdateBefore),
            (4, CdcOperation::UpdateAfter),
        ];
        for &(raw, expected) in valid.iter() {
            assert_eq!(CdcOperation::from_i32(raw).unwrap(), expected);
        }

        let invalid = [5, 0];
        for &raw in invalid.iter() {
            assert!(CdcOperation::from_i32(raw).is_err());
        }
    }

    /// Only the two update variants report `is_update()`.
    #[test]
    fn test_is_update() {
        let cases = [
            (CdcOperation::Delete, false),
            (CdcOperation::Insert, false),
            (CdcOperation::UpdateBefore, true),
            (CdcOperation::UpdateAfter, true),
        ];
        for &(op, expected) in cases.iter() {
            assert_eq!(op.is_update(), expected);
        }
    }

    /// Only `UpdateAfter` reports `is_update_after()`.
    #[test]
    fn test_is_update_after() {
        let cases = [
            (CdcOperation::Delete, false),
            (CdcOperation::Insert, false),
            (CdcOperation::UpdateBefore, false),
            (CdcOperation::UpdateAfter, true),
        ];
        for &(op, expected) in cases.iter() {
            assert_eq!(op.is_update_after(), expected);
        }
    }

    /// Each variant exposes its upper-case name.
    #[test]
    fn test_operation_name() {
        let cases = [
            (CdcOperation::Delete, "DELETE"),
            (CdcOperation::Insert, "INSERT"),
            (CdcOperation::UpdateBefore, "UPDATE_BEFORE"),
            (CdcOperation::UpdateAfter, "UPDATE_AFTER"),
        ];
        for &(op, expected) in cases.iter() {
            assert_eq!(op.name(), expected);
        }
    }

    /// `__$`-prefixed names are metadata columns; ordinary names are not.
    #[test]
    fn test_is_metadata_column() {
        let metadata = ["__$operation", "__$start_lsn", "__$seqval"];
        for column in metadata.iter() {
            assert!(cdc_columns::is_metadata_column(column));
        }

        let regular = ["customer_id", "name"];
        for column in regular.iter() {
            assert!(!cdc_columns::is_metadata_column(column));
        }
    }
}