use anyhow::{anyhow, Context};
use std::fs::create_dir_all;
use std::path::{Path, PathBuf};
use std::{env, fs};
use tokio_postgres::{Client, NoTls};
pub async fn run_migrations(seed: bool) -> anyhow::Result<()> {
let client = connect_db().await?;
create_migrations_table(&client).await?;
let executed_migrations = get_executed_migrations(&client).await?;
let database_dir = ensure_database_dir()?;
let available_migrations = get_available_migrations(&database_dir)?;
for migration in available_migrations {
if !executed_migrations.contains(&migration.name) {
if let Ok(up) = fs::read_to_string(migration.path.join("up.sql")) {
client.batch_execute(&up).await?;
record_migration(&client, &migration.name).await?;
}
if seed {
if let Ok(content) = fs::read_to_string(migration.path.join("seeder.sql")) {
client.batch_execute(&content).await?;
}
}
}
}
Ok(())
}
/// Returns the relative `database` path, creating the directory (and any
/// missing parents) first when nothing exists at that path yet.
///
/// # Errors
///
/// Fails when the directory has to be created and `create_dir_all` reports an
/// error; the error is annotated with the path that could not be created.
pub(crate) fn ensure_database_dir() -> anyhow::Result<PathBuf> {
    let dir = PathBuf::from("database");

    // Something is already there: hand the path back untouched.
    if dir.exists() {
        return Ok(dir);
    }

    create_dir_all(&dir)
        .with_context(|| format!("Can't create the directory {}", dir.display()))?;
    Ok(dir)
}
/// Opens a non-TLS PostgreSQL connection to the URL in `DATABASE_URL`.
///
/// The connection half returned by `tokio_postgres::connect` is moved into a
/// spawned tokio task that awaits it; if that future ends with an error, the
/// error is printed to stderr. The client half is returned to the caller.
///
/// # Errors
///
/// Fails when `DATABASE_URL` cannot be read from the environment or when the
/// connection attempt fails.
pub(crate) async fn connect_db() -> anyhow::Result<Client> {
    let url = env::var("DATABASE_URL").context("DATABASE_URL must be set.")?;

    let connected = tokio_postgres::connect(&url, NoTls).await;
    let (client, connection) = connected.context("Can't connect to the postgres database.")?;

    // The connection future must be polled for the client to make progress.
    tokio::spawn(async move {
        match connection.await {
            Ok(()) => {}
            Err(e) => eprintln!("Connection error: {}", e),
        }
    });

    Ok(client)
}
pub(crate) async fn create_migrations_table(client: &Client) -> anyhow::Result<()> {
client.batch_execute(
"CREATE TABLE IF NOT EXISTS _migrations (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
executed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)"
).await?;
Ok(())
}
/// Returns the names stored in `_migrations`, ordered by their `id`.
///
/// # Errors
///
/// Fails when the query itself fails, or when a row's `name` column is
/// missing or cannot be decoded as a `String`. The latter is reported as an
/// error rather than a panic.
pub(crate) async fn get_executed_migrations(client: &Client) -> anyhow::Result<Vec<String>> {
    let rows = client
        .query("SELECT name FROM _migrations ORDER BY id", &[])
        .await
        .context("Can't read the _migrations table.")?;

    // `try_get` reports a decoding problem as an error; `get` would panic.
    rows.iter()
        .map(|row| {
            row.try_get::<_, String>("name")
                .context("Can't decode the `name` column of _migrations.")
        })
        .collect()
}
/// A migration directory discovered on disk.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct Migration {
    /// Identifier of the migration; this is the value compared against and
    /// stored in the `_migrations.name` column.
    pub(crate) name: String,
    /// Location of the migration directory, used to find its SQL scripts.
    pub(crate) path: PathBuf,
}
/// Lists the sub-directories of `database_dir` as migrations, sorted by name.
///
/// Each directory becomes a [`Migration`] whose `name` is the directory's file
/// name and whose `path` is the directory path. Entries that are not
/// directories are ignored. A `database_dir` that does not exist yields an
/// empty list.
///
/// # Errors
///
/// Fails when the directory cannot be read, when an entry cannot be read, or
/// when a directory name is not valid UTF-8.
pub(crate) fn get_available_migrations(database_dir: &Path) -> anyhow::Result<Vec<Migration>> {
    if !database_dir.exists() {
        return Ok(Vec::new());
    }

    let mut found = Vec::new();
    for dir_entry in fs::read_dir(database_dir)? {
        let candidate = dir_entry?.path();
        if !candidate.is_dir() {
            continue;
        }

        let name = match candidate.file_name().and_then(|os_name| os_name.to_str()) {
            Some(text) => text.to_string(),
            None => return Err(anyhow!("Invalid file name")),
        };
        found.push(Migration {
            name,
            path: candidate,
        });
    }

    // Ordering is plain string comparison of the directory names.
    found.sort_by(|lhs, rhs| lhs.name.cmp(&rhs.name));
    Ok(found)
}
async fn record_migration(client: &Client, migration_name: &str) -> anyhow::Result<()> {
client
.execute(
"INSERT INTO _migrations (name) VALUES ($1)",
&[&migration_name]
)
.await?;
Ok(())
}