fly/
db.rs

1use crate::error::Result;
2use crate::migration::{Migration, MigrationMeta};
3use crate::{config::Config, migration::MigrationWithMeta};
4use postgres::{Client, NoTls, Row};
5use std::time::SystemTime;
6use tracing::debug;
7
/// DDL that creates the `migrations` bookkeeping table when it is absent.
///
/// Each row stores a migration's unique `name`, its `up_sql` and `down_sql`
/// scripts, a serial `id`, and a `created_at` timestamp that defaults to `NOW()`.
static CREATE_MIGRATIONS_TABLE: &str = r"
  CREATE TABLE IF NOT EXISTS migrations (
      id SERIAL PRIMARY KEY,
      name TEXT NOT NULL UNIQUE,
      up_sql TEXT NOT NULL,
      down_sql TEXT NOT NULL,
      created_at TIMESTAMP NOT NULL DEFAULT NOW()
  );
";
17
/// Handle to the database that stores applied migrations.
///
/// Wraps a single `postgres` [`Client`]; every method that talks to the
/// database takes `&mut self` and goes through this client.
pub struct Db {
    /// Connection used for all queries and transactions issued by [`Db`].
    client: Client,
}
21
22impl Db {
23    pub fn connect(config: &Config) -> Result<Db> {
24        let client = Client::connect(&config.connection_string, NoTls)?;
25        Ok(Db { client })
26    }
27
28    pub fn create_migrations_table(&mut self) -> Result<()> {
29        self.client.batch_execute(CREATE_MIGRATIONS_TABLE)?;
30        Ok(())
31    }
32
33    pub fn list(&mut self) -> Result<Vec<MigrationWithMeta>> {
34        let rows = self.client.query("SELECT * FROM migrations", &[])?;
35        let migrations = rows
36            .iter()
37            .map(parse_migration_with_meta)
38            .collect::<Result<_>>()?;
39        Ok(migrations)
40    }
41
42    /// Panics if the INSERT statement does not return 1 row.
43    pub fn run(&mut self, migration: &Migration) -> Result<MigrationWithMeta> {
44        debug!("inserting migration {:?}", migration);
45        let mut transaction = self.client.transaction()?;
46        transaction.batch_execute(&migration.up_sql)?;
47        let rows = transaction.query(
48            "INSERT INTO migrations (name, up_sql, down_sql) VALUES ($1, $2, $3) RETURNING *",
49            &[&migration.name, &migration.up_sql, &migration.down_sql],
50        )?;
51        let [ref row] = rows[..] else {
52            panic!("postgres inserted {} elements, expected 1", rows.len());
53        };
54        let migration = parse_migration_with_meta(row)?;
55
56        transaction.commit()?;
57        Ok(migration)
58    }
59
60    pub fn rollback_migration(&mut self, migration: &Migration) -> Result<()> {
61        debug!("rolling back migration {:?}", migration);
62        let mut transaction = self.client.transaction()?;
63        transaction.batch_execute(&migration.down_sql)?;
64        transaction.execute("DELETE FROM migrations WHERE name = $1", &[&migration.name])?;
65        transaction.commit()?;
66        Ok(())
67    }
68}
69
70fn parse_migration_with_meta(row: &Row) -> Result<MigrationWithMeta> {
71    let up_sql = row.try_get::<_, String>("up_sql")?;
72    let down_sql = row.try_get::<_, String>("down_sql")?;
73    let name = row.try_get::<_, String>("name")?;
74
75    let migration = Migration {
76        up_sql,
77        down_sql,
78        name,
79    };
80
81    let id = row.try_get::<_, i32>("id")?;
82    let created_at = row.try_get::<_, SystemTime>("created_at")?;
83
84    let meta = MigrationMeta { id, created_at };
85
86    Ok(MigrationWithMeta { migration, meta })
87}