mssql_client/change_tracking.rs
1//! SQL Server Change Tracking support.
2//!
3//! This module provides helper types and utilities for working with SQL Server's
4//! built-in Change Tracking feature, which enables efficient incremental data
5//! synchronization scenarios.
6//!
7//! ## Overview
8//!
9//! SQL Server Change Tracking automatically tracks changes (inserts, updates,
10//! deletes) to table rows. Applications can query for changes since a specific
11//! version to implement incremental sync patterns.
12//!
13//! ## Usage
14//!
15//! ```text
16//! use mssql_client::change_tracking::{ChangeOperation, ChangeTrackingQuery};
17//!
18//! // Get current version for baseline
19//! let current_version: i64 = client
20//! .query("SELECT CHANGE_TRACKING_CURRENT_VERSION()")
21//! .await?
22//! .first()
23//! .and_then(|r| r.try_get(0))
24//! .unwrap_or(0);
25//!
26//! // Later, query for changes since that version
27//! let query = ChangeTrackingQuery::changes("Products", last_sync_version);
28//! let changes: Vec<ChangedRow> = client.query(&query.to_sql()).await?
29//! .map(|row| ChangedRow::from_row(&row))
30//! .collect();
31//!
32//! for change in changes {
33//! match change.operation {
34//! ChangeOperation::Insert => println!("New row: {:?}", change.primary_key),
35//! ChangeOperation::Update => println!("Updated row: {:?}", change.primary_key),
36//! ChangeOperation::Delete => println!("Deleted row: {:?}", change.primary_key),
37//! }
38//! }
39//! ```
40//!
41//! ## Prerequisites
42//!
43//! Change Tracking must be enabled on the database and table:
44//!
45//! ```sql
46//! -- Enable on database
47//! ALTER DATABASE MyDB SET CHANGE_TRACKING = ON
48//! (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
49//!
50//! -- Enable on table
51//! ALTER TABLE Products ENABLE CHANGE_TRACKING
52//! WITH (TRACK_COLUMNS_UPDATED = ON);
53//! ```
54//!
55//! ## Key Concepts
56//!
57//! - **Version**: A monotonically increasing value representing a point in time
58//! - **SYS_CHANGE_OPERATION**: I (Insert), U (Update), D (Delete)
59//! - **SYS_CHANGE_VERSION**: The version when the row was last changed
60//! - **SYS_CHANGE_CREATION_VERSION**: The version when the row was inserted
61//!
62//! ## References
63//!
64//! - [About Change Tracking](https://learn.microsoft.com/en-us/sql/relational-databases/track-changes/about-change-tracking-sql-server)
65//! - [CHANGETABLE function](https://learn.microsoft.com/en-us/sql/relational-databases/system-functions/changetable-transact-sql)
66
67use std::fmt;
68
69use bytes::Bytes;
70
71/// The type of change operation tracked by SQL Server Change Tracking.
72///
73/// This corresponds to the `SYS_CHANGE_OPERATION` column in `CHANGETABLE` results.
74#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
75#[non_exhaustive]
76pub enum ChangeOperation {
77 /// A new row was inserted (I).
78 Insert,
79 /// An existing row was updated (U).
80 Update,
81 /// A row was deleted (D).
82 Delete,
83}
84
85impl ChangeOperation {
86 /// Parse a change operation from its single-character SQL Server representation.
87 ///
88 /// # Arguments
89 ///
90 /// * `s` - A string containing 'I', 'U', or 'D'
91 ///
92 /// # Returns
93 ///
94 /// The parsed `ChangeOperation`, or `None` if the input is invalid.
95 #[must_use]
96 pub fn from_sql(s: &str) -> Option<Self> {
97 match s.trim().to_uppercase().as_str() {
98 "I" => Some(Self::Insert),
99 "U" => Some(Self::Update),
100 "D" => Some(Self::Delete),
101 _ => None,
102 }
103 }
104
105 /// Get the SQL Server single-character representation.
106 #[must_use]
107 pub const fn as_sql(&self) -> &'static str {
108 match self {
109 Self::Insert => "I",
110 Self::Update => "U",
111 Self::Delete => "D",
112 }
113 }
114
115 /// Check if this is an insert operation.
116 #[must_use]
117 pub const fn is_insert(&self) -> bool {
118 matches!(self, Self::Insert)
119 }
120
121 /// Check if this is an update operation.
122 #[must_use]
123 pub const fn is_update(&self) -> bool {
124 matches!(self, Self::Update)
125 }
126
127 /// Check if this is a delete operation.
128 #[must_use]
129 pub const fn is_delete(&self) -> bool {
130 matches!(self, Self::Delete)
131 }
132}
133
134impl fmt::Display for ChangeOperation {
135 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136 match self {
137 Self::Insert => write!(f, "INSERT"),
138 Self::Update => write!(f, "UPDATE"),
139 Self::Delete => write!(f, "DELETE"),
140 }
141 }
142}
143
144/// Metadata from a Change Tracking query result.
145///
146/// Contains the system columns returned by `CHANGETABLE(CHANGES ...)`.
147#[derive(Debug, Clone)]
148#[non_exhaustive]
149pub struct ChangeMetadata {
150 /// The version when the row was last changed.
151 pub version: i64,
152 /// The version when the row was created (inserted).
153 /// This is `None` for delete operations.
154 pub creation_version: Option<i64>,
155 /// The type of change (Insert, Update, Delete).
156 pub operation: ChangeOperation,
157 /// Binary mask of changed columns.
158 /// Use `CHANGE_TRACKING_IS_COLUMN_IN_MASK()` to interpret.
159 pub changed_columns: Option<Bytes>,
160 /// Application-defined change context.
161 pub context: Option<Bytes>,
162}
163
164impl ChangeMetadata {
165 /// Create new change metadata.
166 #[must_use]
167 pub fn new(
168 version: i64,
169 creation_version: Option<i64>,
170 operation: ChangeOperation,
171 changed_columns: Option<Bytes>,
172 context: Option<Bytes>,
173 ) -> Self {
174 Self {
175 version,
176 creation_version,
177 operation,
178 changed_columns,
179 context,
180 }
181 }
182
183 /// Create metadata for an insert operation.
184 #[must_use]
185 pub fn insert(version: i64) -> Self {
186 Self {
187 version,
188 creation_version: Some(version),
189 operation: ChangeOperation::Insert,
190 changed_columns: None,
191 context: None,
192 }
193 }
194
195 /// Create metadata for an update operation.
196 #[must_use]
197 pub fn update(version: i64, creation_version: i64) -> Self {
198 Self {
199 version,
200 creation_version: Some(creation_version),
201 operation: ChangeOperation::Update,
202 changed_columns: None,
203 context: None,
204 }
205 }
206
207 /// Create metadata for a delete operation.
208 #[must_use]
209 pub fn delete(version: i64) -> Self {
210 Self {
211 version,
212 creation_version: None,
213 operation: ChangeOperation::Delete,
214 changed_columns: None,
215 context: None,
216 }
217 }
218}
219
220/// Query builder for Change Tracking operations.
221///
222/// Helps construct proper SQL queries for common Change Tracking patterns.
223///
224/// # Example
225///
226/// ```rust
227/// use mssql_client::change_tracking::ChangeTrackingQuery;
228///
229/// // Query for all changes since version 42
230/// let query = ChangeTrackingQuery::changes("Products", 42);
231/// assert!(query.to_sql().contains("CHANGETABLE"));
232///
233/// // Query with specific columns
234/// let query = ChangeTrackingQuery::changes("Orders", 100)
235/// .with_columns(&["OrderId", "CustomerId", "OrderDate"]);
236/// let sql = query.to_sql();
237/// assert!(sql.contains("OrderId"));
238/// ```
239#[derive(Debug, Clone)]
240pub struct ChangeTrackingQuery {
241 table_name: String,
242 last_sync_version: i64,
243 columns: Option<Vec<String>>,
244 primary_keys: Option<Vec<String>>,
245 alias: String,
246 force_seek: bool,
247}
248
249impl ChangeTrackingQuery {
250 /// Create a query for changes to a table since a specific version.
251 ///
252 /// This generates a `CHANGETABLE(CHANGES table_name, last_sync_version)` query.
253 ///
254 /// # Arguments
255 ///
256 /// * `table_name` - The name of the table to query changes for
257 /// * `last_sync_version` - The version from the previous sync (0 for initial)
258 ///
259 /// # Example
260 ///
261 /// ```rust
262 /// use mssql_client::change_tracking::ChangeTrackingQuery;
263 ///
264 /// let query = ChangeTrackingQuery::changes("Products", 42);
265 /// ```
266 #[must_use]
267 pub fn changes(table_name: impl Into<String>, last_sync_version: i64) -> Self {
268 Self {
269 table_name: table_name.into(),
270 last_sync_version,
271 columns: None,
272 primary_keys: None,
273 alias: "CT".into(),
274 force_seek: false,
275 }
276 }
277
278 /// Specify which data columns to include (in addition to change tracking columns).
279 ///
280 /// If not specified, only change tracking system columns are returned.
281 ///
282 /// # Arguments
283 ///
284 /// * `columns` - Column names to include in the result
285 #[must_use]
286 pub fn with_columns(mut self, columns: &[&str]) -> Self {
287 self.columns = Some(columns.iter().map(|&s| s.to_string()).collect());
288 self
289 }
290
291 /// Specify the primary key columns for the table.
292 ///
293 /// This is needed when you want to join change tracking results
294 /// with the original table to get current row data.
295 #[must_use]
296 pub fn with_primary_keys(mut self, keys: &[&str]) -> Self {
297 self.primary_keys = Some(keys.iter().map(|&s| s.to_string()).collect());
298 self
299 }
300
301 /// Set the table alias for the CHANGETABLE result.
302 #[must_use]
303 pub fn with_alias(mut self, alias: impl Into<String>) -> Self {
304 self.alias = alias.into();
305 self
306 }
307
308 /// Enable FORCESEEK hint for the query.
309 ///
310 /// This can improve performance in some scenarios.
311 #[must_use]
312 pub fn with_force_seek(mut self) -> Self {
313 self.force_seek = true;
314 self
315 }
316
317 /// Generate the SQL query string.
318 ///
319 /// This returns a query that can be executed directly.
320 #[must_use]
321 pub fn to_sql(&self) -> String {
322 let force_seek = if self.force_seek { ", FORCESEEK" } else { "" };
323
324 // Build the SELECT column list
325 let select_cols = self.build_select_columns();
326
327 // SQL Server requires an alias on CHANGETABLE ("A table returned by
328 // the CHANGETABLE function must be aliased.", error 22104).
329 format!(
330 "SELECT {} FROM CHANGETABLE(CHANGES {}, {}{}) AS {}",
331 select_cols, self.table_name, self.last_sync_version, force_seek, self.alias
332 )
333 }
334
335 /// Generate a SQL query that joins with the original table.
336 ///
337 /// This is useful when you need both the change tracking metadata
338 /// and the current row data (for inserts and updates).
339 ///
340 /// # Arguments
341 ///
342 /// * `data_columns` - Columns from the data table to include
343 ///
344 /// # Example
345 ///
346 /// ```rust
347 /// use mssql_client::change_tracking::ChangeTrackingQuery;
348 ///
349 /// let query = ChangeTrackingQuery::changes("Products", 42)
350 /// .with_primary_keys(&["ProductId"]);
351 /// let sql = query.to_sql_with_data(&["Name", "Price", "Stock"]);
352 /// assert!(sql.contains("LEFT OUTER JOIN"));
353 /// ```
354 #[must_use]
355 pub fn to_sql_with_data(&self, data_columns: &[&str]) -> String {
356 let force_seek = if self.force_seek { ", FORCESEEK" } else { "" };
357 let alias = &self.alias;
358
359 // Build change tracking columns
360 let ct_cols = format!(
361 "{alias}.SYS_CHANGE_VERSION, {alias}.SYS_CHANGE_CREATION_VERSION, \
362 {alias}.SYS_CHANGE_OPERATION, {alias}.SYS_CHANGE_COLUMNS, {alias}.SYS_CHANGE_CONTEXT"
363 );
364
365 // Build data columns (prefixed with table alias)
366 let data_cols: String = data_columns
367 .iter()
368 .map(|c| format!("T.{c}"))
369 .collect::<Vec<_>>()
370 .join(", ");
371
372 // Build primary key columns from change tracking
373 let pk_cols: String = self
374 .primary_keys
375 .as_ref()
376 .map(|pks| {
377 pks.iter()
378 .map(|pk| format!("{alias}.{pk}"))
379 .collect::<Vec<_>>()
380 .join(", ")
381 })
382 .unwrap_or_default();
383
384 // Build join condition
385 let join_condition: String = self
386 .primary_keys
387 .as_ref()
388 .map(|pks| {
389 pks.iter()
390 .map(|pk| format!("{alias}.{pk} = T.{pk}"))
391 .collect::<Vec<_>>()
392 .join(" AND ")
393 })
394 .unwrap_or_else(|| "1=1".into());
395
396 let select_cols = if pk_cols.is_empty() {
397 format!("{ct_cols}, {data_cols}")
398 } else {
399 format!("{ct_cols}, {pk_cols}, {data_cols}")
400 };
401
402 format!(
403 "SELECT {select_cols} \
404 FROM CHANGETABLE(CHANGES {table}, {version}{force_seek}) AS {alias} \
405 LEFT OUTER JOIN {table} AS T ON {join_condition}",
406 table = self.table_name,
407 version = self.last_sync_version,
408 )
409 }
410
411 fn build_select_columns(&self) -> String {
412 let alias = &self.alias;
413
414 // Always include change tracking system columns
415 let mut cols = vec![
416 format!("{alias}.SYS_CHANGE_VERSION"),
417 format!("{alias}.SYS_CHANGE_CREATION_VERSION"),
418 format!("{alias}.SYS_CHANGE_OPERATION"),
419 format!("{alias}.SYS_CHANGE_COLUMNS"),
420 format!("{alias}.SYS_CHANGE_CONTEXT"),
421 ];
422
423 // Add primary key columns if specified
424 if let Some(ref pks) = self.primary_keys {
425 for pk in pks {
426 cols.push(format!("{alias}.{pk}"));
427 }
428 }
429
430 // Add data columns if specified
431 if let Some(ref data_cols) = self.columns {
432 for col in data_cols {
433 cols.push(format!("{alias}.{col}"));
434 }
435 }
436
437 cols.join(", ")
438 }
439}
440
441/// Helper functions for Change Tracking operations.
442pub struct ChangeTracking;
443
444impl ChangeTracking {
445 /// Generate SQL to get the current change tracking version.
446 ///
447 /// Returns the global change tracking version number.
448 ///
449 /// # Example
450 ///
451 /// ```rust
452 /// use mssql_client::change_tracking::ChangeTracking;
453 ///
454 /// let sql = ChangeTracking::current_version_sql();
455 /// assert_eq!(sql, "SELECT CHANGE_TRACKING_CURRENT_VERSION()");
456 /// ```
457 #[must_use]
458 pub const fn current_version_sql() -> &'static str {
459 "SELECT CHANGE_TRACKING_CURRENT_VERSION()"
460 }
461
462 /// Generate SQL to get the minimum valid version for a table.
463 ///
464 /// If a client's last sync version is less than this, it must
465 /// perform a full re-sync instead of incremental sync.
466 ///
467 /// # Arguments
468 ///
469 /// * `table_name` - The name of the table
470 ///
471 /// # Example
472 ///
473 /// ```rust
474 /// use mssql_client::change_tracking::ChangeTracking;
475 ///
476 /// let sql = ChangeTracking::min_valid_version_sql("Products");
477 /// assert!(sql.contains("CHANGE_TRACKING_MIN_VALID_VERSION"));
478 /// ```
479 #[must_use]
480 pub fn min_valid_version_sql(table_name: &str) -> String {
481 format!(
482 "SELECT CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID(N'{}'))",
483 escape_nvarchar_literal(table_name)
484 )
485 }
486
487 /// Generate SQL to check if a column is in a change mask.
488 ///
489 /// Used to determine which specific columns changed in an update operation.
490 ///
491 /// # Arguments
492 ///
493 /// * `table_name` - The table name
494 /// * `column_name` - The column to check
495 /// * `mask_variable` - The name of the variable holding the change mask
496 ///
497 /// # Errors
498 ///
499 /// Returns [`Error::InvalidIdentifier`](crate::Error::InvalidIdentifier)
500 /// if `mask_variable` is not a valid SQL Server identifier — unlike the
501 /// name arguments (which are escaped into quoted literals), the variable
502 /// is spliced into the statement verbatim, so it must be validated.
503 ///
504 /// # Example
505 ///
506 /// ```rust
507 /// use mssql_client::change_tracking::ChangeTracking;
508 ///
509 /// let sql = ChangeTracking::column_in_mask_sql("Products", "Price", "@mask").unwrap();
510 /// assert!(sql.contains("CHANGE_TRACKING_IS_COLUMN_IN_MASK"));
511 /// ```
512 pub fn column_in_mask_sql(
513 table_name: &str,
514 column_name: &str,
515 mask_variable: &str,
516 ) -> Result<String, crate::Error> {
517 crate::validation::validate_identifier(mask_variable)?;
518 Ok(format!(
519 "SELECT CHANGE_TRACKING_IS_COLUMN_IN_MASK(\
520 COLUMNPROPERTY(OBJECT_ID(N'{}'), N'{}', 'ColumnId'), \
521 {mask_variable})",
522 escape_nvarchar_literal(table_name),
523 escape_nvarchar_literal(column_name)
524 ))
525 }
526
527 /// Generate SQL to enable change tracking on a database.
528 ///
529 /// # Arguments
530 ///
531 /// * `database_name` - The database name
532 /// * `retention_days` - How long to retain change data
533 /// * `auto_cleanup` - Whether to automatically clean up old data
534 ///
535 /// # Example
536 ///
537 /// ```rust
538 /// use mssql_client::change_tracking::ChangeTracking;
539 ///
540 /// let sql = ChangeTracking::enable_database_sql("MyDB", 2, true);
541 /// assert!(sql.contains("SET CHANGE_TRACKING = ON"));
542 /// ```
543 #[must_use]
544 pub fn enable_database_sql(
545 database_name: &str,
546 retention_days: u32,
547 auto_cleanup: bool,
548 ) -> String {
549 let cleanup = if auto_cleanup { "ON" } else { "OFF" };
550 format!(
551 "ALTER DATABASE [{}] SET CHANGE_TRACKING = ON \
552 (CHANGE_RETENTION = {retention_days} DAYS, AUTO_CLEANUP = {cleanup})",
553 database_name.replace(']', "]]")
554 )
555 }
556
557 /// Generate SQL to enable change tracking on a table.
558 ///
559 /// # Arguments
560 ///
561 /// * `table_name` - The table name
562 /// * `track_columns_updated` - Whether to track which columns were updated
563 ///
564 /// # Example
565 ///
566 /// ```rust
567 /// use mssql_client::change_tracking::ChangeTracking;
568 ///
569 /// let sql = ChangeTracking::enable_table_sql("Products", true);
570 /// assert!(sql.contains("ENABLE CHANGE_TRACKING"));
571 /// ```
572 #[must_use]
573 pub fn enable_table_sql(table_name: &str, track_columns_updated: bool) -> String {
574 let track_cols = if track_columns_updated { "ON" } else { "OFF" };
575 let table = Self::bracket_table_name(table_name);
576 format!(
577 "ALTER TABLE {table} ENABLE CHANGE_TRACKING \
578 WITH (TRACK_COLUMNS_UPDATED = {track_cols})"
579 )
580 }
581
582 /// Generate SQL to disable change tracking on a table.
583 #[must_use]
584 pub fn disable_table_sql(table_name: &str) -> String {
585 let table = Self::bracket_table_name(table_name);
586 format!("ALTER TABLE {table} DISABLE CHANGE_TRACKING")
587 }
588
589 /// Bracket a possibly schema-qualified table name part by part:
590 /// `dbo.Items` → `[dbo].[Items]`. Bracketing the whole string as one
591 /// identifier (`[dbo.Items]`) names a table literally called
592 /// "dbo.Items", which fails with error 4902 on real servers.
593 ///
594 /// Interior `]` characters are doubled (the T-SQL QUOTENAME rule), so a
595 /// hostile name like ``foo]; DROP TABLE bar--`` stays a single quoted
596 /// identifier instead of terminating it early and smuggling a second
597 /// statement into the batch. Pass names unbracketed; pre-bracketed input
598 /// is treated as literal characters of the name.
599 fn bracket_table_name(table_name: &str) -> String {
600 table_name
601 .split('.')
602 .map(|part| format!("[{}]", part.replace(']', "]]")))
603 .collect::<Vec<_>>()
604 .join(".")
605 }
606
607 /// Generate SQL to disable change tracking on a database.
608 #[must_use]
609 pub fn disable_database_sql(database_name: &str) -> String {
610 format!(
611 "ALTER DATABASE [{}] SET CHANGE_TRACKING = OFF",
612 database_name.replace(']', "]]")
613 )
614 }
615}
616
617/// Escape a string for inclusion in an `N'...'` literal: per T-SQL rules a
618/// single quote inside a string literal is escaped by doubling it, which
619/// makes any input inert — it cannot terminate the literal early.
620fn escape_nvarchar_literal(s: &str) -> String {
621 s.replace('\'', "''")
622}
623
624/// Result of checking if a sync version is still valid.
625#[derive(Debug, Clone, Copy, PartialEq, Eq)]
626#[non_exhaustive]
627pub enum SyncVersionStatus {
628 /// The sync version is valid and incremental sync can proceed.
629 Valid,
630 /// The sync version is too old; a full re-sync is required.
631 TooOld,
632 /// Change tracking is not enabled or the table doesn't exist.
633 NotEnabled,
634}
635
636impl SyncVersionStatus {
637 /// Check sync version validity from the min_valid_version result.
638 ///
639 /// # Arguments
640 ///
641 /// * `last_sync_version` - The client's last synchronized version
642 /// * `min_valid_version` - Result from `CHANGE_TRACKING_MIN_VALID_VERSION()`
643 ///
644 /// # Returns
645 ///
646 /// The sync status indicating whether incremental sync is possible.
647 #[must_use]
648 pub fn check(last_sync_version: i64, min_valid_version: Option<i64>) -> Self {
649 match min_valid_version {
650 None => Self::NotEnabled,
651 Some(min) if last_sync_version >= min => Self::Valid,
652 Some(_) => Self::TooOld,
653 }
654 }
655
656 /// Check if incremental sync is possible.
657 #[must_use]
658 pub const fn can_sync_incrementally(&self) -> bool {
659 matches!(self, Self::Valid)
660 }
661
662 /// Check if a full re-sync is required.
663 #[must_use]
664 pub const fn requires_full_sync(&self) -> bool {
665 matches!(self, Self::TooOld)
666 }
667}
668
669#[cfg(test)]
670#[allow(clippy::unwrap_used)]
671mod tests {
672 use super::*;
673
674 #[test]
675 fn test_change_operation_from_sql() {
676 assert_eq!(
677 ChangeOperation::from_sql("I"),
678 Some(ChangeOperation::Insert)
679 );
680 assert_eq!(
681 ChangeOperation::from_sql("U"),
682 Some(ChangeOperation::Update)
683 );
684 assert_eq!(
685 ChangeOperation::from_sql("D"),
686 Some(ChangeOperation::Delete)
687 );
688 assert_eq!(
689 ChangeOperation::from_sql("i"),
690 Some(ChangeOperation::Insert)
691 );
692 assert_eq!(
693 ChangeOperation::from_sql(" U "),
694 Some(ChangeOperation::Update)
695 );
696 assert_eq!(ChangeOperation::from_sql("X"), None);
697 assert_eq!(ChangeOperation::from_sql(""), None);
698 }
699
700 #[test]
701 fn test_change_operation_as_sql() {
702 assert_eq!(ChangeOperation::Insert.as_sql(), "I");
703 assert_eq!(ChangeOperation::Update.as_sql(), "U");
704 assert_eq!(ChangeOperation::Delete.as_sql(), "D");
705 }
706
707 #[test]
708 fn test_change_operation_predicates() {
709 assert!(ChangeOperation::Insert.is_insert());
710 assert!(!ChangeOperation::Insert.is_update());
711 assert!(!ChangeOperation::Insert.is_delete());
712
713 assert!(!ChangeOperation::Update.is_insert());
714 assert!(ChangeOperation::Update.is_update());
715 assert!(!ChangeOperation::Update.is_delete());
716
717 assert!(!ChangeOperation::Delete.is_insert());
718 assert!(!ChangeOperation::Delete.is_update());
719 assert!(ChangeOperation::Delete.is_delete());
720 }
721
722 #[test]
723 fn test_change_metadata_constructors() {
724 let insert = ChangeMetadata::insert(42);
725 assert_eq!(insert.version, 42);
726 assert_eq!(insert.creation_version, Some(42));
727 assert_eq!(insert.operation, ChangeOperation::Insert);
728
729 let update = ChangeMetadata::update(50, 42);
730 assert_eq!(update.version, 50);
731 assert_eq!(update.creation_version, Some(42));
732 assert_eq!(update.operation, ChangeOperation::Update);
733
734 let delete = ChangeMetadata::delete(60);
735 assert_eq!(delete.version, 60);
736 assert_eq!(delete.creation_version, None);
737 assert_eq!(delete.operation, ChangeOperation::Delete);
738 }
739
740 #[test]
741 fn test_change_tracking_query_basic() {
742 let query = ChangeTrackingQuery::changes("Products", 42);
743 let sql = query.to_sql();
744
745 assert!(sql.contains("CHANGETABLE(CHANGES Products, 42)"));
746 assert!(sql.contains("SYS_CHANGE_VERSION"));
747 assert!(sql.contains("SYS_CHANGE_OPERATION"));
748 // SQL Server rejects CHANGETABLE without an alias (error 22104) —
749 // the substring above matches the broken pre-alias SQL too, so pin
750 // the alias explicitly.
751 assert!(
752 sql.contains(") AS CT"),
753 "CHANGETABLE must be aliased or the query is not executable: {sql}"
754 );
755 }
756
757 #[test]
758 fn test_change_tracking_query_with_columns() {
759 let query = ChangeTrackingQuery::changes("Products", 42).with_columns(&["Name", "Price"]);
760 let sql = query.to_sql();
761
762 assert!(sql.contains("CT.Name"));
763 assert!(sql.contains("CT.Price"));
764 }
765
766 #[test]
767 fn test_change_tracking_query_with_primary_keys() {
768 let query = ChangeTrackingQuery::changes("Products", 42).with_primary_keys(&["ProductId"]);
769 let sql = query.to_sql();
770
771 assert!(sql.contains("CT.ProductId"));
772 }
773
774 #[test]
775 fn test_change_tracking_query_force_seek() {
776 let query = ChangeTrackingQuery::changes("Products", 42).with_force_seek();
777 let sql = query.to_sql();
778
779 assert!(sql.contains("FORCESEEK"));
780 }
781
782 #[test]
783 fn test_change_tracking_query_with_data() {
784 let query = ChangeTrackingQuery::changes("Products", 42).with_primary_keys(&["ProductId"]);
785 let sql = query.to_sql_with_data(&["Name", "Price"]);
786
787 assert!(sql.contains("LEFT OUTER JOIN Products AS T"));
788 assert!(sql.contains("CT.ProductId = T.ProductId"));
789 assert!(sql.contains("T.Name"));
790 assert!(sql.contains("T.Price"));
791 }
792
793 #[test]
794 fn test_change_tracking_helper_sql() {
795 assert_eq!(
796 ChangeTracking::current_version_sql(),
797 "SELECT CHANGE_TRACKING_CURRENT_VERSION()"
798 );
799
800 let min_sql = ChangeTracking::min_valid_version_sql("Products");
801 assert!(min_sql.contains("CHANGE_TRACKING_MIN_VALID_VERSION"));
802 assert!(min_sql.contains("Products"));
803
804 let mask_sql = ChangeTracking::column_in_mask_sql("Products", "Price", "@mask").unwrap();
805 assert!(mask_sql.contains("CHANGE_TRACKING_IS_COLUMN_IN_MASK"));
806 assert!(mask_sql.contains("Price"));
807 assert!(mask_sql.contains("@mask"));
808 }
809
810 /// Issue #144: the helpers that embed names in `N'...'` literals must
811 /// double interior single quotes so a hostile name cannot terminate the
812 /// literal and smuggle SQL into the statement.
813 #[test]
814 fn test_nvarchar_literal_names_cannot_break_out() {
815 let hostile = "x'); SELECT 1--";
816
817 let sql = ChangeTracking::min_valid_version_sql(hostile);
818 assert!(
819 sql.contains("N'x''); SELECT 1--'"),
820 "single quotes must be doubled, got: {sql}"
821 );
822 assert!(!sql.contains("N'x');"), "literal must not end early: {sql}");
823
824 let sql = ChangeTracking::column_in_mask_sql(hostile, hostile, "@mask").unwrap();
825 assert!(sql.contains("N'x''); SELECT 1--'"));
826 assert!(!sql.contains("N'x');"));
827 }
828
829 /// Issue #144: `mask_variable` is spliced verbatim (no quoting can make
830 /// it safe), so it must be validated as an identifier.
831 #[test]
832 fn test_mask_variable_is_validated() {
833 assert!(ChangeTracking::column_in_mask_sql("T", "C", "@mask").is_ok());
834 assert!(ChangeTracking::column_in_mask_sql("T", "C", "@mask); DROP TABLE x--").is_err());
835 assert!(ChangeTracking::column_in_mask_sql("T", "C", "1 OR 1=1").is_err());
836 assert!(ChangeTracking::column_in_mask_sql("T", "C", "").is_err());
837 }
838
839 /// Issue #144: the database-level helpers bracket-quote the name and must
840 /// double interior `]` (QUOTENAME rule), matching the table helpers.
841 #[test]
842 fn test_database_name_brackets_are_escaped() {
843 let hostile = "x]; DROP DATABASE foo--";
844
845 let sql = ChangeTracking::enable_database_sql(hostile, 2, true);
846 assert!(
847 sql.contains("ALTER DATABASE [x]]; DROP DATABASE foo--]"),
848 "interior ] must be doubled, got: {sql}"
849 );
850 assert!(!sql.contains("ALTER DATABASE [x];"));
851
852 let sql = ChangeTracking::disable_database_sql(hostile);
853 assert!(sql.contains("ALTER DATABASE [x]]; DROP DATABASE foo--]"));
854 assert!(!sql.contains("ALTER DATABASE [x];"));
855 }
856
857 #[test]
858 fn test_change_tracking_enable_sql() {
859 let db_sql = ChangeTracking::enable_database_sql("MyDB", 7, true);
860 assert!(db_sql.contains("[MyDB]"));
861 assert!(db_sql.contains("7 DAYS"));
862 assert!(db_sql.contains("AUTO_CLEANUP = ON"));
863
864 let table_sql = ChangeTracking::enable_table_sql("Products", true);
865 assert!(table_sql.contains("[Products]"));
866 assert!(table_sql.contains("TRACK_COLUMNS_UPDATED = ON"));
867
868 // Schema-qualified names must be bracketed part by part —
869 // `[dbo.Products]` is a single (nonexistent) identifier and fails
870 // with error 4902 on a real server.
871 let qualified = ChangeTracking::enable_table_sql("dbo.Products", true);
872 assert!(qualified.contains("ALTER TABLE [dbo].[Products]"));
873 let disable = ChangeTracking::disable_table_sql("dbo.Products");
874 assert!(disable.contains("ALTER TABLE [dbo].[Products]"));
875 }
876
877 /// PR #143 review, Blocker 2: interior `]` must be doubled (QUOTENAME
878 /// rule) so a hostile table name cannot terminate the bracketed
879 /// identifier early and smuggle a second statement into the batch.
880 #[test]
881 fn test_bracket_escapes_closing_brackets() {
882 let sql = ChangeTracking::enable_table_sql("foo]; DROP TABLE bar--", true);
883 assert!(
884 sql.contains("ALTER TABLE [foo]]; DROP TABLE bar--]"),
885 "interior ] must be doubled, got: {sql}"
886 );
887 assert!(
888 !sql.contains("ALTER TABLE [foo];"),
889 "the identifier must not be terminated early: {sql}"
890 );
891
892 let sql = ChangeTracking::disable_table_sql("we]ird.na]me");
893 assert!(sql.contains("ALTER TABLE [we]]ird].[na]]me]"));
894 }
895
896 #[test]
897 fn test_sync_version_status() {
898 // Valid case
899 assert!(SyncVersionStatus::check(100, Some(50)).can_sync_incrementally());
900 assert!(SyncVersionStatus::check(50, Some(50)).can_sync_incrementally());
901
902 // Too old case
903 assert!(SyncVersionStatus::check(40, Some(50)).requires_full_sync());
904
905 // Not enabled case
906 let status = SyncVersionStatus::check(100, None);
907 assert_eq!(status, SyncVersionStatus::NotEnabled);
908 assert!(!status.can_sync_incrementally());
909 }
910}