mssql_client/
change_tracking.rs

1//! SQL Server Change Tracking support.
2//!
3//! This module provides helper types and utilities for working with SQL Server's
4//! built-in Change Tracking feature, which enables efficient incremental data
5//! synchronization scenarios.
6//!
7//! ## Overview
8//!
9//! SQL Server Change Tracking automatically tracks changes (inserts, updates,
10//! deletes) to table rows. Applications can query for changes since a specific
11//! version to implement incremental sync patterns.
12//!
13//! ## Usage
14//!
15//! ```rust,ignore
16//! use mssql_client::change_tracking::{ChangeOperation, ChangeTrackingQuery};
17//!
18//! // Get current version for baseline
//! let last_sync_version: i64 = client
20//!     .query("SELECT CHANGE_TRACKING_CURRENT_VERSION()")
21//!     .await?
22//!     .first()
23//!     .and_then(|r| r.try_get(0))
24//!     .unwrap_or(0);
25//!
26//! // Later, query for changes since that version
27//! let query = ChangeTrackingQuery::changes("Products", last_sync_version);
28//! let changes: Vec<ChangedRow> = client.query(&query.to_sql()).await?
29//!     .map(|row| ChangedRow::from_row(&row))
30//!     .collect();
31//!
32//! for change in changes {
33//!     match change.operation {
34//!         ChangeOperation::Insert => println!("New row: {:?}", change.primary_key),
35//!         ChangeOperation::Update => println!("Updated row: {:?}", change.primary_key),
36//!         ChangeOperation::Delete => println!("Deleted row: {:?}", change.primary_key),
37//!     }
38//! }
39//! ```
40//!
41//! ## Prerequisites
42//!
43//! Change Tracking must be enabled on the database and table:
44//!
45//! ```sql
46//! -- Enable on database
47//! ALTER DATABASE MyDB SET CHANGE_TRACKING = ON
48//!     (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
49//!
50//! -- Enable on table
51//! ALTER TABLE Products ENABLE CHANGE_TRACKING
52//!     WITH (TRACK_COLUMNS_UPDATED = ON);
53//! ```
54//!
55//! ## Key Concepts
56//!
57//! - **Version**: A monotonically increasing value representing a point in time
58//! - **SYS_CHANGE_OPERATION**: I (Insert), U (Update), D (Delete)
59//! - **SYS_CHANGE_VERSION**: The version when the row was last changed
60//! - **SYS_CHANGE_CREATION_VERSION**: The version when the row was inserted
61//!
62//! ## References
63//!
64//! - [About Change Tracking](https://learn.microsoft.com/en-us/sql/relational-databases/track-changes/about-change-tracking-sql-server)
65//! - [CHANGETABLE function](https://learn.microsoft.com/en-us/sql/relational-databases/system-functions/changetable-transact-sql)
66
67use std::fmt;
68
69use bytes::Bytes;
70
/// The type of change operation tracked by SQL Server Change Tracking.
///
/// This corresponds to the `SYS_CHANGE_OPERATION` column in `CHANGETABLE` results.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum ChangeOperation {
    /// A new row was inserted (`I`).
    Insert,
    /// An existing row was updated (`U`).
    Update,
    /// A row was deleted (`D`).
    Delete,
}

impl ChangeOperation {
    /// Parse a change operation from its single-character SQL Server representation.
    ///
    /// Surrounding whitespace is ignored and the letter is matched
    /// ASCII-case-insensitively. Only the ASCII letters `I`, `U` and `D` are
    /// accepted; characters that merely upper-case to one of them under Unicode
    /// rules (for example the dotless `ı`) are rejected.
    ///
    /// # Arguments
    ///
    /// * `s` - A string containing 'I', 'U', or 'D'
    ///
    /// # Returns
    ///
    /// The parsed `ChangeOperation`, or `None` if the input is invalid.
    #[must_use]
    pub fn from_sql(s: &str) -> Option<Self> {
        // Matching on the trimmed bytes avoids allocating an upper-cased copy and
        // keeps the comparison strictly ASCII.
        match s.trim().as_bytes() {
            [b'I' | b'i'] => Some(Self::Insert),
            [b'U' | b'u'] => Some(Self::Update),
            [b'D' | b'd'] => Some(Self::Delete),
            _ => None,
        }
    }

    /// Get the SQL Server single-character representation (`"I"`, `"U"` or `"D"`).
    #[must_use]
    pub const fn as_sql(&self) -> &'static str {
        match self {
            Self::Insert => "I",
            Self::Update => "U",
            Self::Delete => "D",
        }
    }

    /// Check if this is an insert operation.
    #[must_use]
    pub const fn is_insert(&self) -> bool {
        matches!(self, Self::Insert)
    }

    /// Check if this is an update operation.
    #[must_use]
    pub const fn is_update(&self) -> bool {
        matches!(self, Self::Update)
    }

    /// Check if this is a delete operation.
    #[must_use]
    pub const fn is_delete(&self) -> bool {
        matches!(self, Self::Delete)
    }
}
133
134impl fmt::Display for ChangeOperation {
135    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136        match self {
137            Self::Insert => write!(f, "INSERT"),
138            Self::Update => write!(f, "UPDATE"),
139            Self::Delete => write!(f, "DELETE"),
140        }
141    }
142}
143
144/// Metadata from a Change Tracking query result.
145///
146/// Contains the system columns returned by `CHANGETABLE(CHANGES ...)`.
147#[derive(Debug, Clone)]
148pub struct ChangeMetadata {
149    /// The version when the row was last changed.
150    pub version: i64,
151    /// The version when the row was created (inserted).
152    /// This is `None` for delete operations.
153    pub creation_version: Option<i64>,
154    /// The type of change (Insert, Update, Delete).
155    pub operation: ChangeOperation,
156    /// Binary mask of changed columns.
157    /// Use `CHANGE_TRACKING_IS_COLUMN_IN_MASK()` to interpret.
158    pub changed_columns: Option<Bytes>,
159    /// Application-defined change context.
160    pub context: Option<Bytes>,
161}
162
163impl ChangeMetadata {
164    /// Create new change metadata.
165    #[must_use]
166    pub fn new(
167        version: i64,
168        creation_version: Option<i64>,
169        operation: ChangeOperation,
170        changed_columns: Option<Bytes>,
171        context: Option<Bytes>,
172    ) -> Self {
173        Self {
174            version,
175            creation_version,
176            operation,
177            changed_columns,
178            context,
179        }
180    }
181
182    /// Create metadata for an insert operation.
183    #[must_use]
184    pub fn insert(version: i64) -> Self {
185        Self {
186            version,
187            creation_version: Some(version),
188            operation: ChangeOperation::Insert,
189            changed_columns: None,
190            context: None,
191        }
192    }
193
194    /// Create metadata for an update operation.
195    #[must_use]
196    pub fn update(version: i64, creation_version: i64) -> Self {
197        Self {
198            version,
199            creation_version: Some(creation_version),
200            operation: ChangeOperation::Update,
201            changed_columns: None,
202            context: None,
203        }
204    }
205
206    /// Create metadata for a delete operation.
207    #[must_use]
208    pub fn delete(version: i64) -> Self {
209        Self {
210            version,
211            creation_version: None,
212            operation: ChangeOperation::Delete,
213            changed_columns: None,
214            context: None,
215        }
216    }
217}
218
/// Query builder for Change Tracking operations.
///
/// Helps construct proper SQL queries for common Change Tracking patterns.
///
/// Table, column and alias names are spliced into the SQL text verbatim (no
/// quoting or escaping is applied), so they must come from trusted input.
///
/// # Example
///
/// ```rust
/// use mssql_client::change_tracking::ChangeTrackingQuery;
///
/// // Query for all changes since version 42
/// let query = ChangeTrackingQuery::changes("Products", 42);
/// assert!(query.to_sql().contains("CHANGETABLE"));
///
/// // Query with specific columns
/// let query = ChangeTrackingQuery::changes("Orders", 100)
///     .with_columns(&["OrderId", "CustomerId", "OrderDate"]);
/// let sql = query.to_sql();
/// assert!(sql.contains("OrderId"));
/// ```
#[derive(Debug, Clone)]
pub struct ChangeTrackingQuery {
    table_name: String,
    last_sync_version: i64,
    columns: Option<Vec<String>>,
    primary_keys: Option<Vec<String>>,
    alias: String,
    force_seek: bool,
}

impl ChangeTrackingQuery {
    /// Create a query for changes to a table since a specific version.
    ///
    /// This generates a `CHANGETABLE(CHANGES table_name, last_sync_version)` query.
    /// The alias defaults to `CT`, no extra columns are selected and the
    /// `FORCESEEK` hint is off.
    ///
    /// # Arguments
    ///
    /// * `table_name` - The name of the table to query changes for
    /// * `last_sync_version` - The version from the previous sync (0 for initial)
    ///
    /// # Example
    ///
    /// ```rust
    /// use mssql_client::change_tracking::ChangeTrackingQuery;
    ///
    /// let query = ChangeTrackingQuery::changes("Products", 42);
    /// ```
    #[must_use]
    pub fn changes(table_name: impl Into<String>, last_sync_version: i64) -> Self {
        Self {
            table_name: table_name.into(),
            last_sync_version,
            columns: None,
            primary_keys: None,
            alias: "CT".into(),
            force_seek: false,
        }
    }

    /// Specify which extra columns [`Self::to_sql`] selects (in addition to the
    /// change tracking columns).
    ///
    /// The columns are selected through the `CHANGETABLE` alias. They are not
    /// used by [`Self::to_sql_with_data`], which takes its own column list.
    ///
    /// NOTE(review): `CHANGETABLE(CHANGES ...)` is documented to expose only the
    /// primary key columns besides `SYS_CHANGE_*`; non-key columns probably need
    /// [`Self::to_sql_with_data`] instead — confirm against the server docs.
    ///
    /// # Arguments
    ///
    /// * `columns` - Column names to include in the result
    #[must_use]
    pub fn with_columns(mut self, columns: &[&str]) -> Self {
        self.columns = Some(columns.iter().map(|&name| name.to_owned()).collect());
        self
    }

    /// Specify the primary key columns for the table.
    ///
    /// The keys are added to the select list and, in
    /// [`Self::to_sql_with_data`], form the join condition against the table.
    #[must_use]
    pub fn with_primary_keys(mut self, keys: &[&str]) -> Self {
        self.primary_keys = Some(keys.iter().map(|&name| name.to_owned()).collect());
        self
    }

    /// Set the table alias for the CHANGETABLE result.
    ///
    /// [`Self::to_sql_with_data`] aliases the data table as `T`, so avoid using
    /// `T` here when calling that method.
    #[must_use]
    pub fn with_alias(mut self, alias: impl Into<String>) -> Self {
        self.alias = alias.into();
        self
    }

    /// Enable FORCESEEK hint for the query.
    ///
    /// This can improve performance in some scenarios.
    #[must_use]
    pub fn with_force_seek(mut self) -> Self {
        self.force_seek = true;
        self
    }

    /// Generate the SQL query string.
    ///
    /// The result has the shape
    /// `SELECT <cols> FROM CHANGETABLE(CHANGES <table>, <version>[, FORCESEEK]) AS <alias>`.
    /// The alias is always declared because every selected column is qualified
    /// with it.
    #[must_use]
    pub fn to_sql(&self) -> String {
        format!(
            "SELECT {} FROM {}",
            self.build_select_columns(),
            self.changetable_source()
        )
    }

    /// Generate a SQL query that joins with the original table.
    ///
    /// This is useful when you need both the change tracking metadata
    /// and the current row data (for inserts and updates). The data table is
    /// aliased as `T` and joined with `LEFT OUTER JOIN` on the configured
    /// primary keys.
    ///
    /// When no primary keys are configured (or the list is empty) the join
    /// condition falls back to `1=1`, which pairs every change with every table
    /// row; configure primary keys for meaningful results.
    ///
    /// # Arguments
    ///
    /// * `data_columns` - Columns from the data table to include; may be empty
    ///
    /// # Example
    ///
    /// ```rust
    /// use mssql_client::change_tracking::ChangeTrackingQuery;
    ///
    /// let query = ChangeTrackingQuery::changes("Products", 42)
    ///     .with_primary_keys(&["ProductId"]);
    /// let sql = query.to_sql_with_data(&["Name", "Price", "Stock"]);
    /// assert!(sql.contains("LEFT OUTER JOIN"));
    /// ```
    #[must_use]
    pub fn to_sql_with_data(&self, data_columns: &[&str]) -> String {
        let alias = &self.alias;
        // Treat "no keys configured" and "empty key list" alike so the ON clause
        // is never left empty.
        let primary_keys: &[String] = self.primary_keys.as_deref().unwrap_or_default();

        // Collect into one list and join once, so an empty key or data list
        // cannot leave a dangling comma in the select list.
        let mut select_cols = Self::system_columns(alias);
        select_cols.extend(primary_keys.iter().map(|pk| format!("{alias}.{pk}")));
        select_cols.extend(data_columns.iter().map(|col| format!("T.{col}")));

        let join_condition = if primary_keys.is_empty() {
            String::from("1=1")
        } else {
            primary_keys
                .iter()
                .map(|pk| format!("{alias}.{pk} = T.{pk}"))
                .collect::<Vec<_>>()
                .join(" AND ")
        };

        format!(
            "SELECT {} FROM {} LEFT OUTER JOIN {} AS T ON {}",
            select_cols.join(", "),
            self.changetable_source(),
            self.table_name,
            join_condition
        )
    }

    /// Build the select list for [`Self::to_sql`]: system columns, then primary
    /// keys, then extra columns, all qualified with the alias.
    fn build_select_columns(&self) -> String {
        let alias = &self.alias;
        let mut cols = Self::system_columns(alias);
        let extras = self.primary_keys.iter().chain(self.columns.iter()).flatten();
        cols.extend(extras.map(|name| format!("{alias}.{name}")));
        cols.join(", ")
    }

    /// The five `SYS_CHANGE_*` columns, each qualified with `alias`.
    fn system_columns(alias: &str) -> Vec<String> {
        [
            "SYS_CHANGE_VERSION",
            "SYS_CHANGE_CREATION_VERSION",
            "SYS_CHANGE_OPERATION",
            "SYS_CHANGE_COLUMNS",
            "SYS_CHANGE_CONTEXT",
        ]
        .iter()
        .map(|column| format!("{alias}.{column}"))
        .collect()
    }

    /// The `CHANGETABLE(CHANGES ...) AS <alias>` row source shared by both queries.
    fn changetable_source(&self) -> String {
        let force_seek = if self.force_seek { ", FORCESEEK" } else { "" };
        format!(
            "CHANGETABLE(CHANGES {}, {}{}) AS {}",
            self.table_name, self.last_sync_version, force_seek, self.alias
        )
    }
}
437
/// Helper functions for Change Tracking operations.
///
/// Every function only builds SQL text; nothing is executed here.
pub struct ChangeTracking;

impl ChangeTracking {
    /// Generate SQL to get the current change tracking version.
    ///
    /// Returns the global change tracking version number.
    ///
    /// # Example
    ///
    /// ```rust
    /// use mssql_client::change_tracking::ChangeTracking;
    ///
    /// let sql = ChangeTracking::current_version_sql();
    /// assert_eq!(sql, "SELECT CHANGE_TRACKING_CURRENT_VERSION()");
    /// ```
    #[must_use]
    pub const fn current_version_sql() -> &'static str {
        "SELECT CHANGE_TRACKING_CURRENT_VERSION()"
    }

    /// Generate SQL to get the minimum valid version for a table.
    ///
    /// If a client's last sync version is less than this, it must
    /// perform a full re-sync instead of incremental sync.
    ///
    /// The name is embedded in an `N'...'` literal passed to `OBJECT_ID`; single
    /// quotes in it are doubled so the literal stays well-formed.
    ///
    /// # Arguments
    ///
    /// * `table_name` - The name of the table
    ///
    /// # Example
    ///
    /// ```rust
    /// use mssql_client::change_tracking::ChangeTracking;
    ///
    /// let sql = ChangeTracking::min_valid_version_sql("Products");
    /// assert!(sql.contains("CHANGE_TRACKING_MIN_VALID_VERSION"));
    /// ```
    #[must_use]
    pub fn min_valid_version_sql(table_name: &str) -> String {
        let table = Self::literal_escaped(table_name);
        format!("SELECT CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID(N'{table}'))")
    }

    /// Generate SQL to check if a column is in a change mask.
    ///
    /// Used to determine which specific columns changed in an update operation.
    ///
    /// Table and column names are embedded in `N'...'` literals with single
    /// quotes doubled. `mask_variable` is inserted verbatim, so it must be a
    /// trusted variable name or expression.
    ///
    /// # Arguments
    ///
    /// * `table_name` - The table name
    /// * `column_name` - The column to check
    /// * `mask_variable` - The name of the variable holding the change mask
    ///
    /// # Example
    ///
    /// ```rust
    /// use mssql_client::change_tracking::ChangeTracking;
    ///
    /// let sql = ChangeTracking::column_in_mask_sql("Products", "Price", "@mask");
    /// assert!(sql.contains("CHANGE_TRACKING_IS_COLUMN_IN_MASK"));
    /// ```
    #[must_use]
    pub fn column_in_mask_sql(table_name: &str, column_name: &str, mask_variable: &str) -> String {
        let table = Self::literal_escaped(table_name);
        let column = Self::literal_escaped(column_name);
        format!(
            "SELECT CHANGE_TRACKING_IS_COLUMN_IN_MASK(\
             COLUMNPROPERTY(OBJECT_ID(N'{table}'), N'{column}', 'ColumnId'), \
             {mask_variable})"
        )
    }

    /// Generate SQL to enable change tracking on a database.
    ///
    /// The database name is emitted as a single bracket-delimited identifier;
    /// any `]` in it is doubled.
    ///
    /// # Arguments
    ///
    /// * `database_name` - The database name
    /// * `retention_days` - How long to retain change data
    /// * `auto_cleanup` - Whether to automatically clean up old data
    ///
    /// # Example
    ///
    /// ```rust
    /// use mssql_client::change_tracking::ChangeTracking;
    ///
    /// let sql = ChangeTracking::enable_database_sql("MyDB", 2, true);
    /// assert!(sql.contains("SET CHANGE_TRACKING = ON"));
    /// ```
    #[must_use]
    pub fn enable_database_sql(
        database_name: &str,
        retention_days: u32,
        auto_cleanup: bool,
    ) -> String {
        let database = Self::bracket_escaped(database_name);
        let cleanup = if auto_cleanup { "ON" } else { "OFF" };
        format!(
            "ALTER DATABASE [{database}] SET CHANGE_TRACKING = ON \
             (CHANGE_RETENTION = {retention_days} DAYS, AUTO_CLEANUP = {cleanup})"
        )
    }

    /// Generate SQL to enable change tracking on a table.
    ///
    /// The whole `table_name` is wrapped in one pair of brackets (with `]`
    /// doubled), so it is treated as a single identifier; a dotted
    /// `schema.table` string is not split into parts.
    ///
    /// # Arguments
    ///
    /// * `table_name` - The table name
    /// * `track_columns_updated` - Whether to track which columns were updated
    ///
    /// # Example
    ///
    /// ```rust
    /// use mssql_client::change_tracking::ChangeTracking;
    ///
    /// let sql = ChangeTracking::enable_table_sql("Products", true);
    /// assert!(sql.contains("ENABLE CHANGE_TRACKING"));
    /// ```
    #[must_use]
    pub fn enable_table_sql(table_name: &str, track_columns_updated: bool) -> String {
        let table = Self::bracket_escaped(table_name);
        let track_cols = if track_columns_updated { "ON" } else { "OFF" };
        format!(
            "ALTER TABLE [{table}] ENABLE CHANGE_TRACKING \
             WITH (TRACK_COLUMNS_UPDATED = {track_cols})"
        )
    }

    /// Generate SQL to disable change tracking on a table.
    ///
    /// The name is emitted as a single bracket-delimited identifier with `]` doubled.
    #[must_use]
    pub fn disable_table_sql(table_name: &str) -> String {
        let table = Self::bracket_escaped(table_name);
        format!("ALTER TABLE [{table}] DISABLE CHANGE_TRACKING")
    }

    /// Generate SQL to disable change tracking on a database.
    ///
    /// The name is emitted as a single bracket-delimited identifier with `]` doubled.
    #[must_use]
    pub fn disable_database_sql(database_name: &str) -> String {
        let database = Self::bracket_escaped(database_name);
        format!("ALTER DATABASE [{database}] SET CHANGE_TRACKING = OFF")
    }

    /// Double every `'` so `value` can sit inside a single-quoted T-SQL literal.
    fn literal_escaped(value: &str) -> String {
        value.replace('\'', "''")
    }

    /// Double every `]` so `name` can sit inside a `[...]` delimited identifier.
    fn bracket_escaped(name: &str) -> String {
        name.replace(']', "]]")
    }
}
573
/// Outcome of comparing a client's last sync version with the server's minimum
/// valid version.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncVersionStatus {
    /// The sync version is valid and incremental sync can proceed.
    Valid,
    /// The sync version is too old; a full re-sync is required.
    TooOld,
    /// Change tracking is not enabled or the table doesn't exist.
    NotEnabled,
}

impl SyncVersionStatus {
    /// Classify a sync version against the min_valid_version result.
    ///
    /// # Arguments
    ///
    /// * `last_sync_version` - The client's last synchronized version
    /// * `min_valid_version` - Result from `CHANGE_TRACKING_MIN_VALID_VERSION()`
    ///
    /// # Returns
    ///
    /// `NotEnabled` when no minimum is available, `TooOld` when
    /// `last_sync_version` is below the minimum, and `Valid` otherwise
    /// (including when the two are equal).
    #[must_use]
    pub fn check(last_sync_version: i64, min_valid_version: Option<i64>) -> Self {
        min_valid_version.map_or(Self::NotEnabled, |floor| {
            if last_sync_version < floor {
                Self::TooOld
            } else {
                Self::Valid
            }
        })
    }

    /// Returns `true` only for [`Self::Valid`].
    #[must_use]
    pub const fn can_sync_incrementally(&self) -> bool {
        match self {
            Self::Valid => true,
            Self::TooOld | Self::NotEnabled => false,
        }
    }

    /// Returns `true` only for [`Self::TooOld`]; [`Self::NotEnabled`] reports
    /// `false` here as well as in [`Self::can_sync_incrementally`].
    #[must_use]
    pub const fn requires_full_sync(&self) -> bool {
        match self {
            Self::TooOld => true,
            Self::Valid | Self::NotEnabled => false,
        }
    }
}
617
#[cfg(test)]
mod tests {
    use super::*;

    /// Assert that every fragment occurs somewhere in `sql`.
    fn assert_contains_all(sql: &str, fragments: &[&str]) {
        for fragment in fragments {
            assert!(sql.contains(fragment), "missing {fragment:?} in {sql:?}");
        }
    }

    /// Parsing accepts the three codes regardless of case or padding and
    /// rejects anything else.
    #[test]
    fn test_change_operation_from_sql() {
        let cases = [
            ("I", Some(ChangeOperation::Insert)),
            ("U", Some(ChangeOperation::Update)),
            ("D", Some(ChangeOperation::Delete)),
            ("i", Some(ChangeOperation::Insert)),
            (" U ", Some(ChangeOperation::Update)),
            ("X", None),
            ("", None),
        ];
        for (input, expected) in cases {
            assert_eq!(ChangeOperation::from_sql(input), expected, "input {input:?}");
        }
    }

    /// Each variant maps back to its single-letter code.
    #[test]
    fn test_change_operation_as_sql() {
        let cases = [
            (ChangeOperation::Insert, "I"),
            (ChangeOperation::Update, "U"),
            (ChangeOperation::Delete, "D"),
        ];
        for (operation, code) in cases {
            assert_eq!(operation.as_sql(), code);
        }
    }

    /// Exactly one predicate is true for each variant.
    #[test]
    fn test_change_operation_predicates() {
        // (operation, [is_insert, is_update, is_delete])
        let cases = [
            (ChangeOperation::Insert, [true, false, false]),
            (ChangeOperation::Update, [false, true, false]),
            (ChangeOperation::Delete, [false, false, true]),
        ];
        for (operation, expected) in cases {
            let actual = [
                operation.is_insert(),
                operation.is_update(),
                operation.is_delete(),
            ];
            assert_eq!(actual, expected, "operation {operation:?}");
        }
    }

    /// The convenience constructors fill version, creation version and operation.
    #[test]
    fn test_change_metadata_constructors() {
        let inserted = ChangeMetadata::insert(42);
        assert_eq!(
            (inserted.version, inserted.creation_version, inserted.operation),
            (42, Some(42), ChangeOperation::Insert)
        );

        let updated = ChangeMetadata::update(50, 42);
        assert_eq!(
            (updated.version, updated.creation_version, updated.operation),
            (50, Some(42), ChangeOperation::Update)
        );

        let deleted = ChangeMetadata::delete(60);
        assert_eq!(
            (deleted.version, deleted.creation_version, deleted.operation),
            (60, None, ChangeOperation::Delete)
        );
    }

    /// A bare query names the table, the version and the system columns.
    #[test]
    fn test_change_tracking_query_basic() {
        let sql = ChangeTrackingQuery::changes("Products", 42).to_sql();
        assert_contains_all(
            &sql,
            &[
                "CHANGETABLE(CHANGES Products, 42)",
                "SYS_CHANGE_VERSION",
                "SYS_CHANGE_OPERATION",
            ],
        );
    }

    /// Extra columns are qualified with the default alias.
    #[test]
    fn test_change_tracking_query_with_columns() {
        let sql = ChangeTrackingQuery::changes("Products", 42)
            .with_columns(&["Name", "Price"])
            .to_sql();
        assert_contains_all(&sql, &["CT.Name", "CT.Price"]);
    }

    /// Primary keys are qualified with the default alias.
    #[test]
    fn test_change_tracking_query_with_primary_keys() {
        let sql = ChangeTrackingQuery::changes("Products", 42)
            .with_primary_keys(&["ProductId"])
            .to_sql();
        assert_contains_all(&sql, &["CT.ProductId"]);
    }

    /// The hint appears once it is enabled.
    #[test]
    fn test_change_tracking_query_force_seek() {
        let sql = ChangeTrackingQuery::changes("Products", 42)
            .with_force_seek()
            .to_sql();
        assert_contains_all(&sql, &["FORCESEEK"]);
    }

    /// The joined query references the data table and joins on the key.
    #[test]
    fn test_change_tracking_query_with_data() {
        let sql = ChangeTrackingQuery::changes("Products", 42)
            .with_primary_keys(&["ProductId"])
            .to_sql_with_data(&["Name", "Price"]);
        assert_contains_all(
            &sql,
            &[
                "LEFT OUTER JOIN Products AS T",
                "CT.ProductId = T.ProductId",
                "T.Name",
                "T.Price",
            ],
        );
    }

    /// The helper statements mention the expected functions and arguments.
    #[test]
    fn test_change_tracking_helper_sql() {
        assert_eq!(
            ChangeTracking::current_version_sql(),
            "SELECT CHANGE_TRACKING_CURRENT_VERSION()"
        );

        assert_contains_all(
            &ChangeTracking::min_valid_version_sql("Products"),
            &["CHANGE_TRACKING_MIN_VALID_VERSION", "Products"],
        );

        assert_contains_all(
            &ChangeTracking::column_in_mask_sql("Products", "Price", "@mask"),
            &["CHANGE_TRACKING_IS_COLUMN_IN_MASK", "Price", "@mask"],
        );
    }

    /// The enable statements quote the names and carry the options.
    #[test]
    fn test_change_tracking_enable_sql() {
        assert_contains_all(
            &ChangeTracking::enable_database_sql("MyDB", 7, true),
            &["[MyDB]", "7 DAYS", "AUTO_CLEANUP = ON"],
        );

        assert_contains_all(
            &ChangeTracking::enable_table_sql("Products", true),
            &["[Products]", "TRACK_COLUMNS_UPDATED = ON"],
        );
    }

    /// Versions at or above the minimum are valid; a missing minimum means
    /// tracking is not enabled.
    #[test]
    fn test_sync_version_status() {
        // Valid case (strictly above, and exactly at, the minimum)
        for last_sync in [100, 50] {
            assert!(SyncVersionStatus::check(last_sync, Some(50)).can_sync_incrementally());
        }

        // Too old case
        assert!(SyncVersionStatus::check(40, Some(50)).requires_full_sync());

        // Not enabled case
        let not_enabled = SyncVersionStatus::check(100, None);
        assert_eq!(not_enabled, SyncVersionStatus::NotEnabled);
        assert!(!not_enabled.can_sync_incrementally());
    }
}