sqlx_migrate/db/
sqlite.rs1use async_trait::async_trait;
2use sqlx::{query, query_as};
3use std::{borrow::Cow, time::Duration};
4use time::OffsetDateTime;
5
6use super::AppliedMigration;
7
8#[async_trait(?Send)]
9impl super::Migrations for sqlx::SqliteConnection {
10 async fn ensure_migrations_table(&mut self, table_name: &str) -> Result<(), sqlx::Error> {
11 query(&format!(
12 r#"
13 CREATE TABLE IF NOT EXISTS {} (
14 version BIGINT PRIMARY KEY,
15 name TEXT NOT NULL,
16 applied_on INTEGER NOT NULL,
17 checksum BLOB NOT NULL,
18 execution_time BIGINT NOT NULL
19 );
20 "#,
21 table_name
22 ))
23 .execute(self)
24 .await?;
25
26 Ok(())
27 }
28
29 async fn lock(&mut self) -> Result<(), sqlx::Error> {
30 Ok(())
31 }
32
33 async fn unlock(&mut self) -> Result<(), sqlx::Error> {
34 Ok(())
35 }
36
37 async fn list_migrations(
38 &mut self,
39 table_name: &str,
40 ) -> Result<Vec<super::AppliedMigration<'static>>, sqlx::Error> {
41 let rows: Vec<(i64, String, Vec<u8>, i64)> = query_as(&format!(
42 r#"
43 SELECT
44 version,
45 name,
46 checksum,
47 execution_time
48 FROM
49 {}
50 ORDER BY version
51 "#,
52 table_name
53 ))
54 .fetch_all(self)
55 .await?;
56
57 Ok(rows
58 .into_iter()
59 .map(|row| AppliedMigration {
60 version: row.0 as u64,
61 name: Cow::Owned(row.1),
62 checksum: Cow::Owned(row.2),
63 execution_time: Duration::from_nanos(row.3 as _),
64 })
65 .collect())
66 }
67
68 async fn add_migration(
69 &mut self,
70 table_name: &str,
71 migration: super::AppliedMigration<'static>,
72 ) -> Result<(), sqlx::Error> {
73 query(&format!(
74 r#"
75 INSERT INTO {} ( version, name, checksum, execution_time, applied_on )
76 VALUES ( $1, $2, $3, $4, $5 )
77 "#,
78 table_name
79 ))
80 .bind(migration.version as i64)
81 .bind(&*migration.name.clone())
82 .bind(&*migration.checksum.clone())
83 .bind(migration.execution_time.as_nanos() as i64)
84 .bind(OffsetDateTime::now_utc().unix_timestamp())
85 .execute(self)
86 .await?;
87
88 Ok(())
89 }
90
91 async fn remove_migration(
92 &mut self,
93 table_name: &str,
94 version: u64,
95 ) -> Result<(), sqlx::Error> {
96 query(&format!(r#"DELETE FROM {} WHERE version = $1"#, table_name))
97 .bind(version as i64)
98 .execute(self)
99 .await?;
100
101 Ok(())
102 }
103
104 async fn clear_migrations(&mut self, table_name: &str) -> Result<(), sqlx::Error> {
105 query(&format!("TRUNCATE {}", table_name))
106 .execute(self)
107 .await?;
108 Ok(())
109 }
110}