// Optional submodule: only compiled when the crate is built with the `sea-orm`
// Cargo feature. NOTE(review): presumably holds the SeaORM integration — the
// module body is defined in another file, confirm there.
#[cfg(feature = "sea-orm")]
pub mod sea;
5
6use std::path::{Path, PathBuf};
7
8use purwa_core::{AppConfig, PurwaConfigError};
9use sqlx::Executor;
10use sqlx::PgPool;
11use sqlx::migrate::{Migrate, MigrateError, Migrator};
12use thiserror::Error;
13
/// Error type returned by the database and migration helpers in this module.
///
/// The three wrapping variants are `transparent`: their `Display` output is
/// forwarded from the wrapped error, and the `#[from]` attributes let `?`
/// convert the underlying error types automatically.
#[derive(Debug, Error)]
pub enum PurwaOrmError {
    /// An error raised by `sqlx` (for example while acquiring a connection or
    /// executing a statement).
    #[error(transparent)]
    Sqlx(#[from] sqlx::Error),
    /// An error raised by the `sqlx` migration machinery.
    #[error(transparent)]
    Migrate(#[from] MigrateError),
    /// An error raised by `purwa_core` configuration handling.
    #[error(transparent)]
    Config(#[from] PurwaConfigError),
    /// No database URL could be obtained from the configuration; the message
    /// lists the places a URL can be supplied.
    #[error(
        "database URL is not set (purwa.toml [database].url, PURWA_DATABASE__URL, or DATABASE_URL)"
    )]
    DatabaseUrlMissing,
}
28
/// Returns the relative path used as the default migrations directory:
/// `database/migrations`.
///
/// The path is not resolved against any base directory; callers get it exactly
/// as written.
pub fn default_migrations_dir() -> PathBuf {
    let relative: &Path = Path::new("database/migrations");
    relative.to_path_buf()
}
33
34pub async fn connect_pool(database_url: &str) -> Result<PgPool, sqlx::Error> {
36 sqlx::postgres::PgPoolOptions::new()
37 .max_connections(5)
38 .connect(database_url)
39 .await
40}
41
42pub fn database_url_from_config(cfg: &AppConfig) -> Result<String, PurwaOrmError> {
44 cfg.database_url().ok_or(PurwaOrmError::DatabaseUrlMissing)
45}
46
47pub async fn migrate_up(pool: &PgPool, dir: &Path) -> Result<(), PurwaOrmError> {
49 let m = Migrator::new(dir).await?;
50 m.run(pool).await?;
51 Ok(())
52}
53
54pub async fn migrate_undo(pool: &PgPool, dir: &Path, target: i64) -> Result<(), PurwaOrmError> {
56 let m = Migrator::new(dir).await?;
57 m.undo(pool, target).await?;
58 Ok(())
59}
60
61pub async fn migrate_rollback_one(pool: &PgPool, dir: &Path) -> Result<(), PurwaOrmError> {
66 let Some(target) = rollback_target(pool).await? else {
67 return Ok(());
68 };
69 migrate_undo(pool, dir, target).await
70}
71
72async fn rollback_target(pool: &PgPool) -> Result<Option<i64>, PurwaOrmError> {
73 let mut conn = pool.acquire().await?;
74 conn.ensure_migrations_table().await?;
75 if let Some(v) = conn.dirty_version().await? {
76 return Err(MigrateError::Dirty(v).into());
77 }
78 let applied = conn.list_applied_migrations().await?;
79 if applied.is_empty() {
80 return Ok(None);
81 }
82 let mut versions: Vec<i64> = applied.into_iter().map(|a| a.version).collect();
83 versions.sort();
84 let target = if versions.len() <= 1 {
85 0_i64
86 } else {
87 versions[versions.len() - 2]
88 };
89 Ok(Some(target))
90}
91
92pub async fn migrate_fresh(pool: &PgPool, dir: &Path) -> Result<(), PurwaOrmError> {
94 pool.execute("DROP SCHEMA IF EXISTS public CASCADE")
95 .await
96 .map_err(PurwaOrmError::Sqlx)?;
97 pool.execute("CREATE SCHEMA public")
98 .await
99 .map_err(PurwaOrmError::Sqlx)?;
100 pool.execute("GRANT ALL ON SCHEMA public TO PUBLIC")
101 .await
102 .map_err(PurwaOrmError::Sqlx)?;
103 migrate_up(pool, dir).await
104}