Skip to main content

mssql_client/
change_tracking.rs

1//! SQL Server Change Tracking support.
2//!
3//! This module provides helper types and utilities for working with SQL Server's
4//! built-in Change Tracking feature, which enables efficient incremental data
5//! synchronization scenarios.
6//!
7//! ## Overview
8//!
9//! SQL Server Change Tracking automatically tracks changes (inserts, updates,
10//! deletes) to table rows. Applications can query for changes since a specific
11//! version to implement incremental sync patterns.
12//!
13//! ## Usage
14//!
15//! ```rust,ignore
16//! use mssql_client::change_tracking::{ChangeOperation, ChangeTrackingQuery};
17//!
18//! // Get current version for baseline
19//! let current_version: i64 = client
20//!     .query("SELECT CHANGE_TRACKING_CURRENT_VERSION()")
21//!     .await?
22//!     .first()
23//!     .and_then(|r| r.try_get(0))
24//!     .unwrap_or(0);
25//!
//! // Later, query for changes since the version saved by the previous sync
//! let last_sync_version = current_version;
//! let query = ChangeTrackingQuery::changes("Products", last_sync_version);
28//! let changes: Vec<ChangedRow> = client.query(&query.to_sql()).await?
29//!     .map(|row| ChangedRow::from_row(&row))
30//!     .collect();
31//!
32//! for change in changes {
33//!     match change.operation {
34//!         ChangeOperation::Insert => println!("New row: {:?}", change.primary_key),
35//!         ChangeOperation::Update => println!("Updated row: {:?}", change.primary_key),
36//!         ChangeOperation::Delete => println!("Deleted row: {:?}", change.primary_key),
37//!     }
38//! }
39//! ```
40//!
41//! ## Prerequisites
42//!
43//! Change Tracking must be enabled on the database and table:
44//!
45//! ```sql
46//! -- Enable on database
47//! ALTER DATABASE MyDB SET CHANGE_TRACKING = ON
48//!     (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
49//!
50//! -- Enable on table
51//! ALTER TABLE Products ENABLE CHANGE_TRACKING
52//!     WITH (TRACK_COLUMNS_UPDATED = ON);
53//! ```
54//!
55//! ## Key Concepts
56//!
57//! - **Version**: A monotonically increasing value representing a point in time
58//! - **SYS_CHANGE_OPERATION**: I (Insert), U (Update), D (Delete)
59//! - **SYS_CHANGE_VERSION**: The version when the row was last changed
60//! - **SYS_CHANGE_CREATION_VERSION**: The version when the row was inserted
61//!
62//! ## References
63//!
64//! - [About Change Tracking](https://learn.microsoft.com/en-us/sql/relational-databases/track-changes/about-change-tracking-sql-server)
65//! - [CHANGETABLE function](https://learn.microsoft.com/en-us/sql/relational-databases/system-functions/changetable-transact-sql)
66
67use std::fmt;
68
69use bytes::Bytes;
70
/// The kind of row change reported by SQL Server Change Tracking.
///
/// Mirrors the single-character `SYS_CHANGE_OPERATION` column returned by
/// `CHANGETABLE`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum ChangeOperation {
    /// A new row was inserted (`I`).
    Insert,
    /// An existing row was updated (`U`).
    Update,
    /// A row was deleted (`D`).
    Delete,
}

impl ChangeOperation {
    /// Parse a change operation from its single-character SQL Server code.
    ///
    /// Surrounding whitespace is ignored and the comparison is
    /// case-insensitive for ASCII letters only.
    ///
    /// # Arguments
    ///
    /// * `s` - A string containing `I`, `U`, or `D`
    ///
    /// # Returns
    ///
    /// The parsed `ChangeOperation`, or `None` if the input is not exactly one
    /// of the three codes.
    #[must_use]
    pub fn from_sql(s: &str) -> Option<Self> {
        // Match on the raw bytes: this avoids allocating an upper-cased copy
        // and keeps Unicode case mapping out of the picture (e.g. the dotless
        // `ı` upper-cases to `I` but is not a valid operation code).
        match s.trim().as_bytes() {
            [code] => match code.to_ascii_uppercase() {
                b'I' => Some(Self::Insert),
                b'U' => Some(Self::Update),
                b'D' => Some(Self::Delete),
                _ => None,
            },
            _ => None,
        }
    }

    /// Get the single-character SQL Server code for this operation.
    #[must_use]
    pub const fn as_sql(&self) -> &'static str {
        match self {
            Self::Insert => "I",
            Self::Update => "U",
            Self::Delete => "D",
        }
    }

    /// Returns `true` for [`ChangeOperation::Insert`].
    #[must_use]
    pub const fn is_insert(&self) -> bool {
        matches!(self, Self::Insert)
    }

    /// Returns `true` for [`ChangeOperation::Update`].
    #[must_use]
    pub const fn is_update(&self) -> bool {
        matches!(self, Self::Update)
    }

    /// Returns `true` for [`ChangeOperation::Delete`].
    #[must_use]
    pub const fn is_delete(&self) -> bool {
        matches!(self, Self::Delete)
    }
}

impl fmt::Display for ChangeOperation {
    /// Writes the operation as an upper-case SQL verb (`INSERT`, `UPDATE`, `DELETE`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let verb = match self {
            Self::Insert => "INSERT",
            Self::Update => "UPDATE",
            Self::Delete => "DELETE",
        };
        f.write_str(verb)
    }
}
143
144/// Metadata from a Change Tracking query result.
145///
146/// Contains the system columns returned by `CHANGETABLE(CHANGES ...)`.
147#[derive(Debug, Clone)]
148#[non_exhaustive]
149pub struct ChangeMetadata {
150    /// The version when the row was last changed.
151    pub version: i64,
152    /// The version when the row was created (inserted).
153    /// This is `None` for delete operations.
154    pub creation_version: Option<i64>,
155    /// The type of change (Insert, Update, Delete).
156    pub operation: ChangeOperation,
157    /// Binary mask of changed columns.
158    /// Use `CHANGE_TRACKING_IS_COLUMN_IN_MASK()` to interpret.
159    pub changed_columns: Option<Bytes>,
160    /// Application-defined change context.
161    pub context: Option<Bytes>,
162}
163
164impl ChangeMetadata {
165    /// Create new change metadata.
166    #[must_use]
167    pub fn new(
168        version: i64,
169        creation_version: Option<i64>,
170        operation: ChangeOperation,
171        changed_columns: Option<Bytes>,
172        context: Option<Bytes>,
173    ) -> Self {
174        Self {
175            version,
176            creation_version,
177            operation,
178            changed_columns,
179            context,
180        }
181    }
182
183    /// Create metadata for an insert operation.
184    #[must_use]
185    pub fn insert(version: i64) -> Self {
186        Self {
187            version,
188            creation_version: Some(version),
189            operation: ChangeOperation::Insert,
190            changed_columns: None,
191            context: None,
192        }
193    }
194
195    /// Create metadata for an update operation.
196    #[must_use]
197    pub fn update(version: i64, creation_version: i64) -> Self {
198        Self {
199            version,
200            creation_version: Some(creation_version),
201            operation: ChangeOperation::Update,
202            changed_columns: None,
203            context: None,
204        }
205    }
206
207    /// Create metadata for a delete operation.
208    #[must_use]
209    pub fn delete(version: i64) -> Self {
210        Self {
211            version,
212            creation_version: None,
213            operation: ChangeOperation::Delete,
214            changed_columns: None,
215            context: None,
216        }
217    }
218}
219
/// Query builder for Change Tracking operations.
///
/// Builds `SELECT` statements over `CHANGETABLE(CHANGES ...)` for common
/// Change Tracking patterns.
///
/// Table, column and alias names are interpolated into the SQL text verbatim
/// (no quoting or escaping is applied), so they must come from trusted input.
///
/// # Example
///
/// ```rust
/// use mssql_client::change_tracking::ChangeTrackingQuery;
///
/// // Query for all changes since version 42
/// let query = ChangeTrackingQuery::changes("Products", 42);
/// assert!(query.to_sql().contains("CHANGETABLE"));
///
/// // Query with specific columns
/// let query = ChangeTrackingQuery::changes("Orders", 100)
///     .with_columns(&["OrderId", "CustomerId", "OrderDate"]);
/// let sql = query.to_sql();
/// assert!(sql.contains("OrderId"));
/// ```
#[derive(Debug, Clone)]
pub struct ChangeTrackingQuery {
    table_name: String,
    last_sync_version: i64,
    columns: Option<Vec<String>>,
    primary_keys: Option<Vec<String>>,
    alias: String,
    force_seek: bool,
}

impl ChangeTrackingQuery {
    /// Create a query for changes to a table since a specific version.
    ///
    /// This generates a `CHANGETABLE(CHANGES table_name, last_sync_version)` query
    /// whose result is aliased as `CT` unless overridden with
    /// [`with_alias`](Self::with_alias).
    ///
    /// # Arguments
    ///
    /// * `table_name` - The name of the table to query changes for
    /// * `last_sync_version` - The version from the previous sync (0 for initial)
    ///
    /// # Example
    ///
    /// ```rust
    /// use mssql_client::change_tracking::ChangeTrackingQuery;
    ///
    /// let query = ChangeTrackingQuery::changes("Products", 42);
    /// ```
    #[must_use]
    pub fn changes(table_name: impl Into<String>, last_sync_version: i64) -> Self {
        Self {
            table_name: table_name.into(),
            last_sync_version,
            columns: None,
            primary_keys: None,
            alias: "CT".into(),
            force_seek: false,
        }
    }

    /// Specify extra columns to select in [`to_sql`](Self::to_sql), in addition
    /// to the change tracking system columns.
    ///
    /// If not specified, only change tracking system columns (and any primary
    /// keys) are returned. Each column is emitted as `<alias>.<column>`, i.e.
    /// it is resolved against the `CHANGETABLE` result.
    ///
    /// NOTE(review): `CHANGETABLE(CHANGES ...)` is documented to expose the
    /// tracked table's primary key columns next to the system columns; for
    /// other data columns [`to_sql_with_data`](Self::to_sql_with_data) is
    /// probably what callers want — confirm against intended usage.
    ///
    /// # Arguments
    ///
    /// * `columns` - Column names to include in the result
    #[must_use]
    pub fn with_columns(mut self, columns: &[&str]) -> Self {
        self.columns = Some(columns.iter().map(|&s| s.to_string()).collect());
        self
    }

    /// Specify the primary key columns for the table.
    ///
    /// The keys are added to the select list and, in
    /// [`to_sql_with_data`](Self::to_sql_with_data), used to join the change
    /// tracking result with the original table.
    #[must_use]
    pub fn with_primary_keys(mut self, keys: &[&str]) -> Self {
        self.primary_keys = Some(keys.iter().map(|&s| s.to_string()).collect());
        self
    }

    /// Set the table alias for the CHANGETABLE result.
    #[must_use]
    pub fn with_alias(mut self, alias: impl Into<String>) -> Self {
        self.alias = alias.into();
        self
    }

    /// Enable FORCESEEK hint for the query.
    ///
    /// This can improve performance in some scenarios.
    #[must_use]
    pub fn with_force_seek(mut self) -> Self {
        self.force_seek = true;
        self
    }

    /// Generate the SQL query string.
    ///
    /// The result has the shape
    /// `SELECT <cols> FROM CHANGETABLE(CHANGES <table>, <version>[, FORCESEEK]) AS <alias>`.
    #[must_use]
    pub fn to_sql(&self) -> String {
        // The alias must be declared on the CHANGETABLE source: every selected
        // column is qualified with it, and T-SQL requires CHANGETABLE to be
        // aliased.
        format!(
            "SELECT {} FROM {}",
            self.build_select_columns(),
            self.changetable_source()
        )
    }

    /// Generate a SQL query that joins with the original table.
    ///
    /// This is useful when you need both the change tracking metadata
    /// and the current row data (for inserts and updates). The tracked table
    /// is joined as `T` with a `LEFT OUTER JOIN` on the configured primary
    /// keys; when no primary keys are configured the join condition falls
    /// back to `1=1`.
    ///
    /// # Arguments
    ///
    /// * `data_columns` - Columns from the data table to include; may be empty
    ///
    /// # Example
    ///
    /// ```rust
    /// use mssql_client::change_tracking::ChangeTrackingQuery;
    ///
    /// let query = ChangeTrackingQuery::changes("Products", 42)
    ///     .with_primary_keys(&["ProductId"]);
    /// let sql = query.to_sql_with_data(&["Name", "Price", "Stock"]);
    /// assert!(sql.contains("LEFT OUTER JOIN"));
    /// ```
    #[must_use]
    pub fn to_sql_with_data(&self, data_columns: &[&str]) -> String {
        let alias = &self.alias;
        let pks: &[String] = self.primary_keys.as_deref().unwrap_or_default();

        // Collect every select item in one list so that an empty key list or
        // an empty `data_columns` never leaves a dangling separator.
        let mut select_cols = self.system_columns();
        select_cols.extend(pks.iter().map(|pk| format!("{alias}.{pk}")));
        select_cols.extend(data_columns.iter().map(|c| format!("T.{c}")));

        // An explicitly empty key list is treated like "no keys" so the ON
        // clause is never empty.
        let join_condition = if pks.is_empty() {
            "1=1".to_string()
        } else {
            pks.iter()
                .map(|pk| format!("{alias}.{pk} = T.{pk}"))
                .collect::<Vec<_>>()
                .join(" AND ")
        };

        format!(
            "SELECT {} FROM {} LEFT OUTER JOIN {} AS T ON {}",
            select_cols.join(", "),
            self.changetable_source(),
            self.table_name,
            join_condition
        )
    }

    /// Render the aliased `CHANGETABLE(CHANGES ...)` table source.
    fn changetable_source(&self) -> String {
        let force_seek = if self.force_seek { ", FORCESEEK" } else { "" };
        format!(
            "CHANGETABLE(CHANGES {}, {}{}) AS {}",
            self.table_name, self.last_sync_version, force_seek, self.alias
        )
    }

    /// The five `SYS_CHANGE_*` columns, each qualified with the alias.
    fn system_columns(&self) -> Vec<String> {
        let alias = &self.alias;
        vec![
            format!("{alias}.SYS_CHANGE_VERSION"),
            format!("{alias}.SYS_CHANGE_CREATION_VERSION"),
            format!("{alias}.SYS_CHANGE_OPERATION"),
            format!("{alias}.SYS_CHANGE_COLUMNS"),
            format!("{alias}.SYS_CHANGE_CONTEXT"),
        ]
    }

    /// Select list for [`to_sql`](Self::to_sql): system columns, then primary
    /// keys, then extra columns, all qualified with the alias.
    fn build_select_columns(&self) -> String {
        let alias = &self.alias;
        let mut cols = self.system_columns();

        let extra = self
            .primary_keys
            .iter()
            .flatten()
            .chain(self.columns.iter().flatten());
        cols.extend(extra.map(|name| format!("{alias}.{name}")));

        cols.join(", ")
    }
}
438
/// Helper functions that generate SQL text for Change Tracking operations.
///
/// Names passed to these helpers are escaped before being embedded: single
/// quotes are doubled inside `N'...'` literals and closing brackets are
/// doubled inside `[...]` identifiers. A bracketed name is always treated as
/// ONE identifier, so `"dbo.Products"` becomes `[dbo.Products]`, not
/// `[dbo].[Products]`.
pub struct ChangeTracking;

impl ChangeTracking {
    /// Generate SQL to get the current change tracking version.
    ///
    /// Returns the global change tracking version number.
    ///
    /// # Example
    ///
    /// ```rust
    /// use mssql_client::change_tracking::ChangeTracking;
    ///
    /// let sql = ChangeTracking::current_version_sql();
    /// assert_eq!(sql, "SELECT CHANGE_TRACKING_CURRENT_VERSION()");
    /// ```
    #[must_use]
    pub const fn current_version_sql() -> &'static str {
        "SELECT CHANGE_TRACKING_CURRENT_VERSION()"
    }

    /// Generate SQL to get the minimum valid version for a table.
    ///
    /// If a client's last sync version is less than this, it must
    /// perform a full re-sync instead of incremental sync.
    ///
    /// # Arguments
    ///
    /// * `table_name` - The name of the table, passed to `OBJECT_ID` as a
    ///   string literal (single quotes are escaped)
    ///
    /// # Example
    ///
    /// ```rust
    /// use mssql_client::change_tracking::ChangeTracking;
    ///
    /// let sql = ChangeTracking::min_valid_version_sql("Products");
    /// assert!(sql.contains("CHANGE_TRACKING_MIN_VALID_VERSION"));
    /// ```
    #[must_use]
    pub fn min_valid_version_sql(table_name: &str) -> String {
        format!(
            "SELECT CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID(N'{}'))",
            Self::escape_literal(table_name)
        )
    }

    /// Generate SQL to check if a column is in a change mask.
    ///
    /// Used to determine which specific columns changed in an update operation.
    ///
    /// # Arguments
    ///
    /// * `table_name` - The table name (embedded as an escaped string literal)
    /// * `column_name` - The column to check (embedded as an escaped string literal)
    /// * `mask_variable` - The name of the variable holding the change mask;
    ///   inserted verbatim, so it must be a trusted expression such as `@mask`
    ///
    /// # Example
    ///
    /// ```rust
    /// use mssql_client::change_tracking::ChangeTracking;
    ///
    /// let sql = ChangeTracking::column_in_mask_sql("Products", "Price", "@mask");
    /// assert!(sql.contains("CHANGE_TRACKING_IS_COLUMN_IN_MASK"));
    /// ```
    #[must_use]
    pub fn column_in_mask_sql(table_name: &str, column_name: &str, mask_variable: &str) -> String {
        format!(
            "SELECT CHANGE_TRACKING_IS_COLUMN_IN_MASK(\
             COLUMNPROPERTY(OBJECT_ID(N'{}'), N'{}', 'ColumnId'), \
             {})",
            Self::escape_literal(table_name),
            Self::escape_literal(column_name),
            mask_variable
        )
    }

    /// Generate SQL to enable change tracking on a database.
    ///
    /// # Arguments
    ///
    /// * `database_name` - The database name (bracket-quoted and escaped)
    /// * `retention_days` - How long to retain change data
    /// * `auto_cleanup` - Whether to automatically clean up old data
    ///
    /// # Example
    ///
    /// ```rust
    /// use mssql_client::change_tracking::ChangeTracking;
    ///
    /// let sql = ChangeTracking::enable_database_sql("MyDB", 2, true);
    /// assert!(sql.contains("SET CHANGE_TRACKING = ON"));
    /// ```
    #[must_use]
    pub fn enable_database_sql(
        database_name: &str,
        retention_days: u32,
        auto_cleanup: bool,
    ) -> String {
        let cleanup = if auto_cleanup { "ON" } else { "OFF" };
        format!(
            "ALTER DATABASE {} SET CHANGE_TRACKING = ON \
             (CHANGE_RETENTION = {} DAYS, AUTO_CLEANUP = {})",
            Self::bracket_identifier(database_name),
            retention_days,
            cleanup
        )
    }

    /// Generate SQL to enable change tracking on a table.
    ///
    /// # Arguments
    ///
    /// * `table_name` - The table name (bracket-quoted and escaped)
    /// * `track_columns_updated` - Whether to track which columns were updated
    ///
    /// # Example
    ///
    /// ```rust
    /// use mssql_client::change_tracking::ChangeTracking;
    ///
    /// let sql = ChangeTracking::enable_table_sql("Products", true);
    /// assert!(sql.contains("ENABLE CHANGE_TRACKING"));
    /// ```
    #[must_use]
    pub fn enable_table_sql(table_name: &str, track_columns_updated: bool) -> String {
        let track_cols = if track_columns_updated { "ON" } else { "OFF" };
        format!(
            "ALTER TABLE {} ENABLE CHANGE_TRACKING \
             WITH (TRACK_COLUMNS_UPDATED = {})",
            Self::bracket_identifier(table_name),
            track_cols
        )
    }

    /// Generate SQL to disable change tracking on a table.
    ///
    /// `table_name` is bracket-quoted and escaped.
    #[must_use]
    pub fn disable_table_sql(table_name: &str) -> String {
        format!(
            "ALTER TABLE {} DISABLE CHANGE_TRACKING",
            Self::bracket_identifier(table_name)
        )
    }

    /// Generate SQL to disable change tracking on a database.
    ///
    /// `database_name` is bracket-quoted and escaped.
    #[must_use]
    pub fn disable_database_sql(database_name: &str) -> String {
        format!(
            "ALTER DATABASE {} SET CHANGE_TRACKING = OFF",
            Self::bracket_identifier(database_name)
        )
    }

    /// Escape `value` for use inside a single-quoted T-SQL string literal by
    /// doubling every `'`.
    fn escape_literal(value: &str) -> String {
        value.replace('\'', "''")
    }

    /// Wrap `name` in `[...]`, doubling every `]` so the name cannot terminate
    /// the delimited identifier early.
    fn bracket_identifier(name: &str) -> String {
        format!("[{}]", name.replace(']', "]]"))
    }
}
574
/// Outcome of comparing a client's sync version with the server's minimum
/// valid version.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum SyncVersionStatus {
    /// The sync version is valid and incremental sync can proceed.
    Valid,
    /// The sync version is too old; a full re-sync is required.
    TooOld,
    /// Change tracking is not enabled or the table doesn't exist.
    NotEnabled,
}

impl SyncVersionStatus {
    /// Classify a sync version against the minimum valid version.
    ///
    /// # Arguments
    ///
    /// * `last_sync_version` - The client's last synchronized version
    /// * `min_valid_version` - Result from `CHANGE_TRACKING_MIN_VALID_VERSION()`,
    ///   or `None` when that query produced no value
    ///
    /// # Returns
    ///
    /// * `NotEnabled` when `min_valid_version` is `None`
    /// * `TooOld` when `last_sync_version` is below the minimum
    /// * `Valid` otherwise (including when the two are equal)
    #[must_use]
    pub fn check(last_sync_version: i64, min_valid_version: Option<i64>) -> Self {
        let Some(floor) = min_valid_version else {
            return Self::NotEnabled;
        };

        if last_sync_version < floor {
            Self::TooOld
        } else {
            Self::Valid
        }
    }

    /// Returns `true` only for [`SyncVersionStatus::Valid`].
    #[must_use]
    pub const fn can_sync_incrementally(&self) -> bool {
        match self {
            Self::Valid => true,
            Self::TooOld | Self::NotEnabled => false,
        }
    }

    /// Returns `true` only for [`SyncVersionStatus::TooOld`].
    #[must_use]
    pub const fn requires_full_sync(&self) -> bool {
        match self {
            Self::TooOld => true,
            Self::Valid | Self::NotEnabled => false,
        }
    }
}
619
#[cfg(test)]
mod tests {
    use super::*;

    /// Parsing accepts the three codes in either case, tolerates surrounding
    /// whitespace, and rejects everything else.
    #[test]
    fn test_change_operation_from_sql() {
        let cases = [
            ("I", Some(ChangeOperation::Insert)),
            ("U", Some(ChangeOperation::Update)),
            ("D", Some(ChangeOperation::Delete)),
            ("i", Some(ChangeOperation::Insert)),
            (" U ", Some(ChangeOperation::Update)),
            ("X", None),
            ("", None),
        ];

        for (input, expected) in cases {
            assert_eq!(ChangeOperation::from_sql(input), expected);
        }
    }

    /// Each variant maps to its single-character code.
    #[test]
    fn test_change_operation_as_sql() {
        let cases = [
            (ChangeOperation::Insert, "I"),
            (ChangeOperation::Update, "U"),
            (ChangeOperation::Delete, "D"),
        ];

        for (op, code) in cases {
            assert_eq!(op.as_sql(), code);
        }
    }

    /// Exactly one predicate is true for each variant.
    #[test]
    fn test_change_operation_predicates() {
        // (operation, is_insert, is_update, is_delete)
        let table = [
            (ChangeOperation::Insert, true, false, false),
            (ChangeOperation::Update, false, true, false),
            (ChangeOperation::Delete, false, false, true),
        ];

        for (op, insert, update, delete) in table {
            assert_eq!(op.is_insert(), insert);
            assert_eq!(op.is_update(), update);
            assert_eq!(op.is_delete(), delete);
        }
    }

    /// Convenience constructors fill in version, creation version and operation.
    #[test]
    fn test_change_metadata_constructors() {
        let inserted = ChangeMetadata::insert(42);
        assert_eq!(inserted.version, 42);
        assert_eq!(inserted.creation_version, Some(42));
        assert_eq!(inserted.operation, ChangeOperation::Insert);

        let updated = ChangeMetadata::update(50, 42);
        assert_eq!(updated.version, 50);
        assert_eq!(updated.creation_version, Some(42));
        assert_eq!(updated.operation, ChangeOperation::Update);

        let deleted = ChangeMetadata::delete(60);
        assert_eq!(deleted.version, 60);
        assert_eq!(deleted.creation_version, None);
        assert_eq!(deleted.operation, ChangeOperation::Delete);
    }

    /// The basic query names the table/version and selects system columns.
    #[test]
    fn test_change_tracking_query_basic() {
        let sql = ChangeTrackingQuery::changes("Products", 42).to_sql();

        for fragment in [
            "CHANGETABLE(CHANGES Products, 42)",
            "SYS_CHANGE_VERSION",
            "SYS_CHANGE_OPERATION",
        ] {
            assert!(sql.contains(fragment));
        }
    }

    /// Extra columns are selected through the default `CT` alias.
    #[test]
    fn test_change_tracking_query_with_columns() {
        let sql = ChangeTrackingQuery::changes("Products", 42)
            .with_columns(&["Name", "Price"])
            .to_sql();

        for fragment in ["CT.Name", "CT.Price"] {
            assert!(sql.contains(fragment));
        }
    }

    /// Primary keys are selected through the default `CT` alias.
    #[test]
    fn test_change_tracking_query_with_primary_keys() {
        let sql = ChangeTrackingQuery::changes("Products", 42)
            .with_primary_keys(&["ProductId"])
            .to_sql();

        assert!(sql.contains("CT.ProductId"));
    }

    /// The FORCESEEK hint appears once enabled.
    #[test]
    fn test_change_tracking_query_force_seek() {
        let sql = ChangeTrackingQuery::changes("Products", 42)
            .with_force_seek()
            .to_sql();

        assert!(sql.contains("FORCESEEK"));
    }

    /// The joined query links `CT` to the table aliased as `T` on the key.
    #[test]
    fn test_change_tracking_query_with_data() {
        let sql = ChangeTrackingQuery::changes("Products", 42)
            .with_primary_keys(&["ProductId"])
            .to_sql_with_data(&["Name", "Price"]);

        for fragment in [
            "LEFT OUTER JOIN Products AS T",
            "CT.ProductId = T.ProductId",
            "T.Name",
            "T.Price",
        ] {
            assert!(sql.contains(fragment));
        }
    }

    /// Helper SQL mentions the expected function and the supplied names.
    #[test]
    fn test_change_tracking_helper_sql() {
        assert_eq!(
            ChangeTracking::current_version_sql(),
            "SELECT CHANGE_TRACKING_CURRENT_VERSION()"
        );

        let min_sql = ChangeTracking::min_valid_version_sql("Products");
        for fragment in ["CHANGE_TRACKING_MIN_VALID_VERSION", "Products"] {
            assert!(min_sql.contains(fragment));
        }

        let mask_sql = ChangeTracking::column_in_mask_sql("Products", "Price", "@mask");
        for fragment in ["CHANGE_TRACKING_IS_COLUMN_IN_MASK", "Price", "@mask"] {
            assert!(mask_sql.contains(fragment));
        }
    }

    /// Enable statements bracket the name and carry the requested options.
    #[test]
    fn test_change_tracking_enable_sql() {
        let db_sql = ChangeTracking::enable_database_sql("MyDB", 7, true);
        for fragment in ["[MyDB]", "7 DAYS", "AUTO_CLEANUP = ON"] {
            assert!(db_sql.contains(fragment));
        }

        let table_sql = ChangeTracking::enable_table_sql("Products", true);
        for fragment in ["[Products]", "TRACK_COLUMNS_UPDATED = ON"] {
            assert!(table_sql.contains(fragment));
        }
    }

    /// Version comparison yields Valid / TooOld / NotEnabled as appropriate.
    #[test]
    fn test_sync_version_status() {
        // Valid: at or above the minimum
        for last in [100, 50] {
            assert!(SyncVersionStatus::check(last, Some(50)).can_sync_incrementally());
        }

        // Too old: below the minimum
        assert!(SyncVersionStatus::check(40, Some(50)).requires_full_sync());

        // Not enabled: no minimum available
        let status = SyncVersionStatus::check(100, None);
        assert_eq!(status, SyncVersionStatus::NotEnabled);
        assert!(!status.can_sync_incrementally());
    }
}