mod executor;
mod utils;
pub use executor::PostgresExecutor;
use crate::application::{Executor, ExecutorError};
use crate::domain::Asset;
use async_trait::async_trait;
use std::time::Instant;
use super::utils::{fmt_duration, SLOW_THRESHOLD};
use utils::run_in_transaction;
#[async_trait]
impl Executor for PostgresExecutor {
    /// Executes `asset` inside a single database transaction and logs the result.
    ///
    /// A transaction is opened on the executor's pool, the asset is handed to
    /// `run_in_transaction`, and the transaction is committed on success or
    /// rolled back on failure. The wall-clock time (measured from just before
    /// the transaction is opened) is reported at `info` level, or at `warn`
    /// level once it reaches `SLOW_THRESHOLD`.
    ///
    /// # Errors
    ///
    /// * [`ExecutorError::Run`] when the transaction cannot be opened or
    ///   committed; the message carries the underlying error text.
    /// * Whatever error `run_in_transaction` produced, returned unchanged
    ///   after a rollback attempt. A failed rollback is only logged.
    async fn run(&self, asset: &Asset) -> Result<(), ExecutorError> {
        // Shared conversion for failures raised while opening or committing
        // the transaction.
        fn run_error(asset: &Asset, cause: impl std::fmt::Display) -> ExecutorError {
            ExecutorError::Run {
                asset: asset.to_string(),
                message: cause.to_string(),
            }
        }

        tracing::debug!(asset = %asset, kind = %asset.kind(), "starting");
        let started_at = Instant::now();

        let mut tx = self
            .pool()
            .begin()
            .await
            .map_err(|cause| run_error(asset, cause))?;

        let attempt = run_in_transaction(&mut *tx, asset).await;

        // Resolve the failure path first; only a successful run falls through
        // to the commit below.
        let rows_affected = match attempt {
            Ok(rows) => rows,
            Err(run_err) => {
                // The original error is what the caller needs; a rollback
                // failure is reported but never replaces it.
                if let Err(rollback_err) = tx.rollback().await {
                    tracing::warn!(
                        asset = %asset,
                        error = %rollback_err,
                        "rollback failed — lock may be held until session ends"
                    );
                }
                return Err(run_err);
            }
        };

        tx.commit()
            .await
            .map_err(|cause| run_error(asset, cause))?;

        let elapsed = started_at.elapsed();
        if elapsed < SLOW_THRESHOLD {
            tracing::info!(
                asset = %asset,
                kind = %asset.kind(),
                rows_affected,
                elapsed = %fmt_duration(elapsed),
                "done"
            );
        } else {
            tracing::warn!(
                asset = %asset,
                kind = %asset.kind(),
                rows_affected,
                elapsed = %fmt_duration(elapsed),
                threshold = %fmt_duration(SLOW_THRESHOLD),
                "slow query"
            );
        }

        Ok(())
    }
}