sqlx_core_guts/sqlite/
migrate.rs

1use crate::connection::{ConnectOptions, Connection};
2use crate::error::Error;
3use crate::executor::Executor;
4use crate::migrate::MigrateError;
5use crate::migrate::{AppliedMigration, Migration};
6use crate::migrate::{Migrate, MigrateDatabase};
7use crate::query::query;
8use crate::query_as::query_as;
9use crate::query_scalar::query_scalar;
10use crate::sqlite::{Sqlite, SqliteConnectOptions, SqliteConnection};
11use futures_core::future::BoxFuture;
12use sqlx_rt::fs;
13use std::str::FromStr;
14use std::time::Duration;
15use std::time::Instant;
16
17impl MigrateDatabase for Sqlite {
18    fn create_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
19        Box::pin(async move {
20            // Opening a connection to sqlite creates the database
21            let _ = SqliteConnectOptions::from_str(url)?
22                .create_if_missing(true)
23                .connect()
24                .await?;
25
26            Ok(())
27        })
28    }
29
30    fn database_exists(url: &str) -> BoxFuture<'_, Result<bool, Error>> {
31        Box::pin(async move {
32            let options = SqliteConnectOptions::from_str(url)?;
33
34            if options.in_memory {
35                Ok(true)
36            } else {
37                Ok(options.filename.exists())
38            }
39        })
40    }
41
42    fn drop_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
43        Box::pin(async move {
44            let options = SqliteConnectOptions::from_str(url)?;
45
46            if !options.in_memory {
47                fs::remove_file(&*options.filename).await?;
48            }
49
50            Ok(())
51        })
52    }
53}
54
55impl Migrate for SqliteConnection {
56    fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
57        Box::pin(async move {
58            // language=SQLite
59            self.execute(
60                r#"
61CREATE TABLE IF NOT EXISTS _sqlx_migrations (
62    version BIGINT PRIMARY KEY,
63    description TEXT NOT NULL,
64    installed_on TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
65    success BOOLEAN NOT NULL,
66    checksum BLOB NOT NULL,
67    execution_time BIGINT NOT NULL
68);
69                "#,
70            )
71            .await?;
72
73            Ok(())
74        })
75    }
76
77    fn version(&mut self) -> BoxFuture<'_, Result<Option<(i64, bool)>, MigrateError>> {
78        Box::pin(async move {
79            // language=SQLite
80            let row = query_as(
81                "SELECT version, NOT success FROM _sqlx_migrations ORDER BY version DESC LIMIT 1",
82            )
83            .fetch_optional(self)
84            .await?;
85
86            Ok(row)
87        })
88    }
89
90    fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>, MigrateError>> {
91        Box::pin(async move {
92            // language=SQLite
93            let row: Option<(i64,)> = query_as(
94                "SELECT version FROM _sqlx_migrations WHERE success = false ORDER BY version LIMIT 1",
95            )
96            .fetch_optional(self)
97            .await?;
98
99            Ok(row.map(|r| r.0))
100        })
101    }
102
103    fn list_applied_migrations(
104        &mut self,
105    ) -> BoxFuture<'_, Result<Vec<AppliedMigration>, MigrateError>> {
106        Box::pin(async move {
107            // language=SQLite
108            let rows: Vec<(i64, Vec<u8>)> =
109                query_as("SELECT version, checksum FROM _sqlx_migrations ORDER BY version")
110                    .fetch_all(self)
111                    .await?;
112
113            let migrations = rows
114                .into_iter()
115                .map(|(version, checksum)| AppliedMigration {
116                    version,
117                    checksum: checksum.into(),
118                })
119                .collect();
120
121            Ok(migrations)
122        })
123    }
124
125    fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
126        Box::pin(async move { Ok(()) })
127    }
128
129    fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
130        Box::pin(async move { Ok(()) })
131    }
132
133    fn validate<'e: 'm, 'm>(
134        &'e mut self,
135        migration: &'m Migration,
136    ) -> BoxFuture<'m, Result<(), MigrateError>> {
137        Box::pin(async move {
138            // language=SQL
139            let checksum: Option<Vec<u8>> =
140                query_scalar("SELECT checksum FROM _sqlx_migrations WHERE version = ?1")
141                    .bind(migration.version)
142                    .fetch_optional(self)
143                    .await?;
144
145            if let Some(checksum) = checksum {
146                if checksum == &*migration.checksum {
147                    Ok(())
148                } else {
149                    Err(MigrateError::VersionMismatch(migration.version))
150                }
151            } else {
152                Err(MigrateError::VersionMissing(migration.version))
153            }
154        })
155    }
156
157    fn apply<'e: 'm, 'm>(
158        &'e mut self,
159        migration: &'m Migration,
160    ) -> BoxFuture<'m, Result<Duration, MigrateError>> {
161        Box::pin(async move {
162            let mut tx = self.begin().await?;
163            let start = Instant::now();
164
165            let _ = tx.execute(&*migration.sql).await?;
166
167            tx.commit().await?;
168
169            let elapsed = start.elapsed();
170
171            // language=SQL
172            let _ = query(
173                r#"
174    INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
175    VALUES ( ?1, ?2, TRUE, ?3, ?4 )
176                "#,
177            )
178            .bind(migration.version)
179            .bind(&*migration.description)
180            .bind(&*migration.checksum)
181            .bind(elapsed.as_nanos() as i64)
182            .execute(self)
183            .await?;
184
185            Ok(elapsed)
186        })
187    }
188
189    fn revert<'e: 'm, 'm>(
190        &'e mut self,
191        migration: &'m Migration,
192    ) -> BoxFuture<'m, Result<Duration, MigrateError>> {
193        Box::pin(async move {
194            let mut tx = self.begin().await?;
195            let start = Instant::now();
196
197            let _ = tx.execute(&*migration.sql).await?;
198
199            tx.commit().await?;
200
201            let elapsed = start.elapsed();
202
203            // language=SQL
204            let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = ?1"#)
205                .bind(migration.version)
206                .execute(self)
207                .await?;
208
209            Ok(elapsed)
210        })
211    }
212}