hinge 0.1.0

SQL-native ELT engine — dependency graph resolved automatically from FROM/JOIN clauses, parallel execution, single binary
Documentation
use sqlx::PgConnection;
use crate::{Asset, AssetKind, ExecutorError};

/// Rebuilds `asset` in the database: takes an advisory lock, drops whatever
/// relation currently has the asset's name, recreates it from the asset's
/// source SQL, and re-applies the index definitions that existed beforehand.
///
/// Steps, in order:
/// 1. `pg_advisory_xact_lock` keyed by [`lock_key`].
/// 2. For tables and materialized views, the current index definitions are
///    captured via [`save_indexes`]; views carry no indexes, so none are saved.
/// 3. The existing relation (if any) is dropped via [`drop_any`].
/// 4. The relation is created as `CREATE VIEW` / `CREATE TABLE ... AS` /
///    `CREATE MATERIALIZED VIEW ... WITH DATA`, depending on `asset.kind()`.
/// 5. Each saved index definition is executed again.
///
/// NOTE(review): this function issues no `BEGIN`/`COMMIT` itself. It presumably
/// relies on the caller having opened a transaction on `conn`, since the
/// advisory lock taken here is transaction-scoped — confirm against callers.
///
/// # Returns
/// The row count reported by the `CREATE` statement.
///
/// # Errors
/// Returns an [`ExecutorError`] as soon as any statement fails; the lock
/// query and every statement run through [`exec`] map failures to
/// [`ExecutorError::Run`] with the asset's display form and the driver message.
pub async fn run_in_transaction(
    conn:  &mut PgConnection,
    asset: &Asset,
) -> Result<u64, ExecutorError> {
    // Renders `ident` as a double-quoted SQL identifier. Embedded `"` are
    // doubled so an unusual schema/table name cannot terminate the quoting
    // early and corrupt (or inject into) the generated DDL.
    fn quote_ident(ident: &impl std::fmt::Display) -> String {
        format!("\"{}\"", ident.to_string().replace('"', "\"\""))
    }

    let schema = asset.reference().schema();
    let name   = asset.reference().name();
    let source = asset.source().as_str();
    let target = format!("{}.{}", quote_ident(&schema), quote_ident(&name));

    // Serialise concurrent runs of the same asset.
    sqlx::query("SELECT pg_advisory_xact_lock($1)")
        .bind(lock_key(asset))
        .execute(&mut *conn)
        .await
        .map_err(|e| ExecutorError::Run {
            asset:   asset.to_string(),
            message: e.to_string(),
        })?;
    tracing::debug!(asset = %asset, "lock acquired");

    // Index definitions must be read before the drop below removes them.
    // Variants are listed explicitly so a new `AssetKind` forces a decision here.
    let saved_indexes: Vec<String> = match asset.kind() {
        AssetKind::View => Vec::new(),
        AssetKind::Table | AssetKind::MaterializedView => {
            save_indexes(&mut *conn, asset).await?
        }
    };

    // Drop whatever currently occupies the target name.
    drop_any(&mut *conn, asset).await?;

    // Create the relation from the asset's source query.
    let create_sql = match asset.kind() {
        AssetKind::View => format!("CREATE VIEW {target} AS {source}"),
        AssetKind::Table => format!("CREATE TABLE {target} AS {source}"),
        AssetKind::MaterializedView => {
            format!("CREATE MATERIALIZED VIEW {target} AS {source} WITH DATA")
        }
    };
    let rows_affected = exec(&mut *conn, asset, &create_sql).await?;

    // Re-apply the index definitions captured before the drop.
    for definition in &saved_indexes {
        exec(&mut *conn, asset, definition).await?;
    }
    if !saved_indexes.is_empty() {
        tracing::debug!(asset = %asset, n_indexes = saved_indexes.len(), "indexes restored");
    }

    Ok(rows_affected)
}


/// Runs one SQL statement on `conn` and yields the number of rows the
/// database reports as affected.
///
/// # Errors
/// A failed statement becomes [`ExecutorError::Run`], carrying the asset's
/// display form and the driver's error message.
async fn exec(conn: &mut PgConnection, asset: &Asset, sql: &str) -> Result<u64, ExecutorError> {
    match sqlx::query(sql).execute(conn).await {
        Ok(outcome) => Ok(outcome.rows_affected()),
        Err(cause) => Err(ExecutorError::Run {
            asset:   asset.to_string(),
            message: cause.to_string(),
        }),
    }
}

/// Drops the relation that currently occupies the asset's `schema.name`,
/// choosing the `DROP` statement from the relation kind recorded in `pg_class`.
///
/// Handled kinds: ordinary tables (`r`) and partitioned tables (`p`) via
/// `DROP TABLE`, views (`v`) via `DROP VIEW`, and materialized views (`m`)
/// via `DROP MATERIALIZED VIEW`. When no relation exists, or it is of any
/// other kind, nothing is dropped and `Ok(())` is returned.
///
/// # Errors
/// Returns [`ExecutorError::Run`] if the catalog lookup or the `DROP`
/// statement fails.
async fn drop_any(conn: &mut PgConnection, asset: &Asset) -> Result<(), ExecutorError> {
    // Renders `ident` as a double-quoted SQL identifier. Embedded `"` are
    // doubled so an unusual schema/table name cannot terminate the quoting
    // early and corrupt (or inject into) the generated DDL.
    fn quote_ident(ident: &impl std::fmt::Display) -> String {
        format!("\"{}\"", ident.to_string().replace('"', "\"\""))
    }

    let schema = asset.reference().schema();
    let name   = asset.reference().name();

    // Look up what kind of relation (if any) currently holds this name.
    let kind: Option<String> = sqlx::query_scalar(
        r#"
        SELECT c.relkind::text
        FROM   pg_class     c
        JOIN   pg_namespace n ON n.oid = c.relnamespace
        WHERE  n.nspname = $1 AND c.relname = $2
        "#,
    )
        .bind(schema)
        .bind(name)
        .fetch_optional(&mut *conn)
        .await
        .map_err(|e| ExecutorError::Run {
            asset:   asset.to_string(),
            message: e.to_string(),
        })?;

    let drop_stmt = match kind.as_deref() {
        // Partitioned tables are removed with DROP TABLE as well; without
        // this arm they were left in place and the later CREATE would fail.
        Some("r" | "p") => "DROP TABLE",
        Some("v")       => "DROP VIEW",
        Some("m")       => "DROP MATERIALIZED VIEW",
        // Nothing there, or a relation kind this function does not manage.
        _               => return Ok(()),
    };

    let drop_sql = format!("{drop_stmt} {}.{}", quote_ident(&schema), quote_ident(&name));
    exec(conn, asset, &drop_sql).await?;
    Ok(())
}

/// Reads the `indexdef` column of `pg_indexes` for every row whose
/// `schemaname` / `tablename` match the asset's reference, so the
/// definitions can be replayed after the relation has been recreated.
///
/// # Errors
/// A failed query becomes [`ExecutorError::Run`], carrying the asset's
/// display form and the driver's error message.
async fn save_indexes(
    conn:  &mut PgConnection,
    asset: &Asset,
) -> Result<Vec<String>, ExecutorError> {
    const INDEX_DEFS_SQL: &str =
        "SELECT indexdef FROM pg_indexes WHERE schemaname = $1 AND tablename = $2";

    let target_schema = asset.reference().schema();
    let target_name   = asset.reference().name();

    let fetched: Result<Vec<String>, _> = sqlx::query_scalar(INDEX_DEFS_SQL)
        .bind(target_schema)
        .bind(target_name)
        .fetch_all(&mut *conn)
        .await;

    match fetched {
        Ok(definitions) => Ok(definitions),
        Err(cause) => Err(ExecutorError::Run {
            asset:   asset.to_string(),
            message: cause.to_string(),
        }),
    }
}

/// Derives the 64-bit advisory-lock key for `asset` from its schema and name.
///
/// The key is an FNV-1a (64-bit) hash rather than the output of `std`'s
/// `DefaultHasher`: the standard library documents that hasher's algorithm
/// as unspecified and subject to change between releases. Binaries built
/// with different toolchains could then compute different keys for the same
/// asset and fail to exclude each other. FNV-1a is fixed, so every build
/// agrees on the key.
fn lock_key(asset: &Asset) -> i64 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    let reference = asset.reference();
    // A NUL byte separates the parts so ("ab", "c") and ("a", "bc") hash
    // differently; Postgres identifiers cannot contain NUL.
    let qualified = format!("{}\u{0}{}", reference.schema(), reference.name());

    let hash = qualified
        .bytes()
        .fold(FNV_OFFSET_BASIS, |acc, byte| (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME));

    // Intentional bit-for-bit reinterpretation: the lock function takes a
    // signed 64-bit key, and all 64 bits of the hash are kept.
    hash as i64
}