use std::fmt;
use bytes::Bytes;
/// Kind of change recorded for a tracked row.
///
/// Each variant has a one-letter code (`I`, `U`, `D`) used by
/// [`ChangeOperation::from_sql`] and [`ChangeOperation::as_sql`], and a
/// long upper-case name produced by the [`fmt::Display`] implementation.
// NOTE(review): the one-letter codes presumably mirror the values of the
// `SYS_CHANGE_OPERATION` column — confirm against the row-decoding code.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum ChangeOperation {
    /// Row was inserted (code `I`).
    Insert,
    /// Row was updated (code `U`).
    Update,
    /// Row was deleted (code `D`).
    Delete,
}

impl ChangeOperation {
    /// Parses a one-letter operation code.
    ///
    /// Surrounding whitespace is ignored and ASCII letter case does not
    /// matter, so `"I"`, `"i"` and `" I "` all yield `Some(Insert)`.
    /// Any other input, including the empty string, yields `None`.
    ///
    /// Matching is ASCII-only on purpose: Unicode case mapping would turn
    /// the dotless `ı` (U+0131) into `I` and accept it as an insert, and it
    /// would also allocate a temporary `String` on every call.
    #[must_use]
    pub fn from_sql(s: &str) -> Option<Self> {
        match s.trim() {
            "I" | "i" => Some(Self::Insert),
            "U" | "u" => Some(Self::Update),
            "D" | "d" => Some(Self::Delete),
            _ => None,
        }
    }

    /// Returns the one-letter upper-case code for this operation.
    #[must_use]
    pub const fn as_sql(&self) -> &'static str {
        match self {
            Self::Insert => "I",
            Self::Update => "U",
            Self::Delete => "D",
        }
    }

    /// Returns `true` if this is [`ChangeOperation::Insert`].
    #[must_use]
    pub const fn is_insert(&self) -> bool {
        matches!(self, Self::Insert)
    }

    /// Returns `true` if this is [`ChangeOperation::Update`].
    #[must_use]
    pub const fn is_update(&self) -> bool {
        matches!(self, Self::Update)
    }

    /// Returns `true` if this is [`ChangeOperation::Delete`].
    #[must_use]
    pub const fn is_delete(&self) -> bool {
        matches!(self, Self::Delete)
    }
}

/// Formats the operation as its long upper-case name
/// (`INSERT`, `UPDATE` or `DELETE`).
impl fmt::Display for ChangeOperation {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            Self::Insert => "INSERT",
            Self::Update => "UPDATE",
            Self::Delete => "DELETE",
        };
        f.write_str(name)
    }
}
/// Change-tracking metadata describing a single changed row.
// NOTE(review): the five fields line up with the `SYS_CHANGE_*` columns
// selected by `ChangeTrackingQuery`; presumably each field carries the value
// of the matching column — confirm against the row-decoding code.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ChangeMetadata {
    /// Version associated with this change.
    pub version: i64,
    /// Version at which the row was created, when known.
    pub creation_version: Option<i64>,
    /// Kind of change that was recorded.
    pub operation: ChangeOperation,
    /// Opaque changed-columns payload, if any.
    pub changed_columns: Option<Bytes>,
    /// Opaque change-context payload, if any.
    pub context: Option<Bytes>,
}

impl ChangeMetadata {
    /// Builds a metadata value from all of its parts; every argument is
    /// stored unchanged in the field of the same name.
    #[must_use]
    pub fn new(
        version: i64,
        creation_version: Option<i64>,
        operation: ChangeOperation,
        changed_columns: Option<Bytes>,
        context: Option<Bytes>,
    ) -> Self {
        Self {
            version,
            creation_version,
            operation,
            changed_columns,
            context,
        }
    }

    /// Metadata for an insert at `version`.
    ///
    /// The creation version is set to `version` as well; no changed-columns
    /// payload or context is attached.
    #[must_use]
    pub fn insert(version: i64) -> Self {
        Self::new(version, Some(version), ChangeOperation::Insert, None, None)
    }

    /// Metadata for an update at `version` of a row created at
    /// `creation_version`; no changed-columns payload or context is attached.
    #[must_use]
    pub fn update(version: i64, creation_version: i64) -> Self {
        Self::new(
            version,
            Some(creation_version),
            ChangeOperation::Update,
            None,
            None,
        )
    }

    /// Metadata for a delete at `version`.
    ///
    /// The creation version is left as `None`; no changed-columns payload or
    /// context is attached.
    #[must_use]
    pub fn delete(version: i64) -> Self {
        Self::new(version, None, ChangeOperation::Delete, None, None)
    }
}
/// Builder for `SELECT ... FROM CHANGETABLE(CHANGES ...)` statements.
///
/// Start with [`ChangeTrackingQuery::changes`], optionally chain the `with_*`
/// methods, then render with [`ChangeTrackingQuery::to_sql`] or
/// [`ChangeTrackingQuery::to_sql_with_data`].
///
/// Table, column and alias names are inserted into the SQL text verbatim:
/// no quoting or escaping is applied, so they must already be valid,
/// trusted identifiers.
#[derive(Debug, Clone)]
pub struct ChangeTrackingQuery {
    /// Table name as it will appear in the SQL text.
    table_name: String,
    /// Version passed as the second argument of `CHANGETABLE(CHANGES ...)`.
    last_sync_version: i64,
    /// Extra columns selected by `to_sql`, qualified with the alias.
    columns: Option<Vec<String>>,
    /// Primary-key columns to select and, in `to_sql_with_data`, join on.
    primary_keys: Option<Vec<String>>,
    /// Alias given to the change table; defaults to `CT`.
    alias: String,
    /// Whether to add the `FORCESEEK` argument.
    force_seek: bool,
}

impl ChangeTrackingQuery {
    /// Metadata columns every generated query selects, in output order.
    const CHANGE_COLUMNS: &'static [&'static str] = &[
        "SYS_CHANGE_VERSION",
        "SYS_CHANGE_CREATION_VERSION",
        "SYS_CHANGE_OPERATION",
        "SYS_CHANGE_COLUMNS",
        "SYS_CHANGE_CONTEXT",
    ];

    /// Starts a query for changes to `table_name` relative to
    /// `last_sync_version`.
    ///
    /// The alias defaults to `CT`; no primary keys or extra columns are
    /// selected and `FORCESEEK` is off.
    #[must_use]
    pub fn changes(table_name: impl Into<String>, last_sync_version: i64) -> Self {
        Self {
            table_name: table_name.into(),
            last_sync_version,
            columns: None,
            primary_keys: None,
            alias: "CT".into(),
            force_seek: false,
        }
    }

    /// Sets the extra columns selected by [`ChangeTrackingQuery::to_sql`],
    /// replacing any previous list.
    ///
    /// The columns are qualified with the change-table alias.
    // NOTE(review): CHANGETABLE(CHANGES ...) normally exposes only key
    // columns plus SYS_CHANGE_* — confirm these names resolve there; use
    // `to_sql_with_data` to read columns from the base table instead.
    #[must_use]
    pub fn with_columns(mut self, columns: &[&str]) -> Self {
        self.columns = Some(columns.iter().map(|&s| s.to_string()).collect());
        self
    }

    /// Sets the primary-key columns, replacing any previous list.
    #[must_use]
    pub fn with_primary_keys(mut self, keys: &[&str]) -> Self {
        self.primary_keys = Some(keys.iter().map(|&s| s.to_string()).collect());
        self
    }

    /// Overrides the alias used for the change table.
    ///
    /// `to_sql_with_data` always aliases the base table as `T`, so an alias
    /// of `T` would clash in that query.
    #[must_use]
    pub fn with_alias(mut self, alias: impl Into<String>) -> Self {
        self.alias = alias.into();
        self
    }

    /// Adds the `FORCESEEK` argument to the `CHANGETABLE` call.
    #[must_use]
    pub fn with_force_seek(mut self) -> Self {
        self.force_seek = true;
        self
    }

    /// Renders a query over the change table only.
    ///
    /// Selects the five `SYS_CHANGE_*` columns, then the primary keys, then
    /// the columns from [`ChangeTrackingQuery::with_columns`], all qualified
    /// with the alias. The change table is declared `AS <alias>` so those
    /// qualified names resolve.
    #[must_use]
    pub fn to_sql(&self) -> String {
        format!(
            "SELECT {} FROM {}",
            self.build_select_columns(),
            self.change_table_source()
        )
    }

    /// Renders a query that left-joins the change table to the base table
    /// (aliased `T`) and additionally selects `data_columns` from `T`.
    ///
    /// The join predicate equates every primary-key column on both sides;
    /// when no primary keys are configured (unset or an empty list) it falls
    /// back to the constant predicate `1=1`. Columns configured through
    /// [`ChangeTrackingQuery::with_columns`] are not used here. An empty
    /// `data_columns` slice simply selects nothing from `T`.
    #[must_use]
    pub fn to_sql_with_data(&self, data_columns: &[&str]) -> String {
        let alias = &self.alias;
        let keys: &[String] = self.primary_keys.as_deref().unwrap_or(&[]);

        // Build the select list as items and join once at the end, so empty
        // key or data lists can never leave a dangling comma.
        let mut select = self.change_columns();
        select.extend(keys.iter().map(|pk| format!("{alias}.{pk}")));
        select.extend(data_columns.iter().map(|col| format!("T.{col}")));

        let join_condition = if keys.is_empty() {
            String::from("1=1")
        } else {
            keys.iter()
                .map(|pk| format!("{alias}.{pk} = T.{pk}"))
                .collect::<Vec<_>>()
                .join(" AND ")
        };

        format!(
            "SELECT {select_cols} FROM {source} \
             LEFT OUTER JOIN {table} AS T ON {join_condition}",
            select_cols = select.join(", "),
            source = self.change_table_source(),
            table = self.table_name,
        )
    }

    /// Comma-separated select list used by `to_sql`.
    fn build_select_columns(&self) -> String {
        let alias = &self.alias;
        let keys = self.primary_keys.iter().flatten();
        let extra = self.columns.iter().flatten();

        let mut cols = self.change_columns();
        cols.extend(keys.chain(extra).map(|name| format!("{alias}.{name}")));
        cols.join(", ")
    }

    /// The `SYS_CHANGE_*` metadata columns, each qualified with the alias.
    fn change_columns(&self) -> Vec<String> {
        let alias = &self.alias;
        Self::CHANGE_COLUMNS
            .iter()
            .map(|col| format!("{alias}.{col}"))
            .collect()
    }

    /// The `CHANGETABLE(CHANGES ...) AS <alias>` table source, including the
    /// optional `FORCESEEK` argument.
    fn change_table_source(&self) -> String {
        let hint = if self.force_seek { ", FORCESEEK" } else { "" };
        format!(
            "CHANGETABLE(CHANGES {}, {}{}) AS {}",
            self.table_name, self.last_sync_version, hint, self.alias
        )
    }
}
/// Namespace for helpers that render change-tracking SQL statements.
///
/// Every function only builds a SQL string; nothing is executed here.
pub struct ChangeTracking;

impl ChangeTracking {
    /// SQL that selects `CHANGE_TRACKING_CURRENT_VERSION()`.
    #[must_use]
    pub const fn current_version_sql() -> &'static str {
        "SELECT CHANGE_TRACKING_CURRENT_VERSION()"
    }

    /// SQL that selects `CHANGE_TRACKING_MIN_VALID_VERSION` for `table_name`.
    ///
    /// The name is embedded in an `N'...'` literal passed to `OBJECT_ID`;
    /// single quotes in it are doubled so they cannot end the literal.
    #[must_use]
    pub fn min_valid_version_sql(table_name: &str) -> String {
        let table = Self::escape_literal(table_name);
        format!("SELECT CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID(N'{table}'))")
    }

    /// SQL that evaluates `CHANGE_TRACKING_IS_COLUMN_IN_MASK` for one column.
    ///
    /// `table_name` and `column_name` are embedded in `N'...'` literals with
    /// single quotes doubled. `mask_variable` is inserted verbatim as the
    /// second argument (for example `@mask`), so it must be trusted SQL.
    #[must_use]
    pub fn column_in_mask_sql(table_name: &str, column_name: &str, mask_variable: &str) -> String {
        let table = Self::escape_literal(table_name);
        let column = Self::escape_literal(column_name);
        format!(
            "SELECT CHANGE_TRACKING_IS_COLUMN_IN_MASK(\
             COLUMNPROPERTY(OBJECT_ID(N'{table}'), N'{column}', 'ColumnId'), \
             {mask_variable})"
        )
    }

    /// `ALTER DATABASE` statement that turns change tracking on.
    ///
    /// `retention_days` becomes `CHANGE_RETENTION = <n> DAYS` and
    /// `auto_cleanup` selects `AUTO_CLEANUP = ON` or `OFF`.
    #[must_use]
    pub fn enable_database_sql(
        database_name: &str,
        retention_days: u32,
        auto_cleanup: bool,
    ) -> String {
        let database = Self::quote_identifier(database_name);
        let cleanup = Self::on_off(auto_cleanup);
        format!(
            "ALTER DATABASE {database} SET CHANGE_TRACKING = ON \
             (CHANGE_RETENTION = {retention_days} DAYS, AUTO_CLEANUP = {cleanup})"
        )
    }

    /// `ALTER TABLE` statement that enables change tracking on a table.
    ///
    /// `track_columns_updated` selects `TRACK_COLUMNS_UPDATED = ON` or `OFF`.
    /// The whole of `table_name` is wrapped in a single pair of brackets, so
    /// it is treated as one identifier.
    #[must_use]
    pub fn enable_table_sql(table_name: &str, track_columns_updated: bool) -> String {
        let table = Self::quote_identifier(table_name);
        let track_cols = Self::on_off(track_columns_updated);
        format!(
            "ALTER TABLE {table} ENABLE CHANGE_TRACKING \
             WITH (TRACK_COLUMNS_UPDATED = {track_cols})"
        )
    }

    /// `ALTER TABLE` statement that disables change tracking on a table.
    #[must_use]
    pub fn disable_table_sql(table_name: &str) -> String {
        let table = Self::quote_identifier(table_name);
        format!("ALTER TABLE {table} DISABLE CHANGE_TRACKING")
    }

    /// `ALTER DATABASE` statement that turns change tracking off.
    #[must_use]
    pub fn disable_database_sql(database_name: &str) -> String {
        let database = Self::quote_identifier(database_name);
        format!("ALTER DATABASE {database} SET CHANGE_TRACKING = OFF")
    }

    /// Wraps `name` in brackets, doubling any `]` so the name cannot close
    /// the delimiter early.
    fn quote_identifier(name: &str) -> String {
        format!("[{}]", name.replace(']', "]]"))
    }

    /// Doubles single quotes so `text` is safe inside an `N'...'` literal.
    fn escape_literal(text: &str) -> String {
        text.replace('\'', "''")
    }

    /// Renders a flag as the `ON` / `OFF` keyword.
    const fn on_off(enabled: bool) -> &'static str {
        if enabled {
            "ON"
        } else {
            "OFF"
        }
    }
}
/// Outcome of comparing a stored sync version against the minimum valid
/// version.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum SyncVersionStatus {
    /// The stored version is at or above the minimum valid version.
    Valid,
    /// The stored version is below the minimum valid version.
    TooOld,
    /// No minimum valid version was available.
    NotEnabled,
}

impl SyncVersionStatus {
    /// Classifies `last_sync_version` against `min_valid_version`.
    ///
    /// * `None` minimum yields [`SyncVersionStatus::NotEnabled`].
    /// * A version greater than or equal to the minimum yields
    ///   [`SyncVersionStatus::Valid`].
    /// * Anything lower yields [`SyncVersionStatus::TooOld`].
    #[must_use]
    pub fn check(last_sync_version: i64, min_valid_version: Option<i64>) -> Self {
        let still_valid = min_valid_version.map(|floor| last_sync_version >= floor);
        match still_valid {
            Some(true) => Self::Valid,
            Some(false) => Self::TooOld,
            None => Self::NotEnabled,
        }
    }

    /// Returns `true` only for [`SyncVersionStatus::Valid`].
    #[must_use]
    pub const fn can_sync_incrementally(&self) -> bool {
        match self {
            Self::Valid => true,
            Self::TooOld | Self::NotEnabled => false,
        }
    }

    /// Returns `true` only for [`SyncVersionStatus::TooOld`].
    #[must_use]
    pub const fn requires_full_sync(&self) -> bool {
        match self {
            Self::TooOld => true,
            Self::Valid | Self::NotEnabled => false,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `sql` contains every fragment in `fragments`.
    fn assert_contains_all(sql: &str, fragments: &[&str]) {
        for fragment in fragments {
            assert!(sql.contains(*fragment));
        }
    }

    /// Codes parse regardless of case and padding; unknown input is rejected.
    #[test]
    fn test_change_operation_from_sql() {
        let cases = [
            ("I", Some(ChangeOperation::Insert)),
            ("U", Some(ChangeOperation::Update)),
            ("D", Some(ChangeOperation::Delete)),
            ("i", Some(ChangeOperation::Insert)),
            (" U ", Some(ChangeOperation::Update)),
            ("X", None),
            ("", None),
        ];
        for &(input, expected) in &cases {
            assert_eq!(ChangeOperation::from_sql(input), expected);
        }
    }

    /// Every variant renders its one-letter code.
    #[test]
    fn test_change_operation_as_sql() {
        let cases = [
            (ChangeOperation::Insert, "I"),
            (ChangeOperation::Update, "U"),
            (ChangeOperation::Delete, "D"),
        ];
        for &(operation, code) in &cases {
            assert_eq!(operation.as_sql(), code);
        }
    }

    /// Exactly one predicate is true for each variant.
    #[test]
    fn test_change_operation_predicates() {
        // Expected results are ordered as [is_insert, is_update, is_delete].
        let cases = [
            (ChangeOperation::Insert, [true, false, false]),
            (ChangeOperation::Update, [false, true, false]),
            (ChangeOperation::Delete, [false, false, true]),
        ];
        for &(operation, expected) in &cases {
            let actual = [
                operation.is_insert(),
                operation.is_update(),
                operation.is_delete(),
            ];
            assert_eq!(actual, expected);
        }
    }

    /// The convenience constructors fill version, creation version and
    /// operation as documented.
    #[test]
    fn test_change_metadata_constructors() {
        let inserted = ChangeMetadata::insert(42);
        assert_eq!(
            (inserted.version, inserted.creation_version, inserted.operation),
            (42, Some(42), ChangeOperation::Insert)
        );

        let updated = ChangeMetadata::update(50, 42);
        assert_eq!(
            (updated.version, updated.creation_version, updated.operation),
            (50, Some(42), ChangeOperation::Update)
        );

        let deleted = ChangeMetadata::delete(60);
        assert_eq!(
            (deleted.version, deleted.creation_version, deleted.operation),
            (60, None, ChangeOperation::Delete)
        );
    }

    /// A bare query names the table, the version and the metadata columns.
    #[test]
    fn test_change_tracking_query_basic() {
        let sql = ChangeTrackingQuery::changes("Products", 42).to_sql();
        assert_contains_all(
            &sql,
            &[
                "CHANGETABLE(CHANGES Products, 42)",
                "SYS_CHANGE_VERSION",
                "SYS_CHANGE_OPERATION",
            ],
        );
    }

    /// Extra columns are selected through the default alias.
    #[test]
    fn test_change_tracking_query_with_columns() {
        let sql = ChangeTrackingQuery::changes("Products", 42)
            .with_columns(&["Name", "Price"])
            .to_sql();
        assert_contains_all(&sql, &["CT.Name", "CT.Price"]);
    }

    /// Primary keys are selected through the default alias.
    #[test]
    fn test_change_tracking_query_with_primary_keys() {
        let sql = ChangeTrackingQuery::changes("Products", 42)
            .with_primary_keys(&["ProductId"])
            .to_sql();
        assert_contains_all(&sql, &["CT.ProductId"]);
    }

    /// The force-seek option shows up in the rendered SQL.
    #[test]
    fn test_change_tracking_query_force_seek() {
        let sql = ChangeTrackingQuery::changes("Products", 42)
            .with_force_seek()
            .to_sql();
        assert_contains_all(&sql, &["FORCESEEK"]);
    }

    /// The data query joins the base table on the primary key and selects
    /// the requested data columns from it.
    #[test]
    fn test_change_tracking_query_with_data() {
        let sql = ChangeTrackingQuery::changes("Products", 42)
            .with_primary_keys(&["ProductId"])
            .to_sql_with_data(&["Name", "Price"]);
        assert_contains_all(
            &sql,
            &[
                "LEFT OUTER JOIN Products AS T",
                "CT.ProductId = T.ProductId",
                "T.Name",
                "T.Price",
            ],
        );
    }

    /// The version and column-mask helpers mention their functions and
    /// arguments.
    #[test]
    fn test_change_tracking_helper_sql() {
        assert_eq!(
            ChangeTracking::current_version_sql(),
            "SELECT CHANGE_TRACKING_CURRENT_VERSION()"
        );

        let min_sql = ChangeTracking::min_valid_version_sql("Products");
        assert_contains_all(&min_sql, &["CHANGE_TRACKING_MIN_VALID_VERSION", "Products"]);

        let mask_sql = ChangeTracking::column_in_mask_sql("Products", "Price", "@mask");
        assert_contains_all(
            &mask_sql,
            &["CHANGE_TRACKING_IS_COLUMN_IN_MASK", "Price", "@mask"],
        );
    }

    /// The enable statements bracket the name and spell out the options.
    #[test]
    fn test_change_tracking_enable_sql() {
        let db_sql = ChangeTracking::enable_database_sql("MyDB", 7, true);
        assert_contains_all(&db_sql, &["[MyDB]", "7 DAYS", "AUTO_CLEANUP = ON"]);

        let table_sql = ChangeTracking::enable_table_sql("Products", true);
        assert_contains_all(&table_sql, &["[Products]", "TRACK_COLUMNS_UPDATED = ON"]);
    }

    /// Versions at or above the minimum sync incrementally, older ones need
    /// a full sync, and a missing minimum reports `NotEnabled`.
    #[test]
    fn test_sync_version_status() {
        let newer = SyncVersionStatus::check(100, Some(50));
        assert!(newer.can_sync_incrementally());

        let equal = SyncVersionStatus::check(50, Some(50));
        assert!(equal.can_sync_incrementally());

        let older = SyncVersionStatus::check(40, Some(50));
        assert!(older.requires_full_sync());

        let untracked = SyncVersionStatus::check(100, None);
        assert_eq!(untracked, SyncVersionStatus::NotEnabled);
        assert!(!untracked.can_sync_incrementally());
    }
}