Skip to main content

sqlx_mssql_odbc_core/
migrate.rs

1//! Migration support for MSSQL via ODBC.
2//!
3//! Implements [`MigrateDatabase`] for [`Mssql`] (database lifecycle) and
4//! [`Migrate`] for [`MssqlConnection`] (migration execution and tracking)
5//! so that [`Migrator`](sqlx_core::migrate::Migrator) works with this driver.
6
7use crate::{Mssql, MssqlConnection, MssqlConnectOptions};
8use futures_core::future::BoxFuture;
9use sqlx_core::error::Error;
10use sqlx_core::migrate::{AppliedMigration, Migrate, MigrateDatabase, MigrateError, Migration};
11use std::str::FromStr;
12use std::time::Duration;
13use url::Url;
14
15// ---------------------------------------------------------------------------
16// Helpers
17// ---------------------------------------------------------------------------
18
19/// Extracts the database name from a `mssql://` URL.
20fn extract_database_name(url: &str) -> std::result::Result<String, Error> {
21    let parsed = Url::parse(url).map_err(|e| {
22        Error::Protocol(format!("failed to parse migration URL: {e}"))
23    })?;
24    let database = parsed.path().trim_start_matches('/').to_owned();
25    if database.is_empty() {
26        return Err(Error::Configuration(
27            "migration URL does not contain a database name".into(),
28        ));
29    }
30    Ok(database)
31}
32
/// Escapes `value` so it can be placed between `[` and `]` as a T-SQL
/// delimited identifier: every `]` is doubled, all other characters are
/// copied unchanged. The surrounding brackets are NOT added here.
fn escape_sql_bracket(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        if ch == ']' {
            // A closing bracket inside the identifier is written as `]]`.
            escaped.push(']');
        }
        escaped.push(ch);
    }
    escaped
}
37
/// Escapes `value` for embedding inside a `N'...'` T-SQL string literal by
/// doubling every single quote. The enclosing quotes are NOT added here.
fn escape_sql_string(value: &str) -> String {
    // Splitting on the quote and re-joining with two quotes doubles each one.
    value.split('\'').collect::<Vec<_>>().join("''")
}
42
/// Formats a byte slice as a T-SQL hex literal (e.g. `0xDEADBEEF`).
///
/// Each byte becomes two upper-case hex digits after the `0x` prefix; an
/// empty slice yields just `0x`. Digits are pushed straight into the
/// pre-sized output buffer, so no temporary `String` is allocated per byte.
fn format_hex(bytes: &[u8]) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";

    let mut hex = String::with_capacity(2 + bytes.len() * 2);
    hex.push_str("0x");
    for &byte in bytes {
        // High nibble first, then low nibble.
        hex.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        hex.push(char::from(HEX_DIGITS[usize::from(byte & 0x0F)]));
    }
    hex
}
52
/// Splits a possibly schema-qualified table name into `(schema, table)`.
///
/// The split happens at the FIRST `.` only, so `a.b.c` yields
/// `("a", "b.c")`. A name without a dot yields an empty schema and the
/// whole input as the table. Bracket quoting in the input is not
/// interpreted.
fn split_table_name(table_name: &str) -> (&str, &str) {
    table_name.split_once('.').unwrap_or(("", table_name))
}
65
66/// Builds a safe `[schema].[table]` reference.
67fn quoted_table_name(table_name: &str) -> String {
68    let (schema, table) = split_table_name(table_name);
69    if schema.is_empty() {
70        format!("[{}]", escape_sql_bracket(table))
71    } else {
72        format!(
73            "[{}].[{}]",
74            escape_sql_bracket(schema),
75            escape_sql_bracket(table),
76        )
77    }
78}
79
80// ---------------------------------------------------------------------------
81// MigrateDatabase — database lifecycle (create / drop / exists)
82// ---------------------------------------------------------------------------
83
84impl MigrateDatabase for Mssql {
85    fn create_database(url: &str) -> impl std::future::Future<Output = Result<(), Error>> + Send + '_ {
86        async move {
87            let options = MssqlConnectOptions::from_str(url)?;
88            let database = extract_database_name(url)?;
89            let master_options = options.with_database("master");
90            let conn = MssqlConnection::connect_blocking(&master_options)?;
91            conn.exec_sql_blocking(&format!(
92                "CREATE DATABASE [{}]",
93                escape_sql_bracket(&database),
94            ))?;
95            drop(conn);
96            Ok(())
97        }
98    }
99
100    fn database_exists(url: &str) -> impl std::future::Future<Output = Result<bool, Error>> + Send + '_ {
101        async move {
102            let options = MssqlConnectOptions::from_str(url)?;
103
104            // Fast path: try connecting directly to the target database.
105            if MssqlConnection::connect_blocking(&options).is_ok() {
106                return Ok(true);
107            }
108
109            // Fallback: connect to master and check sys.databases.
110            let database = extract_database_name(url)?;
111            let master_options = options.with_database("master");
112            let conn = match MssqlConnection::connect_blocking(&master_options) {
113                Ok(conn) => conn,
114                Err(_) => return Ok(false),
115            };
116
117            let sql = format!(
118                "SELECT COUNT(*) FROM sys.databases WHERE name = N'{}'",
119                escape_sql_string(&database),
120            );
121            let count = conn
122                .scalar_i64_blocking(&sql)?
123                .unwrap_or(0);
124
125            drop(conn);
126            Ok(count > 0)
127        }
128    }
129
130    fn drop_database(url: &str) -> impl std::future::Future<Output = Result<(), Error>> + Send + '_ {
131        async move {
132            let options = MssqlConnectOptions::from_str(url)?;
133            let database = extract_database_name(url)?;
134            let master_options = options.with_database("master");
135            let conn = MssqlConnection::connect_blocking(&master_options)?;
136            conn.exec_sql_blocking(&format!(
137                "DROP DATABASE IF EXISTS [{}]",
138                escape_sql_bracket(&database),
139            ))?;
140            drop(conn);
141            Ok(())
142        }
143    }
144}
145
146// ---------------------------------------------------------------------------
147// Migrate — migration execution and tracking on MssqlConnection
148// ---------------------------------------------------------------------------
149
150impl Migrate for MssqlConnection {
151    /// MSSQL does not support `CREATE SCHEMA IF NOT EXISTS` as a single
152    /// statement, so we use a conditional T-SQL block.
153    fn create_schema_if_not_exists<'e>(
154        &'e mut self,
155        schema_name: &'e str,
156    ) -> BoxFuture<'e, Result<(), MigrateError>> {
157        let sql = format!(
158            "IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = N'{}') \
159             EXEC('CREATE SCHEMA [{}]')",
160            escape_sql_string(schema_name),
161            escape_sql_bracket(schema_name),
162        );
163        Box::pin(async move {
164            self.exec_sql_blocking(&sql).map_err(MigrateError::Execute)?;
165            Ok(())
166        })
167    }
168
169    /// Creates the migrations tracking table if it does not yet exist.
170    fn ensure_migrations_table<'e>(
171        &'e mut self,
172        table_name: &'e str,
173    ) -> BoxFuture<'e, Result<(), MigrateError>> {
174        let quoted = quoted_table_name(table_name);
175
176        // Determine the schema part for INFORMATION_SCHEMA lookup.
177        let (schema, table) = split_table_name(table_name);
178        let schema_condition = if schema.is_empty() {
179            "TABLE_SCHEMA = 'dbo'".to_owned()
180        } else {
181            format!("TABLE_SCHEMA = N'{}'", escape_sql_string(schema))
182        };
183
184        let create_sql = format!(
185            "IF NOT EXISTS ( \
186             SELECT * FROM INFORMATION_SCHEMA.TABLES \
187             WHERE TABLE_NAME = N'{table}' AND {schema_condition} \
188             ) \
189             CREATE TABLE {quoted} ( \
190             version    BIGINT         NOT NULL PRIMARY KEY, \
191             description NVARCHAR(MAX) NOT NULL, \
192             migration_type NVARCHAR(20)  NOT NULL, \
193             sql        NVARCHAR(MAX) NOT NULL, \
194             checksum   VARBINARY(8000)  NOT NULL, \
195             executed_at DATETIME2     NOT NULL DEFAULT GETUTCDATE(), \
196             no_tx      BIT            NOT NULL DEFAULT 0 \
197             )",
198            table = escape_sql_string(table),
199            schema_condition = schema_condition,
200            quoted = quoted,
201        );
202
203        Box::pin(async move {
204            self.exec_sql_blocking(&create_sql).map_err(MigrateError::Execute)?;
205            Ok(())
206        })
207    }
208
209    /// MSSQL supports transactional DDL, so a dirty (partially applied)
210    /// migration cannot occur. Always returns `None`.
211    fn dirty_version<'e>(
212        &'e mut self,
213        _table_name: &'e str,
214    ) -> BoxFuture<'e, Result<Option<i64>, MigrateError>> {
215        Box::pin(async move { Ok(None) })
216    }
217
218    /// Lists all previously applied migrations, ordered by version.
219    fn list_applied_migrations<'e>(
220        &'e mut self,
221        table_name: &'e str,
222    ) -> BoxFuture<'e, Result<Vec<AppliedMigration>, MigrateError>> {
223        let quoted = quoted_table_name(table_name);
224        let sql = format!(
225            "SELECT version, checksum FROM {quoted} ORDER BY version",
226        );
227
228        Box::pin(async move {
229            let rows = self.list_migrations_blocking(&sql)?;
230            let migrations = rows
231                .into_iter()
232                .map(|(version, checksum)| AppliedMigration {
233                    version,
234                    checksum: checksum.into(),
235                })
236                .collect();
237            Ok(migrations)
238        })
239    }
240
241    /// Acquires an exclusive application-level lock using `sp_getapplock`.
242    fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
243        Box::pin(async move {
244            self.exec_sql_blocking(
245                "EXEC sp_getapplock \
246                 @Resource = N'sqlx_migration_lock', \
247                 @LockMode = 'Exclusive', \
248                 @LockOwner = 'Session'",
249            )
250            .map_err(MigrateError::Execute)?;
251            Ok(())
252        })
253    }
254
255    /// Releases the application-level lock using `sp_releaseapplock`.
256    fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
257        Box::pin(async move {
258            self.exec_sql_blocking(
259                "EXEC sp_releaseapplock \
260                 @Resource = N'sqlx_migration_lock', \
261                 @LockOwner = 'Session'",
262            )
263            .map_err(MigrateError::Execute)?;
264            Ok(())
265        })
266    }
267
268    /// Applies a migration: executes the SQL, then records the migration in
269    /// the tracking table.
270    fn apply<'e>(
271        &'e mut self,
272        _table_name: &'e str,
273        migration: &'e Migration,
274    ) -> BoxFuture<'e, Result<Duration, MigrateError>> {
275        let quoted = quoted_table_name(_table_name);
276        let sql = migration.sql.as_str().to_owned();
277        let version = migration.version;
278        let description = migration.description.to_string();
279        let migration_type = format!("{:?}", migration.migration_type);
280        let checksum = migration.checksum.to_vec();
281        let no_tx = migration.no_tx;
282
283        let insert_sql = format!(
284            "INSERT INTO {quoted} \
285             (version, description, migration_type, sql, checksum, no_tx) \
286             VALUES ({version}, N'{desc}', N'{mt}', N'{sql_text}', {chk}, {ntx})",
287            quoted = quoted,
288            version = version,
289            desc = escape_sql_string(&description),
290            mt = escape_sql_string(&migration_type),
291            sql_text = escape_sql_string(migration.sql.as_str()),
292            chk = format_hex(&checksum),
293            ntx = if no_tx { 1 } else { 0 },
294        );
295
296        Box::pin(async move {
297            self.apply_migration_blocking(&sql, &insert_sql, version, no_tx)
298                .map_err(|e| MigrateError::ExecuteMigration(e, version))
299        })
300    }
301
302    /// Reverts a migration: executes the down SQL, then removes the tracking
303    /// record.
304    fn revert<'e>(
305        &'e mut self,
306        _table_name: &'e str,
307        migration: &'e Migration,
308    ) -> BoxFuture<'e, Result<Duration, MigrateError>> {
309        let quoted = quoted_table_name(_table_name);
310        let sql = migration.sql.as_str().to_owned();
311        let version = migration.version;
312        let no_tx = migration.no_tx;
313
314        let delete_sql = format!(
315            "DELETE FROM {quoted} WHERE version = {version}",
316            quoted = quoted,
317            version = version,
318        );
319
320        Box::pin(async move {
321            self.revert_migration_blocking(&sql, &delete_sql, version, no_tx)
322                .map_err(|e| MigrateError::ExecuteMigration(e, version))
323        })
324    }
325
326    /// Marks a migration as applied without executing its SQL.
327    fn skip<'e>(
328        &'e mut self,
329        _table_name: &'e str,
330        _migration: &'e Migration,
331    ) -> BoxFuture<'e, Result<(), MigrateError>> {
332        let quoted = quoted_table_name(_table_name);
333        let version = _migration.version;
334        let description = _migration.description.to_string();
335        let migration_type = format!("{:?}", _migration.migration_type);
336        let checksum = _migration.checksum.to_vec();
337        let no_tx = _migration.no_tx;
338
339        Box::pin(async move {
340            let insert_sql = format!(
341                "INSERT INTO {quoted} \
342                 (version, description, migration_type, sql, checksum, no_tx) \
343                 VALUES ({version}, N'{desc}', N'{mt}', N'', {chk}, {ntx})",
344                quoted = quoted,
345                version = version,
346                desc = escape_sql_string(&description),
347                mt = escape_sql_string(&migration_type),
348                chk = format_hex(&checksum),
349                ntx = if no_tx { 1 } else { 0 },
350            );
351            self.exec_sql_blocking(&insert_sql)
352                .map_err(|e| MigrateError::ExecuteMigration(e, version))
353        })
354    }
355}