sqlx_core_oldapi/sqlite/
migrate.rs1use crate::connection::{ConnectOptions, Connection};
2use crate::error::Result;
3use crate::executor::Executor;
4use crate::migrate::{AppliedMigration, Migration};
5use crate::migrate::{Migrate, MigrateDatabase};
6use crate::migrate::{MigrateError, MigrateResult};
7use crate::query::query;
8use crate::query_as::query_as;
9use crate::query_scalar::query_scalar;
10use crate::sqlite::{Sqlite, SqliteConnectOptions, SqliteConnection, SqliteJournalMode};
11use futures_core::future::BoxFuture;
12use sqlx_rt::fs;
13use std::str::FromStr;
14use std::sync::atomic::Ordering;
15use std::time::Duration;
16use std::time::Instant;
17
18impl MigrateDatabase for Sqlite {
19 fn create_database(url: &str) -> BoxFuture<'_, Result<()>> {
20 Box::pin(async move {
21 let mut opts = SqliteConnectOptions::from_str(url)?.create_if_missing(true);
22
23 if super::CREATE_DB_WAL.load(Ordering::Acquire) {
26 opts = opts.journal_mode(SqliteJournalMode::Wal);
27 }
28
29 opts.connect()
31 .await?
32 .close()
34 .await?;
35
36 Ok(())
37 })
38 }
39
40 fn database_exists(url: &str) -> BoxFuture<'_, Result<bool>> {
41 Box::pin(async move {
42 let options = SqliteConnectOptions::from_str(url)?;
43
44 if options.in_memory {
45 Ok(true)
46 } else {
47 Ok(options.filename.exists())
48 }
49 })
50 }
51
52 fn drop_database(url: &str) -> BoxFuture<'_, Result<()>> {
53 Box::pin(async move {
54 let options = SqliteConnectOptions::from_str(url)?;
55
56 if !options.in_memory {
57 fs::remove_file(&*options.filename).await?;
58 }
59
60 Ok(())
61 })
62 }
63}
64
65impl Migrate for SqliteConnection {
66 fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<()>> {
67 Box::pin(async move {
68 self.execute(
70 r#"
71CREATE TABLE IF NOT EXISTS _sqlx_migrations (
72 version BIGINT PRIMARY KEY,
73 description TEXT NOT NULL,
74 installed_on TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
75 success BOOLEAN NOT NULL,
76 checksum BLOB NOT NULL,
77 execution_time BIGINT NOT NULL
78);
79 "#,
80 )
81 .await?;
82
83 Ok(())
84 })
85 }
86
87 fn version(&mut self) -> BoxFuture<'_, Result<Option<(i64, bool)>>> {
88 Box::pin(async move {
89 let row = query_as(
91 "SELECT version, NOT success FROM _sqlx_migrations ORDER BY version DESC LIMIT 1",
92 )
93 .fetch_optional(self)
94 .await?;
95
96 Ok(row)
97 })
98 }
99
100 fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>>> {
101 Box::pin(async move {
102 let row: Option<(i64,)> = query_as(
104 "SELECT version FROM _sqlx_migrations WHERE success = false ORDER BY version LIMIT 1",
105 )
106 .fetch_optional(self)
107 .await?;
108
109 Ok(row.map(|r| r.0))
110 })
111 }
112
113 fn list_applied_migrations(&mut self) -> BoxFuture<'_, Result<Vec<AppliedMigration>>> {
114 Box::pin(async move {
115 let rows: Vec<(i64, Vec<u8>)> =
117 query_as("SELECT version, checksum FROM _sqlx_migrations ORDER BY version")
118 .fetch_all(self)
119 .await?;
120
121 let migrations = rows
122 .into_iter()
123 .map(|(version, checksum)| AppliedMigration {
124 version,
125 checksum: checksum.into(),
126 })
127 .collect();
128
129 Ok(migrations)
130 })
131 }
132
133 fn lock(&mut self) -> BoxFuture<'_, Result<()>> {
134 Box::pin(async move { Ok(()) })
135 }
136
137 fn unlock(&mut self) -> BoxFuture<'_, Result<()>> {
138 Box::pin(async move { Ok(()) })
139 }
140
141 fn validate<'e: 'm, 'm>(
142 &'e mut self,
143 migration: &'m Migration,
144 ) -> BoxFuture<'m, MigrateResult<()>> {
145 Box::pin(async move {
146 let checksum: Option<Vec<u8>> =
148 query_scalar("SELECT checksum FROM _sqlx_migrations WHERE version = ?1")
149 .bind(migration.version)
150 .fetch_optional(self)
151 .await
152 .map_err(MigrateError::AccessMigrationMetadata)?;
153
154 if let Some(checksum) = checksum {
155 if checksum == *migration.checksum {
156 Ok(())
157 } else {
158 Err(MigrateError::VersionMismatch(migration.version))
159 }
160 } else {
161 Err(MigrateError::VersionMissing(migration.version))
162 }
163 })
164 }
165
166 fn apply<'e: 'm, 'm>(
167 &'e mut self,
168 migration: &'m Migration,
169 ) -> BoxFuture<'m, Result<Duration>> {
170 Box::pin(async move {
171 let mut tx = self.begin().await?;
172 let start = Instant::now();
173
174 let _ = tx.execute(&*migration.sql).await?;
180
181 let _ = query(
183 r#"
184 INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
185 VALUES ( ?1, ?2, TRUE, ?3, -1 )
186 "#,
187 )
188 .bind(migration.version)
189 .bind(&*migration.description)
190 .bind(&*migration.checksum)
191 .execute(&mut tx)
192 .await?;
193
194 tx.commit().await?;
195
196 let elapsed = start.elapsed();
201
202 let _ = query(
204 r#"
205 UPDATE _sqlx_migrations
206 SET execution_time = ?1
207 WHERE version = ?2
208 "#,
209 )
210 .bind(elapsed.as_nanos() as i64)
211 .bind(migration.version)
212 .execute(self)
213 .await?;
214
215 Ok(elapsed)
216 })
217 }
218
219 fn revert<'e: 'm, 'm>(
220 &'e mut self,
221 migration: &'m Migration,
222 ) -> BoxFuture<'m, Result<Duration>> {
223 Box::pin(async move {
224 let mut tx = self.begin().await?;
227 let start = Instant::now();
228
229 let _ = tx.execute(&*migration.sql).await?;
230
231 let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = ?1"#)
233 .bind(migration.version)
234 .execute(&mut tx)
235 .await?;
236
237 tx.commit().await?;
238
239 let elapsed = start.elapsed();
240
241 Ok(elapsed)
242 })
243 }
244}