hinge 0.1.0

SQL-native ELT engine — dependency graph resolved automatically from FROM/JOIN clauses, parallel execution, single binary
Documentation
mod executor;
mod utils;

pub use executor::PostgresExecutor;

use crate::application::{Executor, ExecutorError};
use crate::domain::Asset;
use async_trait::async_trait;
use std::time::Instant;
use super::utils::{fmt_duration, SLOW_THRESHOLD};
use utils::run_in_transaction;

#[async_trait]
impl Executor for PostgresExecutor {

    async fn run(&self, asset: &Asset) -> Result<(), ExecutorError> {
        tracing::debug!(asset = %asset, kind = %asset.kind(), "starting");

        let start = Instant::now();

        let mut tx = self.pool().begin().await.map_err(|e| ExecutorError::Run {
            asset:   asset.to_string(),
            message: e.to_string(),
        })?;

        // All work happens inside the transaction.
        // On any error we explicitly rollback to release the advisory lock immediately,
        // without relying on the implicit async Drop.
        let outcome = run_in_transaction(&mut *tx, asset).await;

        let rows_affected = match outcome {
            Ok(rows) => {
                tx.commit().await.map_err(|e| ExecutorError::Run {
                    asset:   asset.to_string(),
                    message: e.to_string(),
                })?;
                rows
            }
            Err(e) => {
                // Explicit rollback = advisory lock released immediately.
                if let Err(rollback_err) = tx.rollback().await {
                    tracing::warn!(
                        asset  = %asset,
                        error  = %rollback_err,
                        "rollback failed — lock may be held until session ends"
                    );
                }
                return Err(e);
            }
        };

        let elapsed = start.elapsed();
        if elapsed >= SLOW_THRESHOLD {
            tracing::warn!(
                asset         = %asset,
                kind          = %asset.kind(),
                rows_affected,
                elapsed       = %fmt_duration(elapsed),
                threshold     = %fmt_duration(SLOW_THRESHOLD),
                "slow query"
            );
        } else {
            tracing::info!(
                asset         = %asset,
                kind          = %asset.kind(),
                rows_affected,
                elapsed       = %fmt_duration(elapsed),
                "done"
            );
        }

        Ok(())
    }
}