sqlx_core_guts/sqlite/
migrate.rs1use crate::connection::{ConnectOptions, Connection};
2use crate::error::Error;
3use crate::executor::Executor;
4use crate::migrate::MigrateError;
5use crate::migrate::{AppliedMigration, Migration};
6use crate::migrate::{Migrate, MigrateDatabase};
7use crate::query::query;
8use crate::query_as::query_as;
9use crate::query_scalar::query_scalar;
10use crate::sqlite::{Sqlite, SqliteConnectOptions, SqliteConnection};
11use futures_core::future::BoxFuture;
12use sqlx_rt::fs;
13use std::str::FromStr;
14use std::time::Duration;
15use std::time::Instant;
16
17impl MigrateDatabase for Sqlite {
18 fn create_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
19 Box::pin(async move {
20 let _ = SqliteConnectOptions::from_str(url)?
22 .create_if_missing(true)
23 .connect()
24 .await?;
25
26 Ok(())
27 })
28 }
29
30 fn database_exists(url: &str) -> BoxFuture<'_, Result<bool, Error>> {
31 Box::pin(async move {
32 let options = SqliteConnectOptions::from_str(url)?;
33
34 if options.in_memory {
35 Ok(true)
36 } else {
37 Ok(options.filename.exists())
38 }
39 })
40 }
41
42 fn drop_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
43 Box::pin(async move {
44 let options = SqliteConnectOptions::from_str(url)?;
45
46 if !options.in_memory {
47 fs::remove_file(&*options.filename).await?;
48 }
49
50 Ok(())
51 })
52 }
53}
54
55impl Migrate for SqliteConnection {
56 fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
57 Box::pin(async move {
58 self.execute(
60 r#"
61CREATE TABLE IF NOT EXISTS _sqlx_migrations (
62 version BIGINT PRIMARY KEY,
63 description TEXT NOT NULL,
64 installed_on TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
65 success BOOLEAN NOT NULL,
66 checksum BLOB NOT NULL,
67 execution_time BIGINT NOT NULL
68);
69 "#,
70 )
71 .await?;
72
73 Ok(())
74 })
75 }
76
77 fn version(&mut self) -> BoxFuture<'_, Result<Option<(i64, bool)>, MigrateError>> {
78 Box::pin(async move {
79 let row = query_as(
81 "SELECT version, NOT success FROM _sqlx_migrations ORDER BY version DESC LIMIT 1",
82 )
83 .fetch_optional(self)
84 .await?;
85
86 Ok(row)
87 })
88 }
89
90 fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>, MigrateError>> {
91 Box::pin(async move {
92 let row: Option<(i64,)> = query_as(
94 "SELECT version FROM _sqlx_migrations WHERE success = false ORDER BY version LIMIT 1",
95 )
96 .fetch_optional(self)
97 .await?;
98
99 Ok(row.map(|r| r.0))
100 })
101 }
102
103 fn list_applied_migrations(
104 &mut self,
105 ) -> BoxFuture<'_, Result<Vec<AppliedMigration>, MigrateError>> {
106 Box::pin(async move {
107 let rows: Vec<(i64, Vec<u8>)> =
109 query_as("SELECT version, checksum FROM _sqlx_migrations ORDER BY version")
110 .fetch_all(self)
111 .await?;
112
113 let migrations = rows
114 .into_iter()
115 .map(|(version, checksum)| AppliedMigration {
116 version,
117 checksum: checksum.into(),
118 })
119 .collect();
120
121 Ok(migrations)
122 })
123 }
124
125 fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
126 Box::pin(async move { Ok(()) })
127 }
128
129 fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
130 Box::pin(async move { Ok(()) })
131 }
132
133 fn validate<'e: 'm, 'm>(
134 &'e mut self,
135 migration: &'m Migration,
136 ) -> BoxFuture<'m, Result<(), MigrateError>> {
137 Box::pin(async move {
138 let checksum: Option<Vec<u8>> =
140 query_scalar("SELECT checksum FROM _sqlx_migrations WHERE version = ?1")
141 .bind(migration.version)
142 .fetch_optional(self)
143 .await?;
144
145 if let Some(checksum) = checksum {
146 if checksum == &*migration.checksum {
147 Ok(())
148 } else {
149 Err(MigrateError::VersionMismatch(migration.version))
150 }
151 } else {
152 Err(MigrateError::VersionMissing(migration.version))
153 }
154 })
155 }
156
157 fn apply<'e: 'm, 'm>(
158 &'e mut self,
159 migration: &'m Migration,
160 ) -> BoxFuture<'m, Result<Duration, MigrateError>> {
161 Box::pin(async move {
162 let mut tx = self.begin().await?;
163 let start = Instant::now();
164
165 let _ = tx.execute(&*migration.sql).await?;
166
167 tx.commit().await?;
168
169 let elapsed = start.elapsed();
170
171 let _ = query(
173 r#"
174 INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
175 VALUES ( ?1, ?2, TRUE, ?3, ?4 )
176 "#,
177 )
178 .bind(migration.version)
179 .bind(&*migration.description)
180 .bind(&*migration.checksum)
181 .bind(elapsed.as_nanos() as i64)
182 .execute(self)
183 .await?;
184
185 Ok(elapsed)
186 })
187 }
188
189 fn revert<'e: 'm, 'm>(
190 &'e mut self,
191 migration: &'m Migration,
192 ) -> BoxFuture<'m, Result<Duration, MigrateError>> {
193 Box::pin(async move {
194 let mut tx = self.begin().await?;
195 let start = Instant::now();
196
197 let _ = tx.execute(&*migration.sql).await?;
198
199 tx.commit().await?;
200
201 let elapsed = start.elapsed();
202
203 let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = ?1"#)
205 .bind(migration.version)
206 .execute(self)
207 .await?;
208
209 Ok(elapsed)
210 })
211 }
212}