Skip to main content

mssql_client/
change_tracking.rs

1//! SQL Server Change Tracking support.
2//!
3//! This module provides helper types and utilities for working with SQL Server's
4//! built-in Change Tracking feature, which enables efficient incremental data
5//! synchronization scenarios.
6//!
7//! ## Overview
8//!
9//! SQL Server Change Tracking automatically tracks changes (inserts, updates,
10//! deletes) to table rows. Applications can query for changes since a specific
11//! version to implement incremental sync patterns.
12//!
13//! ## Usage
14//!
15//! ```text
16//! use mssql_client::change_tracking::{ChangeOperation, ChangeTrackingQuery};
17//!
18//! // Get current version for baseline
19//! let current_version: i64 = client
20//!     .query("SELECT CHANGE_TRACKING_CURRENT_VERSION()")
21//!     .await?
22//!     .first()
23//!     .and_then(|r| r.try_get(0))
24//!     .unwrap_or(0);
25//!
26//! // Later, query for changes since that version
27//! let query = ChangeTrackingQuery::changes("Products", last_sync_version);
28//! let changes: Vec<ChangedRow> = client.query(&query.to_sql()).await?
29//!     .map(|row| ChangedRow::from_row(&row))
30//!     .collect();
31//!
32//! for change in changes {
33//!     match change.operation {
34//!         ChangeOperation::Insert => println!("New row: {:?}", change.primary_key),
35//!         ChangeOperation::Update => println!("Updated row: {:?}", change.primary_key),
36//!         ChangeOperation::Delete => println!("Deleted row: {:?}", change.primary_key),
37//!     }
38//! }
39//! ```
40//!
41//! ## Prerequisites
42//!
43//! Change Tracking must be enabled on the database and table:
44//!
45//! ```sql
46//! -- Enable on database
47//! ALTER DATABASE MyDB SET CHANGE_TRACKING = ON
48//!     (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
49//!
50//! -- Enable on table
51//! ALTER TABLE Products ENABLE CHANGE_TRACKING
52//!     WITH (TRACK_COLUMNS_UPDATED = ON);
53//! ```
54//!
55//! ## Key Concepts
56//!
57//! - **Version**: A monotonically increasing value representing a point in time
58//! - **SYS_CHANGE_OPERATION**: I (Insert), U (Update), D (Delete)
59//! - **SYS_CHANGE_VERSION**: The version when the row was last changed
60//! - **SYS_CHANGE_CREATION_VERSION**: The version when the row was inserted
61//!
62//! ## References
63//!
64//! - [About Change Tracking](https://learn.microsoft.com/en-us/sql/relational-databases/track-changes/about-change-tracking-sql-server)
65//! - [CHANGETABLE function](https://learn.microsoft.com/en-us/sql/relational-databases/system-functions/changetable-transact-sql)
66
67use std::fmt;
68
69use bytes::Bytes;
70
71/// The type of change operation tracked by SQL Server Change Tracking.
72///
73/// This corresponds to the `SYS_CHANGE_OPERATION` column in `CHANGETABLE` results.
74#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
75#[non_exhaustive]
76pub enum ChangeOperation {
77    /// A new row was inserted (I).
78    Insert,
79    /// An existing row was updated (U).
80    Update,
81    /// A row was deleted (D).
82    Delete,
83}
84
85impl ChangeOperation {
86    /// Parse a change operation from its single-character SQL Server representation.
87    ///
88    /// # Arguments
89    ///
90    /// * `s` - A string containing 'I', 'U', or 'D'
91    ///
92    /// # Returns
93    ///
94    /// The parsed `ChangeOperation`, or `None` if the input is invalid.
95    #[must_use]
96    pub fn from_sql(s: &str) -> Option<Self> {
97        match s.trim().to_uppercase().as_str() {
98            "I" => Some(Self::Insert),
99            "U" => Some(Self::Update),
100            "D" => Some(Self::Delete),
101            _ => None,
102        }
103    }
104
105    /// Get the SQL Server single-character representation.
106    #[must_use]
107    pub const fn as_sql(&self) -> &'static str {
108        match self {
109            Self::Insert => "I",
110            Self::Update => "U",
111            Self::Delete => "D",
112        }
113    }
114
115    /// Check if this is an insert operation.
116    #[must_use]
117    pub const fn is_insert(&self) -> bool {
118        matches!(self, Self::Insert)
119    }
120
121    /// Check if this is an update operation.
122    #[must_use]
123    pub const fn is_update(&self) -> bool {
124        matches!(self, Self::Update)
125    }
126
127    /// Check if this is a delete operation.
128    #[must_use]
129    pub const fn is_delete(&self) -> bool {
130        matches!(self, Self::Delete)
131    }
132}
133
134impl fmt::Display for ChangeOperation {
135    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136        match self {
137            Self::Insert => write!(f, "INSERT"),
138            Self::Update => write!(f, "UPDATE"),
139            Self::Delete => write!(f, "DELETE"),
140        }
141    }
142}
143
144/// Metadata from a Change Tracking query result.
145///
146/// Contains the system columns returned by `CHANGETABLE(CHANGES ...)`.
147#[derive(Debug, Clone)]
148#[non_exhaustive]
149pub struct ChangeMetadata {
150    /// The version when the row was last changed.
151    pub version: i64,
152    /// The version when the row was created (inserted).
153    /// This is `None` for delete operations.
154    pub creation_version: Option<i64>,
155    /// The type of change (Insert, Update, Delete).
156    pub operation: ChangeOperation,
157    /// Binary mask of changed columns.
158    /// Use `CHANGE_TRACKING_IS_COLUMN_IN_MASK()` to interpret.
159    pub changed_columns: Option<Bytes>,
160    /// Application-defined change context.
161    pub context: Option<Bytes>,
162}
163
164impl ChangeMetadata {
165    /// Create new change metadata.
166    #[must_use]
167    pub fn new(
168        version: i64,
169        creation_version: Option<i64>,
170        operation: ChangeOperation,
171        changed_columns: Option<Bytes>,
172        context: Option<Bytes>,
173    ) -> Self {
174        Self {
175            version,
176            creation_version,
177            operation,
178            changed_columns,
179            context,
180        }
181    }
182
183    /// Create metadata for an insert operation.
184    #[must_use]
185    pub fn insert(version: i64) -> Self {
186        Self {
187            version,
188            creation_version: Some(version),
189            operation: ChangeOperation::Insert,
190            changed_columns: None,
191            context: None,
192        }
193    }
194
195    /// Create metadata for an update operation.
196    #[must_use]
197    pub fn update(version: i64, creation_version: i64) -> Self {
198        Self {
199            version,
200            creation_version: Some(creation_version),
201            operation: ChangeOperation::Update,
202            changed_columns: None,
203            context: None,
204        }
205    }
206
207    /// Create metadata for a delete operation.
208    #[must_use]
209    pub fn delete(version: i64) -> Self {
210        Self {
211            version,
212            creation_version: None,
213            operation: ChangeOperation::Delete,
214            changed_columns: None,
215            context: None,
216        }
217    }
218}
219
220/// Query builder for Change Tracking operations.
221///
222/// Helps construct proper SQL queries for common Change Tracking patterns.
223///
224/// # Example
225///
226/// ```rust
227/// use mssql_client::change_tracking::ChangeTrackingQuery;
228///
229/// // Query for all changes since version 42
230/// let query = ChangeTrackingQuery::changes("Products", 42);
231/// assert!(query.to_sql().contains("CHANGETABLE"));
232///
233/// // Query with specific columns
234/// let query = ChangeTrackingQuery::changes("Orders", 100)
235///     .with_columns(&["OrderId", "CustomerId", "OrderDate"]);
236/// let sql = query.to_sql();
237/// assert!(sql.contains("OrderId"));
238/// ```
239#[derive(Debug, Clone)]
240pub struct ChangeTrackingQuery {
241    table_name: String,
242    last_sync_version: i64,
243    columns: Option<Vec<String>>,
244    primary_keys: Option<Vec<String>>,
245    alias: String,
246    force_seek: bool,
247}
248
249impl ChangeTrackingQuery {
250    /// Create a query for changes to a table since a specific version.
251    ///
252    /// This generates a `CHANGETABLE(CHANGES table_name, last_sync_version)` query.
253    ///
254    /// # Arguments
255    ///
256    /// * `table_name` - The name of the table to query changes for
257    /// * `last_sync_version` - The version from the previous sync (0 for initial)
258    ///
259    /// # Example
260    ///
261    /// ```rust
262    /// use mssql_client::change_tracking::ChangeTrackingQuery;
263    ///
264    /// let query = ChangeTrackingQuery::changes("Products", 42);
265    /// ```
266    #[must_use]
267    pub fn changes(table_name: impl Into<String>, last_sync_version: i64) -> Self {
268        Self {
269            table_name: table_name.into(),
270            last_sync_version,
271            columns: None,
272            primary_keys: None,
273            alias: "CT".into(),
274            force_seek: false,
275        }
276    }
277
278    /// Specify which data columns to include (in addition to change tracking columns).
279    ///
280    /// If not specified, only change tracking system columns are returned.
281    ///
282    /// # Arguments
283    ///
284    /// * `columns` - Column names to include in the result
285    #[must_use]
286    pub fn with_columns(mut self, columns: &[&str]) -> Self {
287        self.columns = Some(columns.iter().map(|&s| s.to_string()).collect());
288        self
289    }
290
291    /// Specify the primary key columns for the table.
292    ///
293    /// This is needed when you want to join change tracking results
294    /// with the original table to get current row data.
295    #[must_use]
296    pub fn with_primary_keys(mut self, keys: &[&str]) -> Self {
297        self.primary_keys = Some(keys.iter().map(|&s| s.to_string()).collect());
298        self
299    }
300
301    /// Set the table alias for the CHANGETABLE result.
302    #[must_use]
303    pub fn with_alias(mut self, alias: impl Into<String>) -> Self {
304        self.alias = alias.into();
305        self
306    }
307
308    /// Enable FORCESEEK hint for the query.
309    ///
310    /// This can improve performance in some scenarios.
311    #[must_use]
312    pub fn with_force_seek(mut self) -> Self {
313        self.force_seek = true;
314        self
315    }
316
317    /// Generate the SQL query string.
318    ///
319    /// This returns a query that can be executed directly. The table name is
320    /// bracket-quoted part by part (`dbo.Items` → `[dbo].[Items]`) and the
321    /// alias, primary key, and column names are bracket-quoted as single
322    /// identifiers; pass all of them unbracketed — pre-bracketed input is
323    /// treated as literal characters of the name.
324    #[must_use]
325    pub fn to_sql(&self) -> String {
326        let force_seek = if self.force_seek { ", FORCESEEK" } else { "" };
327
328        // Build the SELECT column list
329        let select_cols = self.build_select_columns();
330
331        // SQL Server requires an alias on CHANGETABLE ("A table returned by
332        // the CHANGETABLE function must be aliased.", error 22104).
333        format!(
334            "SELECT {} FROM CHANGETABLE(CHANGES {}, {}{}) AS {}",
335            select_cols,
336            ChangeTracking::bracket_table_name(&self.table_name),
337            self.last_sync_version,
338            force_seek,
339            bracket_identifier(&self.alias)
340        )
341    }
342
343    /// Generate a SQL query that joins with the original table.
344    ///
345    /// This is useful when you need both the change tracking metadata
346    /// and the current row data (for inserts and updates).
347    ///
348    /// # Arguments
349    ///
350    /// * `data_columns` - Columns from the data table to include
351    ///
352    /// # Example
353    ///
354    /// ```rust
355    /// use mssql_client::change_tracking::ChangeTrackingQuery;
356    ///
357    /// let query = ChangeTrackingQuery::changes("Products", 42)
358    ///     .with_primary_keys(&["ProductId"]);
359    /// let sql = query.to_sql_with_data(&["Name", "Price", "Stock"]);
360    /// assert!(sql.contains("LEFT OUTER JOIN"));
361    /// ```
362    #[must_use]
363    pub fn to_sql_with_data(&self, data_columns: &[&str]) -> String {
364        let force_seek = if self.force_seek { ", FORCESEEK" } else { "" };
365        let alias = bracket_identifier(&self.alias);
366
367        // Build change tracking columns
368        let ct_cols = format!(
369            "{alias}.SYS_CHANGE_VERSION, {alias}.SYS_CHANGE_CREATION_VERSION, \
370             {alias}.SYS_CHANGE_OPERATION, {alias}.SYS_CHANGE_COLUMNS, {alias}.SYS_CHANGE_CONTEXT"
371        );
372
373        // Build data columns (prefixed with table alias)
374        let data_cols: String = data_columns
375            .iter()
376            .map(|c| format!("T.{}", bracket_identifier(c)))
377            .collect::<Vec<_>>()
378            .join(", ");
379
380        // Build primary key columns from change tracking
381        let pk_cols: String = self
382            .primary_keys
383            .as_ref()
384            .map(|pks| {
385                pks.iter()
386                    .map(|pk| format!("{alias}.{}", bracket_identifier(pk)))
387                    .collect::<Vec<_>>()
388                    .join(", ")
389            })
390            .unwrap_or_default();
391
392        // Build join condition
393        let join_condition: String = self
394            .primary_keys
395            .as_ref()
396            .map(|pks| {
397                pks.iter()
398                    .map(|pk| {
399                        let pk = bracket_identifier(pk);
400                        format!("{alias}.{pk} = T.{pk}")
401                    })
402                    .collect::<Vec<_>>()
403                    .join(" AND ")
404            })
405            .unwrap_or_else(|| "1=1".into());
406
407        let select_cols = if pk_cols.is_empty() {
408            format!("{ct_cols}, {data_cols}")
409        } else {
410            format!("{ct_cols}, {pk_cols}, {data_cols}")
411        };
412
413        format!(
414            "SELECT {select_cols} \
415             FROM CHANGETABLE(CHANGES {table}, {version}{force_seek}) AS {alias} \
416             LEFT OUTER JOIN {table} AS T ON {join_condition}",
417            table = ChangeTracking::bracket_table_name(&self.table_name),
418            version = self.last_sync_version,
419        )
420    }
421
422    fn build_select_columns(&self) -> String {
423        let alias = bracket_identifier(&self.alias);
424
425        // Always include change tracking system columns
426        let mut cols = vec![
427            format!("{alias}.SYS_CHANGE_VERSION"),
428            format!("{alias}.SYS_CHANGE_CREATION_VERSION"),
429            format!("{alias}.SYS_CHANGE_OPERATION"),
430            format!("{alias}.SYS_CHANGE_COLUMNS"),
431            format!("{alias}.SYS_CHANGE_CONTEXT"),
432        ];
433
434        // Add primary key columns if specified
435        if let Some(ref pks) = self.primary_keys {
436            for pk in pks {
437                cols.push(format!("{alias}.{}", bracket_identifier(pk)));
438            }
439        }
440
441        // Add data columns if specified
442        if let Some(ref data_cols) = self.columns {
443            for col in data_cols {
444                cols.push(format!("{alias}.{}", bracket_identifier(col)));
445            }
446        }
447
448        cols.join(", ")
449    }
450}
451
452/// Helper functions for Change Tracking operations.
453pub struct ChangeTracking;
454
455impl ChangeTracking {
456    /// Generate SQL to get the current change tracking version.
457    ///
458    /// Returns the global change tracking version number.
459    ///
460    /// # Example
461    ///
462    /// ```rust
463    /// use mssql_client::change_tracking::ChangeTracking;
464    ///
465    /// let sql = ChangeTracking::current_version_sql();
466    /// assert_eq!(sql, "SELECT CHANGE_TRACKING_CURRENT_VERSION()");
467    /// ```
468    #[must_use]
469    pub const fn current_version_sql() -> &'static str {
470        "SELECT CHANGE_TRACKING_CURRENT_VERSION()"
471    }
472
473    /// Generate SQL to get the minimum valid version for a table.
474    ///
475    /// If a client's last sync version is less than this, it must
476    /// perform a full re-sync instead of incremental sync.
477    ///
478    /// # Arguments
479    ///
480    /// * `table_name` - The name of the table
481    ///
482    /// # Example
483    ///
484    /// ```rust
485    /// use mssql_client::change_tracking::ChangeTracking;
486    ///
487    /// let sql = ChangeTracking::min_valid_version_sql("Products");
488    /// assert!(sql.contains("CHANGE_TRACKING_MIN_VALID_VERSION"));
489    /// ```
490    #[must_use]
491    pub fn min_valid_version_sql(table_name: &str) -> String {
492        format!(
493            "SELECT CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID(N'{}'))",
494            escape_nvarchar_literal(table_name)
495        )
496    }
497
498    /// Generate SQL to check if a column is in a change mask.
499    ///
500    /// Used to determine which specific columns changed in an update operation.
501    ///
502    /// # Arguments
503    ///
504    /// * `table_name` - The table name
505    /// * `column_name` - The column to check
506    /// * `mask_variable` - The name of the variable holding the change mask
507    ///
508    /// # Errors
509    ///
510    /// Returns [`Error::InvalidIdentifier`](crate::Error::InvalidIdentifier)
511    /// if `mask_variable` is not a valid SQL Server identifier — unlike the
512    /// name arguments (which are escaped into quoted literals), the variable
513    /// is spliced into the statement verbatim, so it must be validated.
514    ///
515    /// # Example
516    ///
517    /// ```rust
518    /// use mssql_client::change_tracking::ChangeTracking;
519    ///
520    /// let sql = ChangeTracking::column_in_mask_sql("Products", "Price", "@mask").unwrap();
521    /// assert!(sql.contains("CHANGE_TRACKING_IS_COLUMN_IN_MASK"));
522    /// ```
523    pub fn column_in_mask_sql(
524        table_name: &str,
525        column_name: &str,
526        mask_variable: &str,
527    ) -> Result<String, crate::Error> {
528        crate::validation::validate_identifier(mask_variable)?;
529        Ok(format!(
530            "SELECT CHANGE_TRACKING_IS_COLUMN_IN_MASK(\
531             COLUMNPROPERTY(OBJECT_ID(N'{}'), N'{}', 'ColumnId'), \
532             {mask_variable})",
533            escape_nvarchar_literal(table_name),
534            escape_nvarchar_literal(column_name)
535        ))
536    }
537
538    /// Generate SQL to enable change tracking on a database.
539    ///
540    /// # Arguments
541    ///
542    /// * `database_name` - The database name
543    /// * `retention_days` - How long to retain change data
544    /// * `auto_cleanup` - Whether to automatically clean up old data
545    ///
546    /// # Example
547    ///
548    /// ```rust
549    /// use mssql_client::change_tracking::ChangeTracking;
550    ///
551    /// let sql = ChangeTracking::enable_database_sql("MyDB", 2, true);
552    /// assert!(sql.contains("SET CHANGE_TRACKING = ON"));
553    /// ```
554    #[must_use]
555    pub fn enable_database_sql(
556        database_name: &str,
557        retention_days: u32,
558        auto_cleanup: bool,
559    ) -> String {
560        let cleanup = if auto_cleanup { "ON" } else { "OFF" };
561        format!(
562            "ALTER DATABASE [{}] SET CHANGE_TRACKING = ON \
563             (CHANGE_RETENTION = {retention_days} DAYS, AUTO_CLEANUP = {cleanup})",
564            database_name.replace(']', "]]")
565        )
566    }
567
568    /// Generate SQL to enable change tracking on a table.
569    ///
570    /// # Arguments
571    ///
572    /// * `table_name` - The table name
573    /// * `track_columns_updated` - Whether to track which columns were updated
574    ///
575    /// # Example
576    ///
577    /// ```rust
578    /// use mssql_client::change_tracking::ChangeTracking;
579    ///
580    /// let sql = ChangeTracking::enable_table_sql("Products", true);
581    /// assert!(sql.contains("ENABLE CHANGE_TRACKING"));
582    /// ```
583    #[must_use]
584    pub fn enable_table_sql(table_name: &str, track_columns_updated: bool) -> String {
585        let track_cols = if track_columns_updated { "ON" } else { "OFF" };
586        let table = Self::bracket_table_name(table_name);
587        format!(
588            "ALTER TABLE {table} ENABLE CHANGE_TRACKING \
589             WITH (TRACK_COLUMNS_UPDATED = {track_cols})"
590        )
591    }
592
593    /// Generate SQL to disable change tracking on a table.
594    #[must_use]
595    pub fn disable_table_sql(table_name: &str) -> String {
596        let table = Self::bracket_table_name(table_name);
597        format!("ALTER TABLE {table} DISABLE CHANGE_TRACKING")
598    }
599
600    /// Bracket a possibly schema-qualified table name part by part:
601    /// `dbo.Items` → `[dbo].[Items]`. Bracketing the whole string as one
602    /// identifier (`[dbo.Items]`) names a table literally called
603    /// "dbo.Items", which fails with error 4902 on real servers.
604    ///
605    /// Interior `]` characters are doubled (the T-SQL QUOTENAME rule), so a
606    /// hostile name like ``foo]; DROP TABLE bar--`` stays a single quoted
607    /// identifier instead of terminating it early and smuggling a second
608    /// statement into the batch. Pass names unbracketed; pre-bracketed input
609    /// is treated as literal characters of the name.
610    fn bracket_table_name(table_name: &str) -> String {
611        table_name
612            .split('.')
613            .map(|part| format!("[{}]", part.replace(']', "]]")))
614            .collect::<Vec<_>>()
615            .join(".")
616    }
617
618    /// Generate SQL to disable change tracking on a database.
619    #[must_use]
620    pub fn disable_database_sql(database_name: &str) -> String {
621        format!(
622            "ALTER DATABASE [{}] SET CHANGE_TRACKING = OFF",
623            database_name.replace(']', "]]")
624        )
625    }
626}
627
628/// Escape a string for inclusion in an `N'...'` literal: per T-SQL rules a
629/// single quote inside a string literal is escaped by doubling it, which
630/// makes any input inert — it cannot terminate the literal early.
631fn escape_nvarchar_literal(s: &str) -> String {
632    s.replace('\'', "''")
633}
634
635/// Bracket a single identifier (alias, column, or primary-key name) with
636/// interior `]` doubled (the T-SQL QUOTENAME rule), so hostile input stays
637/// one quoted identifier instead of escaping into the surrounding statement.
638/// Pass names unbracketed; pre-bracketed input is treated as literal
639/// characters of the name.
640fn bracket_identifier(name: &str) -> String {
641    format!("[{}]", name.replace(']', "]]"))
642}
643
644/// Result of checking if a sync version is still valid.
645#[derive(Debug, Clone, Copy, PartialEq, Eq)]
646#[non_exhaustive]
647pub enum SyncVersionStatus {
648    /// The sync version is valid and incremental sync can proceed.
649    Valid,
650    /// The sync version is too old; a full re-sync is required.
651    TooOld,
652    /// Change tracking is not enabled or the table doesn't exist.
653    NotEnabled,
654}
655
656impl SyncVersionStatus {
657    /// Check sync version validity from the min_valid_version result.
658    ///
659    /// # Arguments
660    ///
661    /// * `last_sync_version` - The client's last synchronized version
662    /// * `min_valid_version` - Result from `CHANGE_TRACKING_MIN_VALID_VERSION()`
663    ///
664    /// # Returns
665    ///
666    /// The sync status indicating whether incremental sync is possible.
667    #[must_use]
668    pub fn check(last_sync_version: i64, min_valid_version: Option<i64>) -> Self {
669        match min_valid_version {
670            None => Self::NotEnabled,
671            Some(min) if last_sync_version >= min => Self::Valid,
672            Some(_) => Self::TooOld,
673        }
674    }
675
676    /// Check if incremental sync is possible.
677    #[must_use]
678    pub const fn can_sync_incrementally(&self) -> bool {
679        matches!(self, Self::Valid)
680    }
681
682    /// Check if a full re-sync is required.
683    #[must_use]
684    pub const fn requires_full_sync(&self) -> bool {
685        matches!(self, Self::TooOld)
686    }
687}
688
689#[cfg(test)]
690#[allow(clippy::unwrap_used)]
691mod tests {
692    use super::*;
693
694    #[test]
695    fn test_change_operation_from_sql() {
696        assert_eq!(
697            ChangeOperation::from_sql("I"),
698            Some(ChangeOperation::Insert)
699        );
700        assert_eq!(
701            ChangeOperation::from_sql("U"),
702            Some(ChangeOperation::Update)
703        );
704        assert_eq!(
705            ChangeOperation::from_sql("D"),
706            Some(ChangeOperation::Delete)
707        );
708        assert_eq!(
709            ChangeOperation::from_sql("i"),
710            Some(ChangeOperation::Insert)
711        );
712        assert_eq!(
713            ChangeOperation::from_sql(" U "),
714            Some(ChangeOperation::Update)
715        );
716        assert_eq!(ChangeOperation::from_sql("X"), None);
717        assert_eq!(ChangeOperation::from_sql(""), None);
718    }
719
720    #[test]
721    fn test_change_operation_as_sql() {
722        assert_eq!(ChangeOperation::Insert.as_sql(), "I");
723        assert_eq!(ChangeOperation::Update.as_sql(), "U");
724        assert_eq!(ChangeOperation::Delete.as_sql(), "D");
725    }
726
727    #[test]
728    fn test_change_operation_predicates() {
729        assert!(ChangeOperation::Insert.is_insert());
730        assert!(!ChangeOperation::Insert.is_update());
731        assert!(!ChangeOperation::Insert.is_delete());
732
733        assert!(!ChangeOperation::Update.is_insert());
734        assert!(ChangeOperation::Update.is_update());
735        assert!(!ChangeOperation::Update.is_delete());
736
737        assert!(!ChangeOperation::Delete.is_insert());
738        assert!(!ChangeOperation::Delete.is_update());
739        assert!(ChangeOperation::Delete.is_delete());
740    }
741
742    #[test]
743    fn test_change_metadata_constructors() {
744        let insert = ChangeMetadata::insert(42);
745        assert_eq!(insert.version, 42);
746        assert_eq!(insert.creation_version, Some(42));
747        assert_eq!(insert.operation, ChangeOperation::Insert);
748
749        let update = ChangeMetadata::update(50, 42);
750        assert_eq!(update.version, 50);
751        assert_eq!(update.creation_version, Some(42));
752        assert_eq!(update.operation, ChangeOperation::Update);
753
754        let delete = ChangeMetadata::delete(60);
755        assert_eq!(delete.version, 60);
756        assert_eq!(delete.creation_version, None);
757        assert_eq!(delete.operation, ChangeOperation::Delete);
758    }
759
760    #[test]
761    fn test_change_tracking_query_basic() {
762        let query = ChangeTrackingQuery::changes("Products", 42);
763        let sql = query.to_sql();
764
765        assert!(sql.contains("CHANGETABLE(CHANGES [Products], 42)"));
766        assert!(sql.contains("SYS_CHANGE_VERSION"));
767        assert!(sql.contains("SYS_CHANGE_OPERATION"));
768        // SQL Server rejects CHANGETABLE without an alias (error 22104) —
769        // the substring above matches the broken pre-alias SQL too, so pin
770        // the alias explicitly.
771        assert!(
772            sql.contains(") AS [CT]"),
773            "CHANGETABLE must be aliased or the query is not executable: {sql}"
774        );
775    }
776
777    #[test]
778    fn test_change_tracking_query_with_columns() {
779        let query = ChangeTrackingQuery::changes("Products", 42).with_columns(&["Name", "Price"]);
780        let sql = query.to_sql();
781
782        assert!(sql.contains("[CT].[Name]"));
783        assert!(sql.contains("[CT].[Price]"));
784    }
785
786    #[test]
787    fn test_change_tracking_query_with_primary_keys() {
788        let query = ChangeTrackingQuery::changes("Products", 42).with_primary_keys(&["ProductId"]);
789        let sql = query.to_sql();
790
791        assert!(sql.contains("[CT].[ProductId]"));
792    }
793
794    #[test]
795    fn test_change_tracking_query_force_seek() {
796        let query = ChangeTrackingQuery::changes("Products", 42).with_force_seek();
797        let sql = query.to_sql();
798
799        assert!(sql.contains("FORCESEEK"));
800    }
801
802    #[test]
803    fn test_change_tracking_query_with_data() {
804        let query = ChangeTrackingQuery::changes("Products", 42).with_primary_keys(&["ProductId"]);
805        let sql = query.to_sql_with_data(&["Name", "Price"]);
806
807        assert!(sql.contains("LEFT OUTER JOIN [Products] AS T"));
808        assert!(sql.contains("[CT].[ProductId] = T.[ProductId]"));
809        assert!(sql.contains("T.[Name]"));
810        assert!(sql.contains("T.[Price]"));
811    }
812
813    /// Issue #186: the query builder spliced identifiers verbatim, so a
814    /// hostile table name could terminate CHANGETABLE(...) and smuggle a
815    /// second statement into the batch. Same rules as the #144 helpers:
816    /// per-part bracketing for the table name, single-identifier bracketing
817    /// with `]`-doubling for alias/column/PK names.
818    #[test]
819    fn test_change_tracking_query_brackets_hostile_identifiers() {
820        let hostile_table = "T, 0) AS CT; DROP TABLE x--";
821        let sql = ChangeTrackingQuery::changes(hostile_table, 42).to_sql();
822        assert!(
823            sql.contains("CHANGETABLE(CHANGES [T, 0) AS CT; DROP TABLE x--], 42)"),
824            "hostile table name must stay one quoted identifier: {sql}"
825        );
826
827        // `]` is doubled so the identifier cannot be closed early.
828        let sql = ChangeTrackingQuery::changes("foo]; DROP TABLE bar--", 1).to_sql();
829        assert!(sql.contains("CHANGETABLE(CHANGES [foo]]; DROP TABLE bar--], 1)"));
830
831        // Schema-qualified names bracket part by part.
832        let sql = ChangeTrackingQuery::changes("dbo.Items", 1).to_sql();
833        assert!(sql.contains("CHANGETABLE(CHANGES [dbo].[Items], 1)"));
834
835        // Alias and column names are bracketed in the SELECT list.
836        let sql = ChangeTrackingQuery::changes("Products", 1)
837            .with_alias("A]; DROP TABLE x--")
838            .with_columns(&["C] FROM x--"])
839            .to_sql();
840        assert!(sql.contains("AS [A]]; DROP TABLE x--]"));
841        assert!(sql.contains("[A]]; DROP TABLE x--].[C]] FROM x--]"));
842
843        // PK and data-column names are bracketed in the join form too.
844        let sql = ChangeTrackingQuery::changes("Products", 1)
845            .with_primary_keys(&["P]--"])
846            .to_sql_with_data(&["D]--"]);
847        assert!(sql.contains("[CT].[P]]--] = T.[P]]--]"));
848        assert!(sql.contains("T.[D]]--]"));
849    }
850
851    #[test]
852    fn test_change_tracking_helper_sql() {
853        assert_eq!(
854            ChangeTracking::current_version_sql(),
855            "SELECT CHANGE_TRACKING_CURRENT_VERSION()"
856        );
857
858        let min_sql = ChangeTracking::min_valid_version_sql("Products");
859        assert!(min_sql.contains("CHANGE_TRACKING_MIN_VALID_VERSION"));
860        assert!(min_sql.contains("Products"));
861
862        let mask_sql = ChangeTracking::column_in_mask_sql("Products", "Price", "@mask").unwrap();
863        assert!(mask_sql.contains("CHANGE_TRACKING_IS_COLUMN_IN_MASK"));
864        assert!(mask_sql.contains("Price"));
865        assert!(mask_sql.contains("@mask"));
866    }
867
868    /// Issue #144: the helpers that embed names in `N'...'` literals must
869    /// double interior single quotes so a hostile name cannot terminate the
870    /// literal and smuggle SQL into the statement.
871    #[test]
872    fn test_nvarchar_literal_names_cannot_break_out() {
873        let hostile = "x'); SELECT 1--";
874
875        let sql = ChangeTracking::min_valid_version_sql(hostile);
876        assert!(
877            sql.contains("N'x''); SELECT 1--'"),
878            "single quotes must be doubled, got: {sql}"
879        );
880        assert!(!sql.contains("N'x');"), "literal must not end early: {sql}");
881
882        let sql = ChangeTracking::column_in_mask_sql(hostile, hostile, "@mask").unwrap();
883        assert!(sql.contains("N'x''); SELECT 1--'"));
884        assert!(!sql.contains("N'x');"));
885    }
886
887    /// Issue #144: `mask_variable` is spliced verbatim (no quoting can make
888    /// it safe), so it must be validated as an identifier.
889    #[test]
890    fn test_mask_variable_is_validated() {
891        assert!(ChangeTracking::column_in_mask_sql("T", "C", "@mask").is_ok());
892        assert!(ChangeTracking::column_in_mask_sql("T", "C", "@mask); DROP TABLE x--").is_err());
893        assert!(ChangeTracking::column_in_mask_sql("T", "C", "1 OR 1=1").is_err());
894        assert!(ChangeTracking::column_in_mask_sql("T", "C", "").is_err());
895    }
896
897    /// Issue #144: the database-level helpers bracket-quote the name and must
898    /// double interior `]` (QUOTENAME rule), matching the table helpers.
899    #[test]
900    fn test_database_name_brackets_are_escaped() {
901        let hostile = "x]; DROP DATABASE foo--";
902
903        let sql = ChangeTracking::enable_database_sql(hostile, 2, true);
904        assert!(
905            sql.contains("ALTER DATABASE [x]]; DROP DATABASE foo--]"),
906            "interior ] must be doubled, got: {sql}"
907        );
908        assert!(!sql.contains("ALTER DATABASE [x];"));
909
910        let sql = ChangeTracking::disable_database_sql(hostile);
911        assert!(sql.contains("ALTER DATABASE [x]]; DROP DATABASE foo--]"));
912        assert!(!sql.contains("ALTER DATABASE [x];"));
913    }
914
915    #[test]
916    fn test_change_tracking_enable_sql() {
917        let db_sql = ChangeTracking::enable_database_sql("MyDB", 7, true);
918        assert!(db_sql.contains("[MyDB]"));
919        assert!(db_sql.contains("7 DAYS"));
920        assert!(db_sql.contains("AUTO_CLEANUP = ON"));
921
922        let table_sql = ChangeTracking::enable_table_sql("Products", true);
923        assert!(table_sql.contains("[Products]"));
924        assert!(table_sql.contains("TRACK_COLUMNS_UPDATED = ON"));
925
926        // Schema-qualified names must be bracketed part by part —
927        // `[dbo.Products]` is a single (nonexistent) identifier and fails
928        // with error 4902 on a real server.
929        let qualified = ChangeTracking::enable_table_sql("dbo.Products", true);
930        assert!(qualified.contains("ALTER TABLE [dbo].[Products]"));
931        let disable = ChangeTracking::disable_table_sql("dbo.Products");
932        assert!(disable.contains("ALTER TABLE [dbo].[Products]"));
933    }
934
935    /// PR #143 review, Blocker 2: interior `]` must be doubled (QUOTENAME
936    /// rule) so a hostile table name cannot terminate the bracketed
937    /// identifier early and smuggle a second statement into the batch.
938    #[test]
939    fn test_bracket_escapes_closing_brackets() {
940        let sql = ChangeTracking::enable_table_sql("foo]; DROP TABLE bar--", true);
941        assert!(
942            sql.contains("ALTER TABLE [foo]]; DROP TABLE bar--]"),
943            "interior ] must be doubled, got: {sql}"
944        );
945        assert!(
946            !sql.contains("ALTER TABLE [foo];"),
947            "the identifier must not be terminated early: {sql}"
948        );
949
950        let sql = ChangeTracking::disable_table_sql("we]ird.na]me");
951        assert!(sql.contains("ALTER TABLE [we]]ird].[na]]me]"));
952    }
953
954    #[test]
955    fn test_sync_version_status() {
956        // Valid case
957        assert!(SyncVersionStatus::check(100, Some(50)).can_sync_incrementally());
958        assert!(SyncVersionStatus::check(50, Some(50)).can_sync_incrementally());
959
960        // Too old case
961        assert!(SyncVersionStatus::check(40, Some(50)).requires_full_sync());
962
963        // Not enabled case
964        let status = SyncVersionStatus::check(100, None);
965        assert_eq!(status, SyncVersionStatus::NotEnabled);
966        assert!(!status.can_sync_incrementally());
967    }
968}