1use crate::{Mssql, MssqlConnection, MssqlConnectOptions};
8use futures_core::future::BoxFuture;
9use sqlx_core::error::Error;
10use sqlx_core::migrate::{AppliedMigration, Migrate, MigrateDatabase, MigrateError, Migration};
11use std::str::FromStr;
12use std::time::Duration;
13use url::Url;
14
15fn extract_database_name(url: &str) -> std::result::Result<String, Error> {
21 let parsed = Url::parse(url).map_err(|e| {
22 Error::Protocol(format!("failed to parse migration URL: {e}"))
23 })?;
24 let database = parsed.path().trim_start_matches('/').to_owned();
25 if database.is_empty() {
26 return Err(Error::Configuration(
27 "migration URL does not contain a database name".into(),
28 ));
29 }
30 Ok(database)
31}
32
33fn escape_sql_bracket(value: &str) -> String {
35 value.replace(']', "]]")
36}
37
38fn escape_sql_string(value: &str) -> String {
40 value.replace('\'', "''")
41}
42
43fn format_hex(bytes: &[u8]) -> String {
45 let mut hex = String::with_capacity(2 + bytes.len() * 2);
46 hex.push_str("0x");
47 for byte in bytes {
48 hex.push_str(&format!("{byte:02X}"));
49 }
50 hex
51}
52
53fn split_table_name(table_name: &str) -> (&str, &str) {
57 if let Some(dot) = table_name.find('.') {
58 let schema = &table_name[..dot];
59 let table = &table_name[dot + 1..];
60 (schema, table)
61 } else {
62 ("", table_name)
63 }
64}
65
66fn quoted_table_name(table_name: &str) -> String {
68 let (schema, table) = split_table_name(table_name);
69 if schema.is_empty() {
70 format!("[{}]", escape_sql_bracket(table))
71 } else {
72 format!(
73 "[{}].[{}]",
74 escape_sql_bracket(schema),
75 escape_sql_bracket(table),
76 )
77 }
78}
79
80impl MigrateDatabase for Mssql {
85 fn create_database(url: &str) -> impl std::future::Future<Output = Result<(), Error>> + Send + '_ {
86 async move {
87 let options = MssqlConnectOptions::from_str(url)?;
88 let database = extract_database_name(url)?;
89 let master_options = options.with_database("master");
90 let conn = MssqlConnection::connect_blocking(&master_options)?;
91 conn.exec_sql_blocking(&format!(
92 "CREATE DATABASE [{}]",
93 escape_sql_bracket(&database),
94 ))?;
95 drop(conn);
96 Ok(())
97 }
98 }
99
100 fn database_exists(url: &str) -> impl std::future::Future<Output = Result<bool, Error>> + Send + '_ {
101 async move {
102 let options = MssqlConnectOptions::from_str(url)?;
103
104 if MssqlConnection::connect_blocking(&options).is_ok() {
106 return Ok(true);
107 }
108
109 let database = extract_database_name(url)?;
111 let master_options = options.with_database("master");
112 let conn = match MssqlConnection::connect_blocking(&master_options) {
113 Ok(conn) => conn,
114 Err(_) => return Ok(false),
115 };
116
117 let sql = format!(
118 "SELECT COUNT(*) FROM sys.databases WHERE name = N'{}'",
119 escape_sql_string(&database),
120 );
121 let count = conn
122 .scalar_i64_blocking(&sql)?
123 .unwrap_or(0);
124
125 drop(conn);
126 Ok(count > 0)
127 }
128 }
129
130 fn drop_database(url: &str) -> impl std::future::Future<Output = Result<(), Error>> + Send + '_ {
131 async move {
132 let options = MssqlConnectOptions::from_str(url)?;
133 let database = extract_database_name(url)?;
134 let master_options = options.with_database("master");
135 let conn = MssqlConnection::connect_blocking(&master_options)?;
136 conn.exec_sql_blocking(&format!(
137 "DROP DATABASE IF EXISTS [{}]",
138 escape_sql_bracket(&database),
139 ))?;
140 drop(conn);
141 Ok(())
142 }
143 }
144}
145
146impl Migrate for MssqlConnection {
151 fn create_schema_if_not_exists<'e>(
154 &'e mut self,
155 schema_name: &'e str,
156 ) -> BoxFuture<'e, Result<(), MigrateError>> {
157 let sql = format!(
158 "IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = N'{}') \
159 EXEC('CREATE SCHEMA [{}]')",
160 escape_sql_string(schema_name),
161 escape_sql_bracket(schema_name),
162 );
163 Box::pin(async move {
164 self.exec_sql_blocking(&sql).map_err(MigrateError::Execute)?;
165 Ok(())
166 })
167 }
168
169 fn ensure_migrations_table<'e>(
171 &'e mut self,
172 table_name: &'e str,
173 ) -> BoxFuture<'e, Result<(), MigrateError>> {
174 let quoted = quoted_table_name(table_name);
175
176 let (schema, table) = split_table_name(table_name);
178 let schema_condition = if schema.is_empty() {
179 "TABLE_SCHEMA = 'dbo'".to_owned()
180 } else {
181 format!("TABLE_SCHEMA = N'{}'", escape_sql_string(schema))
182 };
183
184 let create_sql = format!(
185 "IF NOT EXISTS ( \
186 SELECT * FROM INFORMATION_SCHEMA.TABLES \
187 WHERE TABLE_NAME = N'{table}' AND {schema_condition} \
188 ) \
189 CREATE TABLE {quoted} ( \
190 version BIGINT NOT NULL PRIMARY KEY, \
191 description NVARCHAR(MAX) NOT NULL, \
192 migration_type NVARCHAR(20) NOT NULL, \
193 sql NVARCHAR(MAX) NOT NULL, \
194 checksum VARBINARY(8000) NOT NULL, \
195 executed_at DATETIME2 NOT NULL DEFAULT GETUTCDATE(), \
196 no_tx BIT NOT NULL DEFAULT 0 \
197 )",
198 table = escape_sql_string(table),
199 schema_condition = schema_condition,
200 quoted = quoted,
201 );
202
203 Box::pin(async move {
204 self.exec_sql_blocking(&create_sql).map_err(MigrateError::Execute)?;
205 Ok(())
206 })
207 }
208
209 fn dirty_version<'e>(
212 &'e mut self,
213 _table_name: &'e str,
214 ) -> BoxFuture<'e, Result<Option<i64>, MigrateError>> {
215 Box::pin(async move { Ok(None) })
216 }
217
218 fn list_applied_migrations<'e>(
220 &'e mut self,
221 table_name: &'e str,
222 ) -> BoxFuture<'e, Result<Vec<AppliedMigration>, MigrateError>> {
223 let quoted = quoted_table_name(table_name);
224 let sql = format!(
225 "SELECT version, checksum FROM {quoted} ORDER BY version",
226 );
227
228 Box::pin(async move {
229 let rows = self.list_migrations_blocking(&sql)?;
230 let migrations = rows
231 .into_iter()
232 .map(|(version, checksum)| AppliedMigration {
233 version,
234 checksum: checksum.into(),
235 })
236 .collect();
237 Ok(migrations)
238 })
239 }
240
241 fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
243 Box::pin(async move {
244 self.exec_sql_blocking(
245 "EXEC sp_getapplock \
246 @Resource = N'sqlx_migration_lock', \
247 @LockMode = 'Exclusive', \
248 @LockOwner = 'Session'",
249 )
250 .map_err(MigrateError::Execute)?;
251 Ok(())
252 })
253 }
254
255 fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
257 Box::pin(async move {
258 self.exec_sql_blocking(
259 "EXEC sp_releaseapplock \
260 @Resource = N'sqlx_migration_lock', \
261 @LockOwner = 'Session'",
262 )
263 .map_err(MigrateError::Execute)?;
264 Ok(())
265 })
266 }
267
268 fn apply<'e>(
271 &'e mut self,
272 _table_name: &'e str,
273 migration: &'e Migration,
274 ) -> BoxFuture<'e, Result<Duration, MigrateError>> {
275 let quoted = quoted_table_name(_table_name);
276 let sql = migration.sql.as_str().to_owned();
277 let version = migration.version;
278 let description = migration.description.to_string();
279 let migration_type = format!("{:?}", migration.migration_type);
280 let checksum = migration.checksum.to_vec();
281 let no_tx = migration.no_tx;
282
283 let insert_sql = format!(
284 "INSERT INTO {quoted} \
285 (version, description, migration_type, sql, checksum, no_tx) \
286 VALUES ({version}, N'{desc}', N'{mt}', N'{sql_text}', {chk}, {ntx})",
287 quoted = quoted,
288 version = version,
289 desc = escape_sql_string(&description),
290 mt = escape_sql_string(&migration_type),
291 sql_text = escape_sql_string(migration.sql.as_str()),
292 chk = format_hex(&checksum),
293 ntx = if no_tx { 1 } else { 0 },
294 );
295
296 Box::pin(async move {
297 self.apply_migration_blocking(&sql, &insert_sql, version, no_tx)
298 .map_err(|e| MigrateError::ExecuteMigration(e, version))
299 })
300 }
301
302 fn revert<'e>(
305 &'e mut self,
306 _table_name: &'e str,
307 migration: &'e Migration,
308 ) -> BoxFuture<'e, Result<Duration, MigrateError>> {
309 let quoted = quoted_table_name(_table_name);
310 let sql = migration.sql.as_str().to_owned();
311 let version = migration.version;
312 let no_tx = migration.no_tx;
313
314 let delete_sql = format!(
315 "DELETE FROM {quoted} WHERE version = {version}",
316 quoted = quoted,
317 version = version,
318 );
319
320 Box::pin(async move {
321 self.revert_migration_blocking(&sql, &delete_sql, version, no_tx)
322 .map_err(|e| MigrateError::ExecuteMigration(e, version))
323 })
324 }
325
326 fn skip<'e>(
328 &'e mut self,
329 _table_name: &'e str,
330 _migration: &'e Migration,
331 ) -> BoxFuture<'e, Result<(), MigrateError>> {
332 let quoted = quoted_table_name(_table_name);
333 let version = _migration.version;
334 let description = _migration.description.to_string();
335 let migration_type = format!("{:?}", _migration.migration_type);
336 let checksum = _migration.checksum.to_vec();
337 let no_tx = _migration.no_tx;
338
339 Box::pin(async move {
340 let insert_sql = format!(
341 "INSERT INTO {quoted} \
342 (version, description, migration_type, sql, checksum, no_tx) \
343 VALUES ({version}, N'{desc}', N'{mt}', N'', {chk}, {ntx})",
344 quoted = quoted,
345 version = version,
346 desc = escape_sql_string(&description),
347 mt = escape_sql_string(&migration_type),
348 chk = format_hex(&checksum),
349 ntx = if no_tx { 1 } else { 0 },
350 );
351 self.exec_sql_blocking(&insert_sql)
352 .map_err(|e| MigrateError::ExecuteMigration(e, version))
353 })
354 }
355}