use sqlx::PgConnection;
use crate::{Asset, AssetKind, ExecutorError};
pub async fn run_in_transaction(
conn: &mut PgConnection,
asset: &Asset,
) -> Result<u64, ExecutorError> {
let schema = asset.reference().schema();
let name = asset.reference().name();
let source = asset.source().as_str();
sqlx::query("SELECT pg_advisory_xact_lock($1)")
.bind(lock_key(asset))
.execute(&mut *conn)
.await
.map_err(|e| ExecutorError::Run {
asset: asset.to_string(),
message: e.to_string(),
})?;
tracing::debug!(asset = %asset, "lock acquired");
let saved_indexes: Vec<String> = match asset.kind() {
AssetKind::View => vec![],
_ => save_indexes(&mut *conn, asset).await?,
};
drop_any(&mut *conn, asset).await?;
let rows_affected = match asset.kind() {
AssetKind::View => {
let sql = format!(r#"CREATE VIEW "{schema}"."{name}" AS {source}"#);
exec(&mut *conn, asset, &sql).await?
}
AssetKind::Table => {
let sql = format!(r#"CREATE TABLE "{schema}"."{name}" AS {source}"#);
exec(&mut *conn, asset, &sql).await?
}
AssetKind::MaterializedView => {
let sql = format!(
r#"CREATE MATERIALIZED VIEW "{schema}"."{name}" AS {source} WITH DATA"#
);
exec(&mut *conn, asset, &sql).await?
}
};
let n = saved_indexes.len();
for def in &saved_indexes {
exec(&mut *conn, asset, def).await?;
}
if n > 0 {
tracing::debug!(asset = %asset, n_indexes = n, "indexes restored");
}
Ok(rows_affected)
}
async fn exec(conn: &mut PgConnection, asset: &Asset, sql: &str) -> Result<u64, ExecutorError> {
sqlx::query(sql)
.execute(conn)
.await
.map(|r| r.rows_affected())
.map_err(|e| ExecutorError::Run {
asset: asset.to_string(),
message: e.to_string(),
})
}
async fn drop_any(conn: &mut PgConnection, asset: &Asset) -> Result<(), ExecutorError> {
let schema = asset.reference().schema();
let name = asset.reference().name();
let kind: Option<String> = sqlx::query_scalar(
r#"
SELECT c.relkind::text
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = $1 AND c.relname = $2
"#,
)
.bind(schema)
.bind(name)
.fetch_optional(&mut *conn)
.await
.map_err(|e| ExecutorError::Run {
asset: asset.to_string(),
message: e.to_string(),
})?;
let drop_sql = match kind.as_deref() {
Some("r") => format!(r#"DROP TABLE "{schema}"."{name}""#),
Some("v") => format!(r#"DROP VIEW "{schema}"."{name}""#),
Some("m") => format!(r#"DROP MATERIALIZED VIEW "{schema}"."{name}""#),
_ => return Ok(()),
};
exec(conn, asset, &drop_sql).await?;
Ok(())
}
async fn save_indexes(
conn: &mut PgConnection,
asset: &Asset,
) -> Result<Vec<String>, ExecutorError> {
let schema = asset.reference().schema();
let name = asset.reference().name();
sqlx::query_scalar(
r#"SELECT indexdef FROM pg_indexes WHERE schemaname = $1 AND tablename = $2"#,
)
.bind(schema)
.bind(name)
.fetch_all(&mut *conn)
.await
.map_err(|e| ExecutorError::Run {
asset: asset.to_string(),
message: e.to_string(),
})
}
fn lock_key(asset: &Asset) -> i64 {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut h = DefaultHasher::new();
asset.reference().schema().hash(&mut h);
asset.reference().name().hash(&mut h);
h.finish() as i64
}