Skip to main content

mssql_client/
change_tracking.rs

1//! SQL Server Change Tracking support.
2//!
3//! This module provides helper types and utilities for working with SQL Server's
4//! built-in Change Tracking feature, which enables efficient incremental data
5//! synchronization scenarios.
6//!
7//! ## Overview
8//!
9//! SQL Server Change Tracking automatically tracks changes (inserts, updates,
10//! deletes) to table rows. Applications can query for changes since a specific
11//! version to implement incremental sync patterns.
12//!
13//! ## Usage
14//!
15//! ```text
16//! use mssql_client::change_tracking::{ChangeOperation, ChangeTrackingQuery};
17//!
18//! // Get current version for baseline
19//! let current_version: i64 = client
20//!     .query("SELECT CHANGE_TRACKING_CURRENT_VERSION()")
21//!     .await?
22//!     .first()
23//!     .and_then(|r| r.try_get(0))
24//!     .unwrap_or(0);
25//!
26//! // Later, query for changes since that version
27//! let query = ChangeTrackingQuery::changes("Products", last_sync_version);
28//! let changes: Vec<ChangedRow> = client.query(&query.to_sql()).await?
29//!     .map(|row| ChangedRow::from_row(&row))
30//!     .collect();
31//!
32//! for change in changes {
33//!     match change.operation {
34//!         ChangeOperation::Insert => println!("New row: {:?}", change.primary_key),
35//!         ChangeOperation::Update => println!("Updated row: {:?}", change.primary_key),
36//!         ChangeOperation::Delete => println!("Deleted row: {:?}", change.primary_key),
37//!     }
38//! }
39//! ```
40//!
41//! ## Prerequisites
42//!
43//! Change Tracking must be enabled on the database and table:
44//!
45//! ```sql
46//! -- Enable on database
47//! ALTER DATABASE MyDB SET CHANGE_TRACKING = ON
48//!     (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
49//!
50//! -- Enable on table
51//! ALTER TABLE Products ENABLE CHANGE_TRACKING
52//!     WITH (TRACK_COLUMNS_UPDATED = ON);
53//! ```
54//!
55//! ## Key Concepts
56//!
57//! - **Version**: A monotonically increasing value representing a point in time
58//! - **SYS_CHANGE_OPERATION**: I (Insert), U (Update), D (Delete)
59//! - **SYS_CHANGE_VERSION**: The version when the row was last changed
60//! - **SYS_CHANGE_CREATION_VERSION**: The version when the row was inserted
61//!
62//! ## References
63//!
64//! - [About Change Tracking](https://learn.microsoft.com/en-us/sql/relational-databases/track-changes/about-change-tracking-sql-server)
65//! - [CHANGETABLE function](https://learn.microsoft.com/en-us/sql/relational-databases/system-functions/changetable-transact-sql)
66
67use std::fmt;
68
69use bytes::Bytes;
70
71/// The type of change operation tracked by SQL Server Change Tracking.
72///
73/// This corresponds to the `SYS_CHANGE_OPERATION` column in `CHANGETABLE` results.
74#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
75#[non_exhaustive]
76pub enum ChangeOperation {
77    /// A new row was inserted (I).
78    Insert,
79    /// An existing row was updated (U).
80    Update,
81    /// A row was deleted (D).
82    Delete,
83}
84
85impl ChangeOperation {
86    /// Parse a change operation from its single-character SQL Server representation.
87    ///
88    /// # Arguments
89    ///
90    /// * `s` - A string containing 'I', 'U', or 'D'
91    ///
92    /// # Returns
93    ///
94    /// The parsed `ChangeOperation`, or `None` if the input is invalid.
95    #[must_use]
96    pub fn from_sql(s: &str) -> Option<Self> {
97        match s.trim().to_uppercase().as_str() {
98            "I" => Some(Self::Insert),
99            "U" => Some(Self::Update),
100            "D" => Some(Self::Delete),
101            _ => None,
102        }
103    }
104
105    /// Get the SQL Server single-character representation.
106    #[must_use]
107    pub const fn as_sql(&self) -> &'static str {
108        match self {
109            Self::Insert => "I",
110            Self::Update => "U",
111            Self::Delete => "D",
112        }
113    }
114
115    /// Check if this is an insert operation.
116    #[must_use]
117    pub const fn is_insert(&self) -> bool {
118        matches!(self, Self::Insert)
119    }
120
121    /// Check if this is an update operation.
122    #[must_use]
123    pub const fn is_update(&self) -> bool {
124        matches!(self, Self::Update)
125    }
126
127    /// Check if this is a delete operation.
128    #[must_use]
129    pub const fn is_delete(&self) -> bool {
130        matches!(self, Self::Delete)
131    }
132}
133
134impl fmt::Display for ChangeOperation {
135    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136        match self {
137            Self::Insert => write!(f, "INSERT"),
138            Self::Update => write!(f, "UPDATE"),
139            Self::Delete => write!(f, "DELETE"),
140        }
141    }
142}
143
144/// Metadata from a Change Tracking query result.
145///
146/// Contains the system columns returned by `CHANGETABLE(CHANGES ...)`.
147#[derive(Debug, Clone)]
148#[non_exhaustive]
149pub struct ChangeMetadata {
150    /// The version when the row was last changed.
151    pub version: i64,
152    /// The version when the row was created (inserted).
153    /// This is `None` for delete operations.
154    pub creation_version: Option<i64>,
155    /// The type of change (Insert, Update, Delete).
156    pub operation: ChangeOperation,
157    /// Binary mask of changed columns.
158    /// Use `CHANGE_TRACKING_IS_COLUMN_IN_MASK()` to interpret.
159    pub changed_columns: Option<Bytes>,
160    /// Application-defined change context.
161    pub context: Option<Bytes>,
162}
163
164impl ChangeMetadata {
165    /// Create new change metadata.
166    #[must_use]
167    pub fn new(
168        version: i64,
169        creation_version: Option<i64>,
170        operation: ChangeOperation,
171        changed_columns: Option<Bytes>,
172        context: Option<Bytes>,
173    ) -> Self {
174        Self {
175            version,
176            creation_version,
177            operation,
178            changed_columns,
179            context,
180        }
181    }
182
183    /// Create metadata for an insert operation.
184    #[must_use]
185    pub fn insert(version: i64) -> Self {
186        Self {
187            version,
188            creation_version: Some(version),
189            operation: ChangeOperation::Insert,
190            changed_columns: None,
191            context: None,
192        }
193    }
194
195    /// Create metadata for an update operation.
196    #[must_use]
197    pub fn update(version: i64, creation_version: i64) -> Self {
198        Self {
199            version,
200            creation_version: Some(creation_version),
201            operation: ChangeOperation::Update,
202            changed_columns: None,
203            context: None,
204        }
205    }
206
207    /// Create metadata for a delete operation.
208    #[must_use]
209    pub fn delete(version: i64) -> Self {
210        Self {
211            version,
212            creation_version: None,
213            operation: ChangeOperation::Delete,
214            changed_columns: None,
215            context: None,
216        }
217    }
218}
219
220/// Query builder for Change Tracking operations.
221///
222/// Helps construct proper SQL queries for common Change Tracking patterns.
223///
224/// # Example
225///
226/// ```rust
227/// use mssql_client::change_tracking::ChangeTrackingQuery;
228///
229/// // Query for all changes since version 42
230/// let query = ChangeTrackingQuery::changes("Products", 42);
231/// assert!(query.to_sql().contains("CHANGETABLE"));
232///
233/// // Query with specific columns
234/// let query = ChangeTrackingQuery::changes("Orders", 100)
235///     .with_columns(&["OrderId", "CustomerId", "OrderDate"]);
236/// let sql = query.to_sql();
237/// assert!(sql.contains("OrderId"));
238/// ```
239#[derive(Debug, Clone)]
240pub struct ChangeTrackingQuery {
241    table_name: String,
242    last_sync_version: i64,
243    columns: Option<Vec<String>>,
244    primary_keys: Option<Vec<String>>,
245    alias: String,
246    force_seek: bool,
247}
248
249impl ChangeTrackingQuery {
250    /// Create a query for changes to a table since a specific version.
251    ///
252    /// This generates a `CHANGETABLE(CHANGES table_name, last_sync_version)` query.
253    ///
254    /// # Arguments
255    ///
256    /// * `table_name` - The name of the table to query changes for
257    /// * `last_sync_version` - The version from the previous sync (0 for initial)
258    ///
259    /// # Example
260    ///
261    /// ```rust
262    /// use mssql_client::change_tracking::ChangeTrackingQuery;
263    ///
264    /// let query = ChangeTrackingQuery::changes("Products", 42);
265    /// ```
266    #[must_use]
267    pub fn changes(table_name: impl Into<String>, last_sync_version: i64) -> Self {
268        Self {
269            table_name: table_name.into(),
270            last_sync_version,
271            columns: None,
272            primary_keys: None,
273            alias: "CT".into(),
274            force_seek: false,
275        }
276    }
277
278    /// Specify which data columns to include (in addition to change tracking columns).
279    ///
280    /// If not specified, only change tracking system columns are returned.
281    ///
282    /// # Arguments
283    ///
284    /// * `columns` - Column names to include in the result
285    #[must_use]
286    pub fn with_columns(mut self, columns: &[&str]) -> Self {
287        self.columns = Some(columns.iter().map(|&s| s.to_string()).collect());
288        self
289    }
290
291    /// Specify the primary key columns for the table.
292    ///
293    /// This is needed when you want to join change tracking results
294    /// with the original table to get current row data.
295    #[must_use]
296    pub fn with_primary_keys(mut self, keys: &[&str]) -> Self {
297        self.primary_keys = Some(keys.iter().map(|&s| s.to_string()).collect());
298        self
299    }
300
301    /// Set the table alias for the CHANGETABLE result.
302    #[must_use]
303    pub fn with_alias(mut self, alias: impl Into<String>) -> Self {
304        self.alias = alias.into();
305        self
306    }
307
308    /// Enable FORCESEEK hint for the query.
309    ///
310    /// This can improve performance in some scenarios.
311    #[must_use]
312    pub fn with_force_seek(mut self) -> Self {
313        self.force_seek = true;
314        self
315    }
316
317    /// Generate the SQL query string.
318    ///
319    /// This returns a query that can be executed directly.
320    #[must_use]
321    pub fn to_sql(&self) -> String {
322        let force_seek = if self.force_seek { ", FORCESEEK" } else { "" };
323
324        // Build the SELECT column list
325        let select_cols = self.build_select_columns();
326
327        // SQL Server requires an alias on CHANGETABLE ("A table returned by
328        // the CHANGETABLE function must be aliased.", error 22104).
329        format!(
330            "SELECT {} FROM CHANGETABLE(CHANGES {}, {}{}) AS {}",
331            select_cols, self.table_name, self.last_sync_version, force_seek, self.alias
332        )
333    }
334
335    /// Generate a SQL query that joins with the original table.
336    ///
337    /// This is useful when you need both the change tracking metadata
338    /// and the current row data (for inserts and updates).
339    ///
340    /// # Arguments
341    ///
342    /// * `data_columns` - Columns from the data table to include
343    ///
344    /// # Example
345    ///
346    /// ```rust
347    /// use mssql_client::change_tracking::ChangeTrackingQuery;
348    ///
349    /// let query = ChangeTrackingQuery::changes("Products", 42)
350    ///     .with_primary_keys(&["ProductId"]);
351    /// let sql = query.to_sql_with_data(&["Name", "Price", "Stock"]);
352    /// assert!(sql.contains("LEFT OUTER JOIN"));
353    /// ```
354    #[must_use]
355    pub fn to_sql_with_data(&self, data_columns: &[&str]) -> String {
356        let force_seek = if self.force_seek { ", FORCESEEK" } else { "" };
357        let alias = &self.alias;
358
359        // Build change tracking columns
360        let ct_cols = format!(
361            "{alias}.SYS_CHANGE_VERSION, {alias}.SYS_CHANGE_CREATION_VERSION, \
362             {alias}.SYS_CHANGE_OPERATION, {alias}.SYS_CHANGE_COLUMNS, {alias}.SYS_CHANGE_CONTEXT"
363        );
364
365        // Build data columns (prefixed with table alias)
366        let data_cols: String = data_columns
367            .iter()
368            .map(|c| format!("T.{c}"))
369            .collect::<Vec<_>>()
370            .join(", ");
371
372        // Build primary key columns from change tracking
373        let pk_cols: String = self
374            .primary_keys
375            .as_ref()
376            .map(|pks| {
377                pks.iter()
378                    .map(|pk| format!("{alias}.{pk}"))
379                    .collect::<Vec<_>>()
380                    .join(", ")
381            })
382            .unwrap_or_default();
383
384        // Build join condition
385        let join_condition: String = self
386            .primary_keys
387            .as_ref()
388            .map(|pks| {
389                pks.iter()
390                    .map(|pk| format!("{alias}.{pk} = T.{pk}"))
391                    .collect::<Vec<_>>()
392                    .join(" AND ")
393            })
394            .unwrap_or_else(|| "1=1".into());
395
396        let select_cols = if pk_cols.is_empty() {
397            format!("{ct_cols}, {data_cols}")
398        } else {
399            format!("{ct_cols}, {pk_cols}, {data_cols}")
400        };
401
402        format!(
403            "SELECT {select_cols} \
404             FROM CHANGETABLE(CHANGES {table}, {version}{force_seek}) AS {alias} \
405             LEFT OUTER JOIN {table} AS T ON {join_condition}",
406            table = self.table_name,
407            version = self.last_sync_version,
408        )
409    }
410
411    fn build_select_columns(&self) -> String {
412        let alias = &self.alias;
413
414        // Always include change tracking system columns
415        let mut cols = vec![
416            format!("{alias}.SYS_CHANGE_VERSION"),
417            format!("{alias}.SYS_CHANGE_CREATION_VERSION"),
418            format!("{alias}.SYS_CHANGE_OPERATION"),
419            format!("{alias}.SYS_CHANGE_COLUMNS"),
420            format!("{alias}.SYS_CHANGE_CONTEXT"),
421        ];
422
423        // Add primary key columns if specified
424        if let Some(ref pks) = self.primary_keys {
425            for pk in pks {
426                cols.push(format!("{alias}.{pk}"));
427            }
428        }
429
430        // Add data columns if specified
431        if let Some(ref data_cols) = self.columns {
432            for col in data_cols {
433                cols.push(format!("{alias}.{col}"));
434            }
435        }
436
437        cols.join(", ")
438    }
439}
440
441/// Helper functions for Change Tracking operations.
442pub struct ChangeTracking;
443
444impl ChangeTracking {
445    /// Generate SQL to get the current change tracking version.
446    ///
447    /// Returns the global change tracking version number.
448    ///
449    /// # Example
450    ///
451    /// ```rust
452    /// use mssql_client::change_tracking::ChangeTracking;
453    ///
454    /// let sql = ChangeTracking::current_version_sql();
455    /// assert_eq!(sql, "SELECT CHANGE_TRACKING_CURRENT_VERSION()");
456    /// ```
457    #[must_use]
458    pub const fn current_version_sql() -> &'static str {
459        "SELECT CHANGE_TRACKING_CURRENT_VERSION()"
460    }
461
462    /// Generate SQL to get the minimum valid version for a table.
463    ///
464    /// If a client's last sync version is less than this, it must
465    /// perform a full re-sync instead of incremental sync.
466    ///
467    /// # Arguments
468    ///
469    /// * `table_name` - The name of the table
470    ///
471    /// # Example
472    ///
473    /// ```rust
474    /// use mssql_client::change_tracking::ChangeTracking;
475    ///
476    /// let sql = ChangeTracking::min_valid_version_sql("Products");
477    /// assert!(sql.contains("CHANGE_TRACKING_MIN_VALID_VERSION"));
478    /// ```
479    #[must_use]
480    pub fn min_valid_version_sql(table_name: &str) -> String {
481        format!(
482            "SELECT CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID(N'{}'))",
483            escape_nvarchar_literal(table_name)
484        )
485    }
486
487    /// Generate SQL to check if a column is in a change mask.
488    ///
489    /// Used to determine which specific columns changed in an update operation.
490    ///
491    /// # Arguments
492    ///
493    /// * `table_name` - The table name
494    /// * `column_name` - The column to check
495    /// * `mask_variable` - The name of the variable holding the change mask
496    ///
497    /// # Errors
498    ///
499    /// Returns [`Error::InvalidIdentifier`](crate::Error::InvalidIdentifier)
500    /// if `mask_variable` is not a valid SQL Server identifier — unlike the
501    /// name arguments (which are escaped into quoted literals), the variable
502    /// is spliced into the statement verbatim, so it must be validated.
503    ///
504    /// # Example
505    ///
506    /// ```rust
507    /// use mssql_client::change_tracking::ChangeTracking;
508    ///
509    /// let sql = ChangeTracking::column_in_mask_sql("Products", "Price", "@mask").unwrap();
510    /// assert!(sql.contains("CHANGE_TRACKING_IS_COLUMN_IN_MASK"));
511    /// ```
512    pub fn column_in_mask_sql(
513        table_name: &str,
514        column_name: &str,
515        mask_variable: &str,
516    ) -> Result<String, crate::Error> {
517        crate::validation::validate_identifier(mask_variable)?;
518        Ok(format!(
519            "SELECT CHANGE_TRACKING_IS_COLUMN_IN_MASK(\
520             COLUMNPROPERTY(OBJECT_ID(N'{}'), N'{}', 'ColumnId'), \
521             {mask_variable})",
522            escape_nvarchar_literal(table_name),
523            escape_nvarchar_literal(column_name)
524        ))
525    }
526
527    /// Generate SQL to enable change tracking on a database.
528    ///
529    /// # Arguments
530    ///
531    /// * `database_name` - The database name
532    /// * `retention_days` - How long to retain change data
533    /// * `auto_cleanup` - Whether to automatically clean up old data
534    ///
535    /// # Example
536    ///
537    /// ```rust
538    /// use mssql_client::change_tracking::ChangeTracking;
539    ///
540    /// let sql = ChangeTracking::enable_database_sql("MyDB", 2, true);
541    /// assert!(sql.contains("SET CHANGE_TRACKING = ON"));
542    /// ```
543    #[must_use]
544    pub fn enable_database_sql(
545        database_name: &str,
546        retention_days: u32,
547        auto_cleanup: bool,
548    ) -> String {
549        let cleanup = if auto_cleanup { "ON" } else { "OFF" };
550        format!(
551            "ALTER DATABASE [{}] SET CHANGE_TRACKING = ON \
552             (CHANGE_RETENTION = {retention_days} DAYS, AUTO_CLEANUP = {cleanup})",
553            database_name.replace(']', "]]")
554        )
555    }
556
557    /// Generate SQL to enable change tracking on a table.
558    ///
559    /// # Arguments
560    ///
561    /// * `table_name` - The table name
562    /// * `track_columns_updated` - Whether to track which columns were updated
563    ///
564    /// # Example
565    ///
566    /// ```rust
567    /// use mssql_client::change_tracking::ChangeTracking;
568    ///
569    /// let sql = ChangeTracking::enable_table_sql("Products", true);
570    /// assert!(sql.contains("ENABLE CHANGE_TRACKING"));
571    /// ```
572    #[must_use]
573    pub fn enable_table_sql(table_name: &str, track_columns_updated: bool) -> String {
574        let track_cols = if track_columns_updated { "ON" } else { "OFF" };
575        let table = Self::bracket_table_name(table_name);
576        format!(
577            "ALTER TABLE {table} ENABLE CHANGE_TRACKING \
578             WITH (TRACK_COLUMNS_UPDATED = {track_cols})"
579        )
580    }
581
582    /// Generate SQL to disable change tracking on a table.
583    #[must_use]
584    pub fn disable_table_sql(table_name: &str) -> String {
585        let table = Self::bracket_table_name(table_name);
586        format!("ALTER TABLE {table} DISABLE CHANGE_TRACKING")
587    }
588
589    /// Bracket a possibly schema-qualified table name part by part:
590    /// `dbo.Items` → `[dbo].[Items]`. Bracketing the whole string as one
591    /// identifier (`[dbo.Items]`) names a table literally called
592    /// "dbo.Items", which fails with error 4902 on real servers.
593    ///
594    /// Interior `]` characters are doubled (the T-SQL QUOTENAME rule), so a
595    /// hostile name like ``foo]; DROP TABLE bar--`` stays a single quoted
596    /// identifier instead of terminating it early and smuggling a second
597    /// statement into the batch. Pass names unbracketed; pre-bracketed input
598    /// is treated as literal characters of the name.
599    fn bracket_table_name(table_name: &str) -> String {
600        table_name
601            .split('.')
602            .map(|part| format!("[{}]", part.replace(']', "]]")))
603            .collect::<Vec<_>>()
604            .join(".")
605    }
606
607    /// Generate SQL to disable change tracking on a database.
608    #[must_use]
609    pub fn disable_database_sql(database_name: &str) -> String {
610        format!(
611            "ALTER DATABASE [{}] SET CHANGE_TRACKING = OFF",
612            database_name.replace(']', "]]")
613        )
614    }
615}
616
617/// Escape a string for inclusion in an `N'...'` literal: per T-SQL rules a
618/// single quote inside a string literal is escaped by doubling it, which
619/// makes any input inert — it cannot terminate the literal early.
620fn escape_nvarchar_literal(s: &str) -> String {
621    s.replace('\'', "''")
622}
623
624/// Result of checking if a sync version is still valid.
625#[derive(Debug, Clone, Copy, PartialEq, Eq)]
626#[non_exhaustive]
627pub enum SyncVersionStatus {
628    /// The sync version is valid and incremental sync can proceed.
629    Valid,
630    /// The sync version is too old; a full re-sync is required.
631    TooOld,
632    /// Change tracking is not enabled or the table doesn't exist.
633    NotEnabled,
634}
635
636impl SyncVersionStatus {
637    /// Check sync version validity from the min_valid_version result.
638    ///
639    /// # Arguments
640    ///
641    /// * `last_sync_version` - The client's last synchronized version
642    /// * `min_valid_version` - Result from `CHANGE_TRACKING_MIN_VALID_VERSION()`
643    ///
644    /// # Returns
645    ///
646    /// The sync status indicating whether incremental sync is possible.
647    #[must_use]
648    pub fn check(last_sync_version: i64, min_valid_version: Option<i64>) -> Self {
649        match min_valid_version {
650            None => Self::NotEnabled,
651            Some(min) if last_sync_version >= min => Self::Valid,
652            Some(_) => Self::TooOld,
653        }
654    }
655
656    /// Check if incremental sync is possible.
657    #[must_use]
658    pub const fn can_sync_incrementally(&self) -> bool {
659        matches!(self, Self::Valid)
660    }
661
662    /// Check if a full re-sync is required.
663    #[must_use]
664    pub const fn requires_full_sync(&self) -> bool {
665        matches!(self, Self::TooOld)
666    }
667}
668
669#[cfg(test)]
670#[allow(clippy::unwrap_used)]
671mod tests {
672    use super::*;
673
674    #[test]
675    fn test_change_operation_from_sql() {
676        assert_eq!(
677            ChangeOperation::from_sql("I"),
678            Some(ChangeOperation::Insert)
679        );
680        assert_eq!(
681            ChangeOperation::from_sql("U"),
682            Some(ChangeOperation::Update)
683        );
684        assert_eq!(
685            ChangeOperation::from_sql("D"),
686            Some(ChangeOperation::Delete)
687        );
688        assert_eq!(
689            ChangeOperation::from_sql("i"),
690            Some(ChangeOperation::Insert)
691        );
692        assert_eq!(
693            ChangeOperation::from_sql(" U "),
694            Some(ChangeOperation::Update)
695        );
696        assert_eq!(ChangeOperation::from_sql("X"), None);
697        assert_eq!(ChangeOperation::from_sql(""), None);
698    }
699
700    #[test]
701    fn test_change_operation_as_sql() {
702        assert_eq!(ChangeOperation::Insert.as_sql(), "I");
703        assert_eq!(ChangeOperation::Update.as_sql(), "U");
704        assert_eq!(ChangeOperation::Delete.as_sql(), "D");
705    }
706
707    #[test]
708    fn test_change_operation_predicates() {
709        assert!(ChangeOperation::Insert.is_insert());
710        assert!(!ChangeOperation::Insert.is_update());
711        assert!(!ChangeOperation::Insert.is_delete());
712
713        assert!(!ChangeOperation::Update.is_insert());
714        assert!(ChangeOperation::Update.is_update());
715        assert!(!ChangeOperation::Update.is_delete());
716
717        assert!(!ChangeOperation::Delete.is_insert());
718        assert!(!ChangeOperation::Delete.is_update());
719        assert!(ChangeOperation::Delete.is_delete());
720    }
721
722    #[test]
723    fn test_change_metadata_constructors() {
724        let insert = ChangeMetadata::insert(42);
725        assert_eq!(insert.version, 42);
726        assert_eq!(insert.creation_version, Some(42));
727        assert_eq!(insert.operation, ChangeOperation::Insert);
728
729        let update = ChangeMetadata::update(50, 42);
730        assert_eq!(update.version, 50);
731        assert_eq!(update.creation_version, Some(42));
732        assert_eq!(update.operation, ChangeOperation::Update);
733
734        let delete = ChangeMetadata::delete(60);
735        assert_eq!(delete.version, 60);
736        assert_eq!(delete.creation_version, None);
737        assert_eq!(delete.operation, ChangeOperation::Delete);
738    }
739
740    #[test]
741    fn test_change_tracking_query_basic() {
742        let query = ChangeTrackingQuery::changes("Products", 42);
743        let sql = query.to_sql();
744
745        assert!(sql.contains("CHANGETABLE(CHANGES Products, 42)"));
746        assert!(sql.contains("SYS_CHANGE_VERSION"));
747        assert!(sql.contains("SYS_CHANGE_OPERATION"));
748        // SQL Server rejects CHANGETABLE without an alias (error 22104) —
749        // the substring above matches the broken pre-alias SQL too, so pin
750        // the alias explicitly.
751        assert!(
752            sql.contains(") AS CT"),
753            "CHANGETABLE must be aliased or the query is not executable: {sql}"
754        );
755    }
756
757    #[test]
758    fn test_change_tracking_query_with_columns() {
759        let query = ChangeTrackingQuery::changes("Products", 42).with_columns(&["Name", "Price"]);
760        let sql = query.to_sql();
761
762        assert!(sql.contains("CT.Name"));
763        assert!(sql.contains("CT.Price"));
764    }
765
766    #[test]
767    fn test_change_tracking_query_with_primary_keys() {
768        let query = ChangeTrackingQuery::changes("Products", 42).with_primary_keys(&["ProductId"]);
769        let sql = query.to_sql();
770
771        assert!(sql.contains("CT.ProductId"));
772    }
773
774    #[test]
775    fn test_change_tracking_query_force_seek() {
776        let query = ChangeTrackingQuery::changes("Products", 42).with_force_seek();
777        let sql = query.to_sql();
778
779        assert!(sql.contains("FORCESEEK"));
780    }
781
782    #[test]
783    fn test_change_tracking_query_with_data() {
784        let query = ChangeTrackingQuery::changes("Products", 42).with_primary_keys(&["ProductId"]);
785        let sql = query.to_sql_with_data(&["Name", "Price"]);
786
787        assert!(sql.contains("LEFT OUTER JOIN Products AS T"));
788        assert!(sql.contains("CT.ProductId = T.ProductId"));
789        assert!(sql.contains("T.Name"));
790        assert!(sql.contains("T.Price"));
791    }
792
793    #[test]
794    fn test_change_tracking_helper_sql() {
795        assert_eq!(
796            ChangeTracking::current_version_sql(),
797            "SELECT CHANGE_TRACKING_CURRENT_VERSION()"
798        );
799
800        let min_sql = ChangeTracking::min_valid_version_sql("Products");
801        assert!(min_sql.contains("CHANGE_TRACKING_MIN_VALID_VERSION"));
802        assert!(min_sql.contains("Products"));
803
804        let mask_sql = ChangeTracking::column_in_mask_sql("Products", "Price", "@mask").unwrap();
805        assert!(mask_sql.contains("CHANGE_TRACKING_IS_COLUMN_IN_MASK"));
806        assert!(mask_sql.contains("Price"));
807        assert!(mask_sql.contains("@mask"));
808    }
809
810    /// Issue #144: the helpers that embed names in `N'...'` literals must
811    /// double interior single quotes so a hostile name cannot terminate the
812    /// literal and smuggle SQL into the statement.
813    #[test]
814    fn test_nvarchar_literal_names_cannot_break_out() {
815        let hostile = "x'); SELECT 1--";
816
817        let sql = ChangeTracking::min_valid_version_sql(hostile);
818        assert!(
819            sql.contains("N'x''); SELECT 1--'"),
820            "single quotes must be doubled, got: {sql}"
821        );
822        assert!(!sql.contains("N'x');"), "literal must not end early: {sql}");
823
824        let sql = ChangeTracking::column_in_mask_sql(hostile, hostile, "@mask").unwrap();
825        assert!(sql.contains("N'x''); SELECT 1--'"));
826        assert!(!sql.contains("N'x');"));
827    }
828
829    /// Issue #144: `mask_variable` is spliced verbatim (no quoting can make
830    /// it safe), so it must be validated as an identifier.
831    #[test]
832    fn test_mask_variable_is_validated() {
833        assert!(ChangeTracking::column_in_mask_sql("T", "C", "@mask").is_ok());
834        assert!(ChangeTracking::column_in_mask_sql("T", "C", "@mask); DROP TABLE x--").is_err());
835        assert!(ChangeTracking::column_in_mask_sql("T", "C", "1 OR 1=1").is_err());
836        assert!(ChangeTracking::column_in_mask_sql("T", "C", "").is_err());
837    }
838
839    /// Issue #144: the database-level helpers bracket-quote the name and must
840    /// double interior `]` (QUOTENAME rule), matching the table helpers.
841    #[test]
842    fn test_database_name_brackets_are_escaped() {
843        let hostile = "x]; DROP DATABASE foo--";
844
845        let sql = ChangeTracking::enable_database_sql(hostile, 2, true);
846        assert!(
847            sql.contains("ALTER DATABASE [x]]; DROP DATABASE foo--]"),
848            "interior ] must be doubled, got: {sql}"
849        );
850        assert!(!sql.contains("ALTER DATABASE [x];"));
851
852        let sql = ChangeTracking::disable_database_sql(hostile);
853        assert!(sql.contains("ALTER DATABASE [x]]; DROP DATABASE foo--]"));
854        assert!(!sql.contains("ALTER DATABASE [x];"));
855    }
856
857    #[test]
858    fn test_change_tracking_enable_sql() {
859        let db_sql = ChangeTracking::enable_database_sql("MyDB", 7, true);
860        assert!(db_sql.contains("[MyDB]"));
861        assert!(db_sql.contains("7 DAYS"));
862        assert!(db_sql.contains("AUTO_CLEANUP = ON"));
863
864        let table_sql = ChangeTracking::enable_table_sql("Products", true);
865        assert!(table_sql.contains("[Products]"));
866        assert!(table_sql.contains("TRACK_COLUMNS_UPDATED = ON"));
867
868        // Schema-qualified names must be bracketed part by part —
869        // `[dbo.Products]` is a single (nonexistent) identifier and fails
870        // with error 4902 on a real server.
871        let qualified = ChangeTracking::enable_table_sql("dbo.Products", true);
872        assert!(qualified.contains("ALTER TABLE [dbo].[Products]"));
873        let disable = ChangeTracking::disable_table_sql("dbo.Products");
874        assert!(disable.contains("ALTER TABLE [dbo].[Products]"));
875    }
876
877    /// PR #143 review, Blocker 2: interior `]` must be doubled (QUOTENAME
878    /// rule) so a hostile table name cannot terminate the bracketed
879    /// identifier early and smuggle a second statement into the batch.
880    #[test]
881    fn test_bracket_escapes_closing_brackets() {
882        let sql = ChangeTracking::enable_table_sql("foo]; DROP TABLE bar--", true);
883        assert!(
884            sql.contains("ALTER TABLE [foo]]; DROP TABLE bar--]"),
885            "interior ] must be doubled, got: {sql}"
886        );
887        assert!(
888            !sql.contains("ALTER TABLE [foo];"),
889            "the identifier must not be terminated early: {sql}"
890        );
891
892        let sql = ChangeTracking::disable_table_sql("we]ird.na]me");
893        assert!(sql.contains("ALTER TABLE [we]]ird].[na]]me]"));
894    }
895
896    #[test]
897    fn test_sync_version_status() {
898        // Valid case
899        assert!(SyncVersionStatus::check(100, Some(50)).can_sync_incrementally());
900        assert!(SyncVersionStatus::check(50, Some(50)).can_sync_incrementally());
901
902        // Too old case
903        assert!(SyncVersionStatus::check(40, Some(50)).requires_full_sync());
904
905        // Not enabled case
906        let status = SyncVersionStatus::check(100, None);
907        assert_eq!(status, SyncVersionStatus::NotEnabled);
908        assert!(!status.can_sync_incrementally());
909    }
910}