sqlx_migrate/db/
postgres.rs

1use std::{borrow::Cow, time::Duration};
2
3use async_trait::async_trait;
4use sqlx::{query, query_as, query_scalar, PgConnection};
5
6use super::AppliedMigration;
7
8#[async_trait(?Send)]
9impl super::Migrations for sqlx::PgConnection {
10    async fn ensure_migrations_table(&mut self, table_name: &str) -> Result<(), sqlx::Error> {
11        query(&format!(
12            r#"
13                CREATE TABLE IF NOT EXISTS {} (
14                    version BIGINT PRIMARY KEY,
15                    name TEXT NOT NULL,
16                    applied_on TIMESTAMPTZ NOT NULL DEFAULT now(),
17                    checksum BYTEA NOT NULL,
18                    execution_time BIGINT NOT NULL
19                );
20                "#,
21            table_name
22        ))
23        .execute(self)
24        .await?;
25
26        Ok(())
27    }
28
29    async fn lock(&mut self) -> Result<(), sqlx::Error> {
30        let database_name = current_database(self).await?;
31        let lock_id = generate_lock_id(&database_name);
32
33        // create an application lock over the database
34        // this function will not return until the lock is acquired
35
36        // https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS
37        // https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS-TABLE
38
39        // language=SQL
40        let _ = query("SELECT pg_advisory_lock($1)")
41            .bind(lock_id)
42            .execute(self)
43            .await?;
44
45        Ok(())
46    }
47
48    async fn unlock(&mut self) -> Result<(), sqlx::Error> {
49        let database_name = current_database(self).await?;
50        let lock_id = generate_lock_id(&database_name);
51
52        // language=SQL
53        let _ = query("SELECT pg_advisory_unlock($1)")
54            .bind(lock_id)
55            .execute(self)
56            .await?;
57
58        Ok(())
59    }
60
61    async fn list_migrations(
62        &mut self,
63        table_name: &str,
64    ) -> Result<Vec<super::AppliedMigration<'static>>, sqlx::Error> {
65        let rows: Vec<(i64, String, Vec<u8>, i64)> = query_as(&format!(
66            r#"
67            SELECT
68                version,
69                name,
70                checksum,
71                execution_time
72            FROM
73                {}
74            ORDER BY version
75            "#,
76            table_name
77        ))
78        .fetch_all(self)
79        .await?;
80
81        Ok(rows
82            .into_iter()
83            .map(|row| AppliedMigration {
84                version: row.0 as u64,
85                name: Cow::Owned(row.1),
86                checksum: Cow::Owned(row.2),
87                execution_time: Duration::from_nanos(row.3 as _),
88            })
89            .collect())
90    }
91
92    async fn add_migration(
93        &mut self,
94        table_name: &str,
95        migration: super::AppliedMigration<'static>,
96    ) -> Result<(), sqlx::Error> {
97        query(&format!(
98            r#"
99                INSERT INTO {} ( version, name, checksum, execution_time )
100                VALUES ( $1, $2, $3, $4 )
101            "#,
102            table_name
103        ))
104        .bind(migration.version as i64)
105        .bind(&*migration.name.clone())
106        .bind(&*migration.checksum.clone())
107        .bind(migration.execution_time.as_nanos() as i64)
108        .execute(self)
109        .await?;
110
111        Ok(())
112    }
113
114    async fn remove_migration(
115        &mut self,
116        table_name: &str,
117        version: u64,
118    ) -> Result<(), sqlx::Error> {
119        query(&format!(r#"DELETE FROM {} WHERE version = $1"#, table_name))
120            .bind(version as i64)
121            .execute(self)
122            .await?;
123
124        Ok(())
125    }
126
127    async fn clear_migrations(&mut self, table_name: &str) -> Result<(), sqlx::Error> {
128        query(&format!("TRUNCATE {}", table_name))
129            .execute(self)
130            .await?;
131        Ok(())
132    }
133}
134
135async fn current_database(conn: &mut PgConnection) -> Result<String, sqlx::Error> {
136    query_scalar("SELECT current_database()")
137        .fetch_one(conn)
138        .await
139}
140
141// inspired from rails: https://github.com/rails/rails/blob/6e49cc77ab3d16c06e12f93158eaf3e507d4120e/activerecord/lib/active_record/migration.rb#L1308
142fn generate_lock_id(database_name: &str) -> i64 {
143    const CRC_IEEE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
144    // 0x20871d5f chosen by fair dice roll
145    0x20871d5f * (CRC_IEEE.checksum(database_name.as_bytes()) as i64)
146}