sqlx_migrate/db/
sqlite.rs

1use async_trait::async_trait;
2use sqlx::{query, query_as};
3use std::{borrow::Cow, time::Duration};
4use time::OffsetDateTime;
5
6use super::AppliedMigration;
7
8#[async_trait(?Send)]
9impl super::Migrations for sqlx::SqliteConnection {
10    async fn ensure_migrations_table(&mut self, table_name: &str) -> Result<(), sqlx::Error> {
11        query(&format!(
12            r#"
13                CREATE TABLE IF NOT EXISTS {} (
14                    version BIGINT PRIMARY KEY,
15                    name TEXT NOT NULL,
16                    applied_on INTEGER NOT NULL,
17                    checksum BLOB NOT NULL,
18                    execution_time BIGINT NOT NULL
19                );
20                "#,
21            table_name
22        ))
23        .execute(self)
24        .await?;
25
26        Ok(())
27    }
28
29    async fn lock(&mut self) -> Result<(), sqlx::Error> {
30        Ok(())
31    }
32
33    async fn unlock(&mut self) -> Result<(), sqlx::Error> {
34        Ok(())
35    }
36
37    async fn list_migrations(
38        &mut self,
39        table_name: &str,
40    ) -> Result<Vec<super::AppliedMigration<'static>>, sqlx::Error> {
41        let rows: Vec<(i64, String, Vec<u8>, i64)> = query_as(&format!(
42            r#"
43            SELECT
44                version,
45                name,
46                checksum,
47                execution_time
48            FROM
49                {}
50            ORDER BY version
51            "#,
52            table_name
53        ))
54        .fetch_all(self)
55        .await?;
56
57        Ok(rows
58            .into_iter()
59            .map(|row| AppliedMigration {
60                version: row.0 as u64,
61                name: Cow::Owned(row.1),
62                checksum: Cow::Owned(row.2),
63                execution_time: Duration::from_nanos(row.3 as _),
64            })
65            .collect())
66    }
67
68    async fn add_migration(
69        &mut self,
70        table_name: &str,
71        migration: super::AppliedMigration<'static>,
72    ) -> Result<(), sqlx::Error> {
73        query(&format!(
74            r#"
75                INSERT INTO {} ( version, name, checksum, execution_time, applied_on )
76                VALUES ( $1, $2, $3, $4, $5 )
77            "#,
78            table_name
79        ))
80        .bind(migration.version as i64)
81        .bind(&*migration.name.clone())
82        .bind(&*migration.checksum.clone())
83        .bind(migration.execution_time.as_nanos() as i64)
84        .bind(OffsetDateTime::now_utc().unix_timestamp())
85        .execute(self)
86        .await?;
87
88        Ok(())
89    }
90
91    async fn remove_migration(
92        &mut self,
93        table_name: &str,
94        version: u64,
95    ) -> Result<(), sqlx::Error> {
96        query(&format!(r#"DELETE FROM {} WHERE version = $1"#, table_name))
97            .bind(version as i64)
98            .execute(self)
99            .await?;
100
101        Ok(())
102    }
103
104    async fn clear_migrations(&mut self, table_name: &str) -> Result<(), sqlx::Error> {
105        query(&format!("TRUNCATE {}", table_name))
106            .execute(self)
107            .await?;
108        Ok(())
109    }
110}