sqlx_core_oldapi/sqlite/
migrate.rs

1use crate::connection::{ConnectOptions, Connection};
2use crate::error::Result;
3use crate::executor::Executor;
4use crate::migrate::{AppliedMigration, Migration};
5use crate::migrate::{Migrate, MigrateDatabase};
6use crate::migrate::{MigrateError, MigrateResult};
7use crate::query::query;
8use crate::query_as::query_as;
9use crate::query_scalar::query_scalar;
10use crate::sqlite::{Sqlite, SqliteConnectOptions, SqliteConnection, SqliteJournalMode};
11use futures_core::future::BoxFuture;
12use sqlx_rt::fs;
13use std::str::FromStr;
14use std::sync::atomic::Ordering;
15use std::time::Duration;
16use std::time::Instant;
17
18impl MigrateDatabase for Sqlite {
19    fn create_database(url: &str) -> BoxFuture<'_, Result<()>> {
20        Box::pin(async move {
21            let mut opts = SqliteConnectOptions::from_str(url)?.create_if_missing(true);
22
23            // Since it doesn't make sense to include this flag in the connection URL,
24            // we just use an `AtomicBool` to pass it.
25            if super::CREATE_DB_WAL.load(Ordering::Acquire) {
26                opts = opts.journal_mode(SqliteJournalMode::Wal);
27            }
28
29            // Opening a connection to sqlite creates the database
30            opts.connect()
31                .await?
32                // Ensure WAL mode tempfiles are cleaned up
33                .close()
34                .await?;
35
36            Ok(())
37        })
38    }
39
40    fn database_exists(url: &str) -> BoxFuture<'_, Result<bool>> {
41        Box::pin(async move {
42            let options = SqliteConnectOptions::from_str(url)?;
43
44            if options.in_memory {
45                Ok(true)
46            } else {
47                Ok(options.filename.exists())
48            }
49        })
50    }
51
52    fn drop_database(url: &str) -> BoxFuture<'_, Result<()>> {
53        Box::pin(async move {
54            let options = SqliteConnectOptions::from_str(url)?;
55
56            if !options.in_memory {
57                fs::remove_file(&*options.filename).await?;
58            }
59
60            Ok(())
61        })
62    }
63}
64
65impl Migrate for SqliteConnection {
66    fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<()>> {
67        Box::pin(async move {
68            // language=SQLite
69            self.execute(
70                r#"
71CREATE TABLE IF NOT EXISTS _sqlx_migrations (
72    version BIGINT PRIMARY KEY,
73    description TEXT NOT NULL,
74    installed_on TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
75    success BOOLEAN NOT NULL,
76    checksum BLOB NOT NULL,
77    execution_time BIGINT NOT NULL
78);
79                "#,
80            )
81            .await?;
82
83            Ok(())
84        })
85    }
86
87    fn version(&mut self) -> BoxFuture<'_, Result<Option<(i64, bool)>>> {
88        Box::pin(async move {
89            // language=SQLite
90            let row = query_as(
91                "SELECT version, NOT success FROM _sqlx_migrations ORDER BY version DESC LIMIT 1",
92            )
93            .fetch_optional(self)
94            .await?;
95
96            Ok(row)
97        })
98    }
99
100    fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>>> {
101        Box::pin(async move {
102            // language=SQLite
103            let row: Option<(i64,)> = query_as(
104                "SELECT version FROM _sqlx_migrations WHERE success = false ORDER BY version LIMIT 1",
105            )
106            .fetch_optional(self)
107            .await?;
108
109            Ok(row.map(|r| r.0))
110        })
111    }
112
113    fn list_applied_migrations(&mut self) -> BoxFuture<'_, Result<Vec<AppliedMigration>>> {
114        Box::pin(async move {
115            // language=SQLite
116            let rows: Vec<(i64, Vec<u8>)> =
117                query_as("SELECT version, checksum FROM _sqlx_migrations ORDER BY version")
118                    .fetch_all(self)
119                    .await?;
120
121            let migrations = rows
122                .into_iter()
123                .map(|(version, checksum)| AppliedMigration {
124                    version,
125                    checksum: checksum.into(),
126                })
127                .collect();
128
129            Ok(migrations)
130        })
131    }
132
133    fn lock(&mut self) -> BoxFuture<'_, Result<()>> {
134        Box::pin(async move { Ok(()) })
135    }
136
137    fn unlock(&mut self) -> BoxFuture<'_, Result<()>> {
138        Box::pin(async move { Ok(()) })
139    }
140
141    fn validate<'e: 'm, 'm>(
142        &'e mut self,
143        migration: &'m Migration,
144    ) -> BoxFuture<'m, MigrateResult<()>> {
145        Box::pin(async move {
146            // language=SQL
147            let checksum: Option<Vec<u8>> =
148                query_scalar("SELECT checksum FROM _sqlx_migrations WHERE version = ?1")
149                    .bind(migration.version)
150                    .fetch_optional(self)
151                    .await
152                    .map_err(MigrateError::AccessMigrationMetadata)?;
153
154            if let Some(checksum) = checksum {
155                if checksum == *migration.checksum {
156                    Ok(())
157                } else {
158                    Err(MigrateError::VersionMismatch(migration.version))
159                }
160            } else {
161                Err(MigrateError::VersionMissing(migration.version))
162            }
163        })
164    }
165
166    fn apply<'e: 'm, 'm>(
167        &'e mut self,
168        migration: &'m Migration,
169    ) -> BoxFuture<'m, Result<Duration>> {
170        Box::pin(async move {
171            let mut tx = self.begin().await?;
172            let start = Instant::now();
173
174            // Use a single transaction for the actual migration script and the essential bookeeping so we never
175            // execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
176            // The `execution_time` however can only be measured for the whole transaction. This value _only_ exists for
177            // data lineage and debugging reasons, so it is not super important if it is lost. So we initialize it to -1
178            // and update it once the actual transaction completed.
179            let _ = tx.execute(&*migration.sql).await?;
180
181            // language=SQL
182            let _ = query(
183                r#"
184    INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
185    VALUES ( ?1, ?2, TRUE, ?3, -1 )
186                "#,
187            )
188            .bind(migration.version)
189            .bind(&*migration.description)
190            .bind(&*migration.checksum)
191            .execute(&mut tx)
192            .await?;
193
194            tx.commit().await?;
195
196            // Update `elapsed_time`.
197            // NOTE: The process may disconnect/die at this point, so the elapsed time value might be lost. We accept
198            //       this small risk since this value is not super important.
199
200            let elapsed = start.elapsed();
201
202            // language=SQL
203            let _ = query(
204                r#"
205    UPDATE _sqlx_migrations
206    SET execution_time = ?1
207    WHERE version = ?2
208                "#,
209            )
210            .bind(elapsed.as_nanos() as i64)
211            .bind(migration.version)
212            .execute(self)
213            .await?;
214
215            Ok(elapsed)
216        })
217    }
218
219    fn revert<'e: 'm, 'm>(
220        &'e mut self,
221        migration: &'m Migration,
222    ) -> BoxFuture<'m, Result<Duration>> {
223        Box::pin(async move {
224            // Use a single transaction for the actual migration script and the essential bookeeping so we never
225            // execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
226            let mut tx = self.begin().await?;
227            let start = Instant::now();
228
229            let _ = tx.execute(&*migration.sql).await?;
230
231            // language=SQL
232            let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = ?1"#)
233                .bind(migration.version)
234                .execute(&mut tx)
235                .await?;
236
237            tx.commit().await?;
238
239            let elapsed = start.elapsed();
240
241            Ok(elapsed)
242        })
243    }
244}