mssql_client/change_tracking.rs
1//! SQL Server Change Tracking support.
2//!
3//! This module provides helper types and utilities for working with SQL Server's
4//! built-in Change Tracking feature, which enables efficient incremental data
5//! synchronization scenarios.
6//!
7//! ## Overview
8//!
9//! SQL Server Change Tracking automatically tracks changes (inserts, updates,
10//! deletes) to table rows. Applications can query for changes since a specific
11//! version to implement incremental sync patterns.
12//!
13//! ## Usage
14//!
15//! ```text
16//! use mssql_client::change_tracking::{ChangeOperation, ChangeTrackingQuery};
17//!
18//! // Get current version for baseline
19//! let current_version: i64 = client
20//! .query("SELECT CHANGE_TRACKING_CURRENT_VERSION()")
21//! .await?
22//! .first()
23//! .and_then(|r| r.try_get(0))
24//! .unwrap_or(0);
25//!
26//! // Later, query for changes since that version
27//! let query = ChangeTrackingQuery::changes("Products", last_sync_version);
28//! let changes: Vec<ChangedRow> = client.query(&query.to_sql()).await?
29//! .map(|row| ChangedRow::from_row(&row))
30//! .collect();
31//!
32//! for change in changes {
33//! match change.operation {
34//! ChangeOperation::Insert => println!("New row: {:?}", change.primary_key),
35//! ChangeOperation::Update => println!("Updated row: {:?}", change.primary_key),
36//! ChangeOperation::Delete => println!("Deleted row: {:?}", change.primary_key),
37//! }
38//! }
39//! ```
40//!
41//! ## Prerequisites
42//!
43//! Change Tracking must be enabled on the database and table:
44//!
45//! ```sql
46//! -- Enable on database
47//! ALTER DATABASE MyDB SET CHANGE_TRACKING = ON
48//! (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
49//!
50//! -- Enable on table
51//! ALTER TABLE Products ENABLE CHANGE_TRACKING
52//! WITH (TRACK_COLUMNS_UPDATED = ON);
53//! ```
54//!
55//! ## Key Concepts
56//!
57//! - **Version**: A monotonically increasing value representing a point in time
58//! - **SYS_CHANGE_OPERATION**: I (Insert), U (Update), D (Delete)
59//! - **SYS_CHANGE_VERSION**: The version when the row was last changed
60//! - **SYS_CHANGE_CREATION_VERSION**: The version when the row was inserted
61//!
62//! ## References
63//!
64//! - [About Change Tracking](https://learn.microsoft.com/en-us/sql/relational-databases/track-changes/about-change-tracking-sql-server)
65//! - [CHANGETABLE function](https://learn.microsoft.com/en-us/sql/relational-databases/system-functions/changetable-transact-sql)
66
67use std::fmt;
68
69use bytes::Bytes;
70
/// The type of change operation tracked by SQL Server Change Tracking.
///
/// This corresponds to the `SYS_CHANGE_OPERATION` column in `CHANGETABLE` results.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum ChangeOperation {
    /// A new row was inserted (`I`).
    Insert,
    /// An existing row was updated (`U`).
    Update,
    /// A row was deleted (`D`).
    Delete,
}

impl ChangeOperation {
    /// Parse a change operation from its single-character SQL Server representation.
    ///
    /// Surrounding whitespace is ignored and the letter is matched
    /// case-insensitively, but only against the ASCII letters `I`, `U` and `D`.
    ///
    /// # Arguments
    ///
    /// * `s` - A string containing 'I', 'U', or 'D'
    ///
    /// # Returns
    ///
    /// The parsed `ChangeOperation`, or `None` if the input is invalid.
    #[must_use]
    pub fn from_sql(s: &str) -> Option<Self> {
        // Match on the raw bytes rather than `to_uppercase()`: Unicode case
        // mapping turns the dotless `ı` (U+0131) into `I`, which would make a
        // non-ASCII string parse as an insert, and it allocates a `String`
        // for every parsed row.
        match s.trim().as_bytes() {
            [b'I' | b'i'] => Some(Self::Insert),
            [b'U' | b'u'] => Some(Self::Update),
            [b'D' | b'd'] => Some(Self::Delete),
            _ => None,
        }
    }

    /// Get the SQL Server single-character representation (`"I"`, `"U"` or `"D"`).
    #[must_use]
    pub const fn as_sql(&self) -> &'static str {
        match self {
            Self::Insert => "I",
            Self::Update => "U",
            Self::Delete => "D",
        }
    }

    /// Check if this is an insert operation.
    #[must_use]
    pub const fn is_insert(&self) -> bool {
        matches!(self, Self::Insert)
    }

    /// Check if this is an update operation.
    #[must_use]
    pub const fn is_update(&self) -> bool {
        matches!(self, Self::Update)
    }

    /// Check if this is a delete operation.
    #[must_use]
    pub const fn is_delete(&self) -> bool {
        matches!(self, Self::Delete)
    }
}

impl fmt::Display for ChangeOperation {
    /// Writes the operation as an upper-case keyword (`INSERT`, `UPDATE`, `DELETE`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match self {
            Self::Insert => "INSERT",
            Self::Update => "UPDATE",
            Self::Delete => "DELETE",
        };
        // `pad` (unlike `write!`) honours width/alignment flags such as
        // `{:>8}`; with a plain `{}` the output is unchanged.
        f.pad(keyword)
    }
}
143
144/// Metadata from a Change Tracking query result.
145///
146/// Contains the system columns returned by `CHANGETABLE(CHANGES ...)`.
147#[derive(Debug, Clone)]
148#[non_exhaustive]
149pub struct ChangeMetadata {
150 /// The version when the row was last changed.
151 pub version: i64,
152 /// The version when the row was created (inserted).
153 /// This is `None` for delete operations.
154 pub creation_version: Option<i64>,
155 /// The type of change (Insert, Update, Delete).
156 pub operation: ChangeOperation,
157 /// Binary mask of changed columns.
158 /// Use `CHANGE_TRACKING_IS_COLUMN_IN_MASK()` to interpret.
159 pub changed_columns: Option<Bytes>,
160 /// Application-defined change context.
161 pub context: Option<Bytes>,
162}
163
164impl ChangeMetadata {
165 /// Create new change metadata.
166 #[must_use]
167 pub fn new(
168 version: i64,
169 creation_version: Option<i64>,
170 operation: ChangeOperation,
171 changed_columns: Option<Bytes>,
172 context: Option<Bytes>,
173 ) -> Self {
174 Self {
175 version,
176 creation_version,
177 operation,
178 changed_columns,
179 context,
180 }
181 }
182
183 /// Create metadata for an insert operation.
184 #[must_use]
185 pub fn insert(version: i64) -> Self {
186 Self {
187 version,
188 creation_version: Some(version),
189 operation: ChangeOperation::Insert,
190 changed_columns: None,
191 context: None,
192 }
193 }
194
195 /// Create metadata for an update operation.
196 #[must_use]
197 pub fn update(version: i64, creation_version: i64) -> Self {
198 Self {
199 version,
200 creation_version: Some(creation_version),
201 operation: ChangeOperation::Update,
202 changed_columns: None,
203 context: None,
204 }
205 }
206
207 /// Create metadata for a delete operation.
208 #[must_use]
209 pub fn delete(version: i64) -> Self {
210 Self {
211 version,
212 creation_version: None,
213 operation: ChangeOperation::Delete,
214 changed_columns: None,
215 context: None,
216 }
217 }
218}
219
220/// Query builder for Change Tracking operations.
221///
222/// Helps construct proper SQL queries for common Change Tracking patterns.
223///
224/// # Example
225///
226/// ```rust
227/// use mssql_client::change_tracking::ChangeTrackingQuery;
228///
229/// // Query for all changes since version 42
230/// let query = ChangeTrackingQuery::changes("Products", 42);
231/// assert!(query.to_sql().contains("CHANGETABLE"));
232///
233/// // Query with specific columns
234/// let query = ChangeTrackingQuery::changes("Orders", 100)
235/// .with_columns(&["OrderId", "CustomerId", "OrderDate"]);
236/// let sql = query.to_sql();
237/// assert!(sql.contains("OrderId"));
238/// ```
239#[derive(Debug, Clone)]
240pub struct ChangeTrackingQuery {
241 table_name: String,
242 last_sync_version: i64,
243 columns: Option<Vec<String>>,
244 primary_keys: Option<Vec<String>>,
245 alias: String,
246 force_seek: bool,
247}
248
249impl ChangeTrackingQuery {
250 /// Create a query for changes to a table since a specific version.
251 ///
252 /// This generates a `CHANGETABLE(CHANGES table_name, last_sync_version)` query.
253 ///
254 /// # Arguments
255 ///
256 /// * `table_name` - The name of the table to query changes for
257 /// * `last_sync_version` - The version from the previous sync (0 for initial)
258 ///
259 /// # Example
260 ///
261 /// ```rust
262 /// use mssql_client::change_tracking::ChangeTrackingQuery;
263 ///
264 /// let query = ChangeTrackingQuery::changes("Products", 42);
265 /// ```
266 #[must_use]
267 pub fn changes(table_name: impl Into<String>, last_sync_version: i64) -> Self {
268 Self {
269 table_name: table_name.into(),
270 last_sync_version,
271 columns: None,
272 primary_keys: None,
273 alias: "CT".into(),
274 force_seek: false,
275 }
276 }
277
278 /// Specify which data columns to include (in addition to change tracking columns).
279 ///
280 /// If not specified, only change tracking system columns are returned.
281 ///
282 /// # Arguments
283 ///
284 /// * `columns` - Column names to include in the result
285 #[must_use]
286 pub fn with_columns(mut self, columns: &[&str]) -> Self {
287 self.columns = Some(columns.iter().map(|&s| s.to_string()).collect());
288 self
289 }
290
291 /// Specify the primary key columns for the table.
292 ///
293 /// This is needed when you want to join change tracking results
294 /// with the original table to get current row data.
295 #[must_use]
296 pub fn with_primary_keys(mut self, keys: &[&str]) -> Self {
297 self.primary_keys = Some(keys.iter().map(|&s| s.to_string()).collect());
298 self
299 }
300
301 /// Set the table alias for the CHANGETABLE result.
302 #[must_use]
303 pub fn with_alias(mut self, alias: impl Into<String>) -> Self {
304 self.alias = alias.into();
305 self
306 }
307
308 /// Enable FORCESEEK hint for the query.
309 ///
310 /// This can improve performance in some scenarios.
311 #[must_use]
312 pub fn with_force_seek(mut self) -> Self {
313 self.force_seek = true;
314 self
315 }
316
317 /// Generate the SQL query string.
318 ///
319 /// This returns a query that can be executed directly. The table name is
320 /// bracket-quoted part by part (`dbo.Items` → `[dbo].[Items]`) and the
321 /// alias, primary key, and column names are bracket-quoted as single
322 /// identifiers; pass all of them unbracketed — pre-bracketed input is
323 /// treated as literal characters of the name.
324 #[must_use]
325 pub fn to_sql(&self) -> String {
326 let force_seek = if self.force_seek { ", FORCESEEK" } else { "" };
327
328 // Build the SELECT column list
329 let select_cols = self.build_select_columns();
330
331 // SQL Server requires an alias on CHANGETABLE ("A table returned by
332 // the CHANGETABLE function must be aliased.", error 22104).
333 format!(
334 "SELECT {} FROM CHANGETABLE(CHANGES {}, {}{}) AS {}",
335 select_cols,
336 ChangeTracking::bracket_table_name(&self.table_name),
337 self.last_sync_version,
338 force_seek,
339 bracket_identifier(&self.alias)
340 )
341 }
342
343 /// Generate a SQL query that joins with the original table.
344 ///
345 /// This is useful when you need both the change tracking metadata
346 /// and the current row data (for inserts and updates).
347 ///
348 /// # Arguments
349 ///
350 /// * `data_columns` - Columns from the data table to include
351 ///
352 /// # Example
353 ///
354 /// ```rust
355 /// use mssql_client::change_tracking::ChangeTrackingQuery;
356 ///
357 /// let query = ChangeTrackingQuery::changes("Products", 42)
358 /// .with_primary_keys(&["ProductId"]);
359 /// let sql = query.to_sql_with_data(&["Name", "Price", "Stock"]);
360 /// assert!(sql.contains("LEFT OUTER JOIN"));
361 /// ```
362 #[must_use]
363 pub fn to_sql_with_data(&self, data_columns: &[&str]) -> String {
364 let force_seek = if self.force_seek { ", FORCESEEK" } else { "" };
365 let alias = bracket_identifier(&self.alias);
366
367 // Build change tracking columns
368 let ct_cols = format!(
369 "{alias}.SYS_CHANGE_VERSION, {alias}.SYS_CHANGE_CREATION_VERSION, \
370 {alias}.SYS_CHANGE_OPERATION, {alias}.SYS_CHANGE_COLUMNS, {alias}.SYS_CHANGE_CONTEXT"
371 );
372
373 // Build data columns (prefixed with table alias)
374 let data_cols: String = data_columns
375 .iter()
376 .map(|c| format!("T.{}", bracket_identifier(c)))
377 .collect::<Vec<_>>()
378 .join(", ");
379
380 // Build primary key columns from change tracking
381 let pk_cols: String = self
382 .primary_keys
383 .as_ref()
384 .map(|pks| {
385 pks.iter()
386 .map(|pk| format!("{alias}.{}", bracket_identifier(pk)))
387 .collect::<Vec<_>>()
388 .join(", ")
389 })
390 .unwrap_or_default();
391
392 // Build join condition
393 let join_condition: String = self
394 .primary_keys
395 .as_ref()
396 .map(|pks| {
397 pks.iter()
398 .map(|pk| {
399 let pk = bracket_identifier(pk);
400 format!("{alias}.{pk} = T.{pk}")
401 })
402 .collect::<Vec<_>>()
403 .join(" AND ")
404 })
405 .unwrap_or_else(|| "1=1".into());
406
407 let select_cols = if pk_cols.is_empty() {
408 format!("{ct_cols}, {data_cols}")
409 } else {
410 format!("{ct_cols}, {pk_cols}, {data_cols}")
411 };
412
413 format!(
414 "SELECT {select_cols} \
415 FROM CHANGETABLE(CHANGES {table}, {version}{force_seek}) AS {alias} \
416 LEFT OUTER JOIN {table} AS T ON {join_condition}",
417 table = ChangeTracking::bracket_table_name(&self.table_name),
418 version = self.last_sync_version,
419 )
420 }
421
422 fn build_select_columns(&self) -> String {
423 let alias = bracket_identifier(&self.alias);
424
425 // Always include change tracking system columns
426 let mut cols = vec![
427 format!("{alias}.SYS_CHANGE_VERSION"),
428 format!("{alias}.SYS_CHANGE_CREATION_VERSION"),
429 format!("{alias}.SYS_CHANGE_OPERATION"),
430 format!("{alias}.SYS_CHANGE_COLUMNS"),
431 format!("{alias}.SYS_CHANGE_CONTEXT"),
432 ];
433
434 // Add primary key columns if specified
435 if let Some(ref pks) = self.primary_keys {
436 for pk in pks {
437 cols.push(format!("{alias}.{}", bracket_identifier(pk)));
438 }
439 }
440
441 // Add data columns if specified
442 if let Some(ref data_cols) = self.columns {
443 for col in data_cols {
444 cols.push(format!("{alias}.{}", bracket_identifier(col)));
445 }
446 }
447
448 cols.join(", ")
449 }
450}
451
452/// Helper functions for Change Tracking operations.
453pub struct ChangeTracking;
454
455impl ChangeTracking {
456 /// Generate SQL to get the current change tracking version.
457 ///
458 /// Returns the global change tracking version number.
459 ///
460 /// # Example
461 ///
462 /// ```rust
463 /// use mssql_client::change_tracking::ChangeTracking;
464 ///
465 /// let sql = ChangeTracking::current_version_sql();
466 /// assert_eq!(sql, "SELECT CHANGE_TRACKING_CURRENT_VERSION()");
467 /// ```
468 #[must_use]
469 pub const fn current_version_sql() -> &'static str {
470 "SELECT CHANGE_TRACKING_CURRENT_VERSION()"
471 }
472
473 /// Generate SQL to get the minimum valid version for a table.
474 ///
475 /// If a client's last sync version is less than this, it must
476 /// perform a full re-sync instead of incremental sync.
477 ///
478 /// # Arguments
479 ///
480 /// * `table_name` - The name of the table
481 ///
482 /// # Example
483 ///
484 /// ```rust
485 /// use mssql_client::change_tracking::ChangeTracking;
486 ///
487 /// let sql = ChangeTracking::min_valid_version_sql("Products");
488 /// assert!(sql.contains("CHANGE_TRACKING_MIN_VALID_VERSION"));
489 /// ```
490 #[must_use]
491 pub fn min_valid_version_sql(table_name: &str) -> String {
492 format!(
493 "SELECT CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID(N'{}'))",
494 escape_nvarchar_literal(table_name)
495 )
496 }
497
498 /// Generate SQL to check if a column is in a change mask.
499 ///
500 /// Used to determine which specific columns changed in an update operation.
501 ///
502 /// # Arguments
503 ///
504 /// * `table_name` - The table name
505 /// * `column_name` - The column to check
506 /// * `mask_variable` - The name of the variable holding the change mask
507 ///
508 /// # Errors
509 ///
510 /// Returns [`Error::InvalidIdentifier`](crate::Error::InvalidIdentifier)
511 /// if `mask_variable` is not a valid SQL Server identifier — unlike the
512 /// name arguments (which are escaped into quoted literals), the variable
513 /// is spliced into the statement verbatim, so it must be validated.
514 ///
515 /// # Example
516 ///
517 /// ```rust
518 /// use mssql_client::change_tracking::ChangeTracking;
519 ///
520 /// let sql = ChangeTracking::column_in_mask_sql("Products", "Price", "@mask").unwrap();
521 /// assert!(sql.contains("CHANGE_TRACKING_IS_COLUMN_IN_MASK"));
522 /// ```
523 pub fn column_in_mask_sql(
524 table_name: &str,
525 column_name: &str,
526 mask_variable: &str,
527 ) -> Result<String, crate::Error> {
528 crate::validation::validate_identifier(mask_variable)?;
529 Ok(format!(
530 "SELECT CHANGE_TRACKING_IS_COLUMN_IN_MASK(\
531 COLUMNPROPERTY(OBJECT_ID(N'{}'), N'{}', 'ColumnId'), \
532 {mask_variable})",
533 escape_nvarchar_literal(table_name),
534 escape_nvarchar_literal(column_name)
535 ))
536 }
537
538 /// Generate SQL to enable change tracking on a database.
539 ///
540 /// # Arguments
541 ///
542 /// * `database_name` - The database name
543 /// * `retention_days` - How long to retain change data
544 /// * `auto_cleanup` - Whether to automatically clean up old data
545 ///
546 /// # Example
547 ///
548 /// ```rust
549 /// use mssql_client::change_tracking::ChangeTracking;
550 ///
551 /// let sql = ChangeTracking::enable_database_sql("MyDB", 2, true);
552 /// assert!(sql.contains("SET CHANGE_TRACKING = ON"));
553 /// ```
554 #[must_use]
555 pub fn enable_database_sql(
556 database_name: &str,
557 retention_days: u32,
558 auto_cleanup: bool,
559 ) -> String {
560 let cleanup = if auto_cleanup { "ON" } else { "OFF" };
561 format!(
562 "ALTER DATABASE [{}] SET CHANGE_TRACKING = ON \
563 (CHANGE_RETENTION = {retention_days} DAYS, AUTO_CLEANUP = {cleanup})",
564 database_name.replace(']', "]]")
565 )
566 }
567
568 /// Generate SQL to enable change tracking on a table.
569 ///
570 /// # Arguments
571 ///
572 /// * `table_name` - The table name
573 /// * `track_columns_updated` - Whether to track which columns were updated
574 ///
575 /// # Example
576 ///
577 /// ```rust
578 /// use mssql_client::change_tracking::ChangeTracking;
579 ///
580 /// let sql = ChangeTracking::enable_table_sql("Products", true);
581 /// assert!(sql.contains("ENABLE CHANGE_TRACKING"));
582 /// ```
583 #[must_use]
584 pub fn enable_table_sql(table_name: &str, track_columns_updated: bool) -> String {
585 let track_cols = if track_columns_updated { "ON" } else { "OFF" };
586 let table = Self::bracket_table_name(table_name);
587 format!(
588 "ALTER TABLE {table} ENABLE CHANGE_TRACKING \
589 WITH (TRACK_COLUMNS_UPDATED = {track_cols})"
590 )
591 }
592
593 /// Generate SQL to disable change tracking on a table.
594 #[must_use]
595 pub fn disable_table_sql(table_name: &str) -> String {
596 let table = Self::bracket_table_name(table_name);
597 format!("ALTER TABLE {table} DISABLE CHANGE_TRACKING")
598 }
599
600 /// Bracket a possibly schema-qualified table name part by part:
601 /// `dbo.Items` → `[dbo].[Items]`. Bracketing the whole string as one
602 /// identifier (`[dbo.Items]`) names a table literally called
603 /// "dbo.Items", which fails with error 4902 on real servers.
604 ///
605 /// Interior `]` characters are doubled (the T-SQL QUOTENAME rule), so a
606 /// hostile name like ``foo]; DROP TABLE bar--`` stays a single quoted
607 /// identifier instead of terminating it early and smuggling a second
608 /// statement into the batch. Pass names unbracketed; pre-bracketed input
609 /// is treated as literal characters of the name.
610 fn bracket_table_name(table_name: &str) -> String {
611 table_name
612 .split('.')
613 .map(|part| format!("[{}]", part.replace(']', "]]")))
614 .collect::<Vec<_>>()
615 .join(".")
616 }
617
618 /// Generate SQL to disable change tracking on a database.
619 #[must_use]
620 pub fn disable_database_sql(database_name: &str) -> String {
621 format!(
622 "ALTER DATABASE [{}] SET CHANGE_TRACKING = OFF",
623 database_name.replace(']', "]]")
624 )
625 }
626}
627
/// Prepare `s` for embedding between the quotes of an `N'...'` literal.
///
/// T-SQL escapes a single quote inside a string literal by doubling it, so
/// every `'` in the input is emitted twice and all other characters are
/// copied unchanged; the result cannot close the literal early.
fn escape_nvarchar_literal(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        if ch == '\'' {
            escaped.push_str("''");
        } else {
            escaped.push(ch);
        }
    }
    escaped
}
634
/// Quote one identifier (alias, column, or primary-key name) in square
/// brackets, doubling every interior `]` (the T-SQL QUOTENAME rule) so that
/// hostile input remains a single quoted identifier rather than escaping
/// into the surrounding statement.
///
/// Names must be passed unbracketed; brackets already present in the input
/// are treated as literal characters of the name.
fn bracket_identifier(name: &str) -> String {
    // +2 for the enclosing brackets; doubled `]` may grow it further.
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('[');
    for ch in name.chars() {
        quoted.push(ch);
        if ch == ']' {
            quoted.push(']');
        }
    }
    quoted.push(']');
    quoted
}
643
/// Outcome of comparing a client's sync version with the minimum valid version.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum SyncVersionStatus {
    /// The sync version is valid and incremental sync can proceed.
    Valid,
    /// The sync version is too old; a full re-sync is required.
    TooOld,
    /// Change tracking is not enabled or the table doesn't exist.
    NotEnabled,
}

impl SyncVersionStatus {
    /// Classify a sync version against the min_valid_version result.
    ///
    /// # Arguments
    ///
    /// * `last_sync_version` - The client's last synchronized version
    /// * `min_valid_version` - Result from `CHANGE_TRACKING_MIN_VALID_VERSION()`
    ///
    /// # Returns
    ///
    /// [`NotEnabled`](Self::NotEnabled) when `min_valid_version` is `None`,
    /// [`TooOld`](Self::TooOld) when `last_sync_version` is below the
    /// minimum, and [`Valid`](Self::Valid) otherwise (including equality).
    #[must_use]
    pub fn check(last_sync_version: i64, min_valid_version: Option<i64>) -> Self {
        match min_valid_version {
            Some(oldest_usable) => {
                if last_sync_version < oldest_usable {
                    Self::TooOld
                } else {
                    Self::Valid
                }
            }
            None => Self::NotEnabled,
        }
    }

    /// Check if incremental sync is possible (only for [`Valid`](Self::Valid)).
    #[must_use]
    pub const fn can_sync_incrementally(&self) -> bool {
        match self {
            Self::Valid => true,
            Self::TooOld | Self::NotEnabled => false,
        }
    }

    /// Check if a full re-sync is required (only for [`TooOld`](Self::TooOld)).
    #[must_use]
    pub const fn requires_full_sync(&self) -> bool {
        match self {
            Self::TooOld => true,
            Self::Valid | Self::NotEnabled => false,
        }
    }
}
688
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Accepted spellings (any case, surrounding whitespace) and rejected input.
    #[test]
    fn test_change_operation_from_sql() {
        let cases = [
            ("I", Some(ChangeOperation::Insert)),
            ("U", Some(ChangeOperation::Update)),
            ("D", Some(ChangeOperation::Delete)),
            ("i", Some(ChangeOperation::Insert)),
            (" U ", Some(ChangeOperation::Update)),
            ("X", None),
            ("", None),
        ];

        for (input, expected) in cases {
            assert_eq!(ChangeOperation::from_sql(input), expected);
        }
    }

    /// Each variant maps back to its single-letter code.
    #[test]
    fn test_change_operation_as_sql() {
        let cases = [
            (ChangeOperation::Insert, "I"),
            (ChangeOperation::Update, "U"),
            (ChangeOperation::Delete, "D"),
        ];

        for (operation, code) in cases {
            assert_eq!(operation.as_sql(), code);
        }
    }

    /// Exactly one predicate is true per variant.
    #[test]
    fn test_change_operation_predicates() {
        // (operation, is_insert, is_update, is_delete)
        let truth_table = [
            (ChangeOperation::Insert, true, false, false),
            (ChangeOperation::Update, false, true, false),
            (ChangeOperation::Delete, false, false, true),
        ];

        for (operation, insert, update, delete) in truth_table {
            assert_eq!(operation.is_insert(), insert);
            assert_eq!(operation.is_update(), update);
            assert_eq!(operation.is_delete(), delete);
        }
    }

    /// The convenience constructors fill version, creation version and operation.
    #[test]
    fn test_change_metadata_constructors() {
        let inserted = ChangeMetadata::insert(42);
        assert_eq!(inserted.version, 42);
        assert_eq!(inserted.creation_version, Some(42));
        assert_eq!(inserted.operation, ChangeOperation::Insert);

        let updated = ChangeMetadata::update(50, 42);
        assert_eq!(updated.version, 50);
        assert_eq!(updated.creation_version, Some(42));
        assert_eq!(updated.operation, ChangeOperation::Update);

        let deleted = ChangeMetadata::delete(60);
        assert_eq!(deleted.version, 60);
        assert_eq!(deleted.creation_version, None);
        assert_eq!(deleted.operation, ChangeOperation::Delete);
    }

    #[test]
    fn test_change_tracking_query_basic() {
        let sql = ChangeTrackingQuery::changes("Products", 42).to_sql();

        for fragment in [
            "CHANGETABLE(CHANGES [Products], 42)",
            "SYS_CHANGE_VERSION",
            "SYS_CHANGE_OPERATION",
        ] {
            assert!(sql.contains(fragment));
        }

        // SQL Server rejects CHANGETABLE without an alias (error 22104) —
        // the CHANGETABLE substring above matches the broken pre-alias SQL
        // too, so pin the alias explicitly.
        assert!(
            sql.contains(") AS [CT]"),
            "CHANGETABLE must be aliased or the query is not executable: {sql}"
        );
    }

    #[test]
    fn test_change_tracking_query_with_columns() {
        let sql = ChangeTrackingQuery::changes("Products", 42)
            .with_columns(&["Name", "Price"])
            .to_sql();

        assert!(sql.contains("[CT].[Name]"));
        assert!(sql.contains("[CT].[Price]"));
    }

    #[test]
    fn test_change_tracking_query_with_primary_keys() {
        let sql = ChangeTrackingQuery::changes("Products", 42)
            .with_primary_keys(&["ProductId"])
            .to_sql();

        assert!(sql.contains("[CT].[ProductId]"));
    }

    #[test]
    fn test_change_tracking_query_force_seek() {
        let sql = ChangeTrackingQuery::changes("Products", 42)
            .with_force_seek()
            .to_sql();

        assert!(sql.contains("FORCESEEK"));
    }

    #[test]
    fn test_change_tracking_query_with_data() {
        let sql = ChangeTrackingQuery::changes("Products", 42)
            .with_primary_keys(&["ProductId"])
            .to_sql_with_data(&["Name", "Price"]);

        for fragment in [
            "LEFT OUTER JOIN [Products] AS T",
            "[CT].[ProductId] = T.[ProductId]",
            "T.[Name]",
            "T.[Price]",
        ] {
            assert!(sql.contains(fragment));
        }
    }

    /// Issue #186: the query builder spliced identifiers verbatim, so a
    /// hostile table name could terminate CHANGETABLE(...) and smuggle a
    /// second statement into the batch. Same rules as the #144 helpers:
    /// per-part bracketing for the table name, single-identifier bracketing
    /// with `]`-doubling for alias/column/PK names.
    #[test]
    fn test_change_tracking_query_brackets_hostile_identifiers() {
        let hostile_table = "T, 0) AS CT; DROP TABLE x--";
        let sql = ChangeTrackingQuery::changes(hostile_table, 42).to_sql();
        assert!(
            sql.contains("CHANGETABLE(CHANGES [T, 0) AS CT; DROP TABLE x--], 42)"),
            "hostile table name must stay one quoted identifier: {sql}"
        );

        // `]` is doubled so the identifier cannot be closed early.
        let sql = ChangeTrackingQuery::changes("foo]; DROP TABLE bar--", 1).to_sql();
        assert!(sql.contains("CHANGETABLE(CHANGES [foo]]; DROP TABLE bar--], 1)"));

        // Schema-qualified names bracket part by part.
        let sql = ChangeTrackingQuery::changes("dbo.Items", 1).to_sql();
        assert!(sql.contains("CHANGETABLE(CHANGES [dbo].[Items], 1)"));

        // Alias and column names are bracketed in the SELECT list.
        let sql = ChangeTrackingQuery::changes("Products", 1)
            .with_alias("A]; DROP TABLE x--")
            .with_columns(&["C] FROM x--"])
            .to_sql();
        assert!(sql.contains("AS [A]]; DROP TABLE x--]"));
        assert!(sql.contains("[A]]; DROP TABLE x--].[C]] FROM x--]"));

        // PK and data-column names are bracketed in the join form too.
        let sql = ChangeTrackingQuery::changes("Products", 1)
            .with_primary_keys(&["P]--"])
            .to_sql_with_data(&["D]--"]);
        assert!(sql.contains("[CT].[P]]--] = T.[P]]--]"));
        assert!(sql.contains("T.[D]]--]"));
    }

    #[test]
    fn test_change_tracking_helper_sql() {
        assert_eq!(
            ChangeTracking::current_version_sql(),
            "SELECT CHANGE_TRACKING_CURRENT_VERSION()"
        );

        let min_version_sql = ChangeTracking::min_valid_version_sql("Products");
        for fragment in ["CHANGE_TRACKING_MIN_VALID_VERSION", "Products"] {
            assert!(min_version_sql.contains(fragment));
        }

        let column_mask_sql =
            ChangeTracking::column_in_mask_sql("Products", "Price", "@mask").unwrap();
        for fragment in ["CHANGE_TRACKING_IS_COLUMN_IN_MASK", "Price", "@mask"] {
            assert!(column_mask_sql.contains(fragment));
        }
    }

    /// Issue #144: the helpers that embed names in `N'...'` literals must
    /// double interior single quotes so a hostile name cannot terminate the
    /// literal and smuggle SQL into the statement.
    #[test]
    fn test_nvarchar_literal_names_cannot_break_out() {
        let hostile = "x'); SELECT 1--";

        let sql = ChangeTracking::min_valid_version_sql(hostile);
        assert!(
            sql.contains("N'x''); SELECT 1--'"),
            "single quotes must be doubled, got: {sql}"
        );
        assert!(!sql.contains("N'x');"), "literal must not end early: {sql}");

        let sql = ChangeTracking::column_in_mask_sql(hostile, hostile, "@mask").unwrap();
        assert!(sql.contains("N'x''); SELECT 1--'"));
        assert!(!sql.contains("N'x');"));
    }

    /// Issue #144: `mask_variable` is spliced verbatim (no quoting can make
    /// it safe), so it must be validated as an identifier.
    #[test]
    fn test_mask_variable_is_validated() {
        assert!(ChangeTracking::column_in_mask_sql("T", "C", "@mask").is_ok());

        for rejected in ["@mask); DROP TABLE x--", "1 OR 1=1", ""] {
            assert!(ChangeTracking::column_in_mask_sql("T", "C", rejected).is_err());
        }
    }

    /// Issue #144: the database-level helpers bracket-quote the name and must
    /// double interior `]` (QUOTENAME rule), matching the table helpers.
    #[test]
    fn test_database_name_brackets_are_escaped() {
        let hostile = "x]; DROP DATABASE foo--";

        let enable = ChangeTracking::enable_database_sql(hostile, 2, true);
        assert!(
            enable.contains("ALTER DATABASE [x]]; DROP DATABASE foo--]"),
            "interior ] must be doubled, got: {enable}"
        );
        assert!(!enable.contains("ALTER DATABASE [x];"));

        let disable = ChangeTracking::disable_database_sql(hostile);
        assert!(disable.contains("ALTER DATABASE [x]]; DROP DATABASE foo--]"));
        assert!(!disable.contains("ALTER DATABASE [x];"));
    }

    #[test]
    fn test_change_tracking_enable_sql() {
        let db_sql = ChangeTracking::enable_database_sql("MyDB", 7, true);
        for fragment in ["[MyDB]", "7 DAYS", "AUTO_CLEANUP = ON"] {
            assert!(db_sql.contains(fragment));
        }

        let table_sql = ChangeTracking::enable_table_sql("Products", true);
        for fragment in ["[Products]", "TRACK_COLUMNS_UPDATED = ON"] {
            assert!(table_sql.contains(fragment));
        }

        // Schema-qualified names must be bracketed part by part —
        // `[dbo.Products]` is a single (nonexistent) identifier and fails
        // with error 4902 on a real server.
        let qualified_enable = ChangeTracking::enable_table_sql("dbo.Products", true);
        assert!(qualified_enable.contains("ALTER TABLE [dbo].[Products]"));
        let qualified_disable = ChangeTracking::disable_table_sql("dbo.Products");
        assert!(qualified_disable.contains("ALTER TABLE [dbo].[Products]"));
    }

    /// PR #143 review, Blocker 2: interior `]` must be doubled (QUOTENAME
    /// rule) so a hostile table name cannot terminate the bracketed
    /// identifier early and smuggle a second statement into the batch.
    #[test]
    fn test_bracket_escapes_closing_brackets() {
        let enable = ChangeTracking::enable_table_sql("foo]; DROP TABLE bar--", true);
        assert!(
            enable.contains("ALTER TABLE [foo]]; DROP TABLE bar--]"),
            "interior ] must be doubled, got: {enable}"
        );
        assert!(
            !enable.contains("ALTER TABLE [foo];"),
            "the identifier must not be terminated early: {enable}"
        );

        let disable = ChangeTracking::disable_table_sql("we]ird.na]me");
        assert!(disable.contains("ALTER TABLE [we]]ird].[na]]me]"));
    }

    #[test]
    fn test_sync_version_status() {
        // Valid: at or above the minimum valid version.
        for last_sync in [100, 50] {
            assert!(SyncVersionStatus::check(last_sync, Some(50)).can_sync_incrementally());
        }

        // Too old: below the minimum valid version.
        assert!(SyncVersionStatus::check(40, Some(50)).requires_full_sync());

        // Not enabled: no minimum valid version available.
        let not_enabled = SyncVersionStatus::check(100, None);
        assert_eq!(not_enabled, SyncVersionStatus::NotEnabled);
        assert!(!not_enabled.can_sync_incrementally());
    }
}