sqlx_migrate/db/
postgres.rs1use std::{borrow::Cow, time::Duration};
2
3use async_trait::async_trait;
4use sqlx::{query, query_as, query_scalar, PgConnection};
5
6use super::AppliedMigration;
7
8#[async_trait(?Send)]
9impl super::Migrations for sqlx::PgConnection {
10 async fn ensure_migrations_table(&mut self, table_name: &str) -> Result<(), sqlx::Error> {
11 query(&format!(
12 r#"
13 CREATE TABLE IF NOT EXISTS {} (
14 version BIGINT PRIMARY KEY,
15 name TEXT NOT NULL,
16 applied_on TIMESTAMPTZ NOT NULL DEFAULT now(),
17 checksum BYTEA NOT NULL,
18 execution_time BIGINT NOT NULL
19 );
20 "#,
21 table_name
22 ))
23 .execute(self)
24 .await?;
25
26 Ok(())
27 }
28
29 async fn lock(&mut self) -> Result<(), sqlx::Error> {
30 let database_name = current_database(self).await?;
31 let lock_id = generate_lock_id(&database_name);
32
33 let _ = query("SELECT pg_advisory_lock($1)")
41 .bind(lock_id)
42 .execute(self)
43 .await?;
44
45 Ok(())
46 }
47
48 async fn unlock(&mut self) -> Result<(), sqlx::Error> {
49 let database_name = current_database(self).await?;
50 let lock_id = generate_lock_id(&database_name);
51
52 let _ = query("SELECT pg_advisory_unlock($1)")
54 .bind(lock_id)
55 .execute(self)
56 .await?;
57
58 Ok(())
59 }
60
61 async fn list_migrations(
62 &mut self,
63 table_name: &str,
64 ) -> Result<Vec<super::AppliedMigration<'static>>, sqlx::Error> {
65 let rows: Vec<(i64, String, Vec<u8>, i64)> = query_as(&format!(
66 r#"
67 SELECT
68 version,
69 name,
70 checksum,
71 execution_time
72 FROM
73 {}
74 ORDER BY version
75 "#,
76 table_name
77 ))
78 .fetch_all(self)
79 .await?;
80
81 Ok(rows
82 .into_iter()
83 .map(|row| AppliedMigration {
84 version: row.0 as u64,
85 name: Cow::Owned(row.1),
86 checksum: Cow::Owned(row.2),
87 execution_time: Duration::from_nanos(row.3 as _),
88 })
89 .collect())
90 }
91
92 async fn add_migration(
93 &mut self,
94 table_name: &str,
95 migration: super::AppliedMigration<'static>,
96 ) -> Result<(), sqlx::Error> {
97 query(&format!(
98 r#"
99 INSERT INTO {} ( version, name, checksum, execution_time )
100 VALUES ( $1, $2, $3, $4 )
101 "#,
102 table_name
103 ))
104 .bind(migration.version as i64)
105 .bind(&*migration.name.clone())
106 .bind(&*migration.checksum.clone())
107 .bind(migration.execution_time.as_nanos() as i64)
108 .execute(self)
109 .await?;
110
111 Ok(())
112 }
113
114 async fn remove_migration(
115 &mut self,
116 table_name: &str,
117 version: u64,
118 ) -> Result<(), sqlx::Error> {
119 query(&format!(r#"DELETE FROM {} WHERE version = $1"#, table_name))
120 .bind(version as i64)
121 .execute(self)
122 .await?;
123
124 Ok(())
125 }
126
127 async fn clear_migrations(&mut self, table_name: &str) -> Result<(), sqlx::Error> {
128 query(&format!("TRUNCATE {}", table_name))
129 .execute(self)
130 .await?;
131 Ok(())
132 }
133}
134
135async fn current_database(conn: &mut PgConnection) -> Result<String, sqlx::Error> {
136 query_scalar("SELECT current_database()")
137 .fetch_one(conn)
138 .await
139}
140
141fn generate_lock_id(database_name: &str) -> i64 {
143 const CRC_IEEE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
144 0x20871d5f * (CRC_IEEE.checksum(database_name.as_bytes()) as i64)
146}