use serde_json::{Map, Value};
use crate::sql::sqlx::{self, postgres::PgRow, PgPool, Row};
/// Origin of an audited change, recorded in the `source` column of every
/// audit-log row.
///
/// `Default` is `System`; derived with `#[default]` instead of a manual
/// `impl Default` (idiomatic since Rust 1.62, and this file already uses
/// post-1.58 inline format captures).
#[derive(Debug, Clone, Default)]
pub enum AuditSource {
    /// Change made by the system itself (jobs, migrations, …).
    #[default]
    System,
    /// Change made by an authenticated user, identified by `id`.
    User { id: String },
    /// Free-form source token for anything else.
    Custom(String),
}

impl AuditSource {
    /// Renders the source as the string token stored in the database:
    /// `"system"`, `"user:<id>"`, or the custom string verbatim.
    #[must_use]
    pub fn as_token(&self) -> String {
        match self {
            Self::System => "system".to_owned(),
            Self::User { id } => format!("user:{id}"),
            Self::Custom(s) => s.clone(),
        }
    }
}
tokio::task_local! {
    /// Task-local audit source, installed by [`with_source`] and read by
    /// [`current_source`]; tasks without a scope fall back to `System`.
    pub static AUDIT_SOURCE: AuditSource;
}
/// Returns the audit source installed for the current task, or
/// [`AuditSource::System`] when no [`with_source`] scope is active.
#[must_use]
pub fn current_source() -> AuditSource {
    match AUDIT_SOURCE.try_with(|src| src.clone()) {
        Ok(src) => src,
        Err(_) => AuditSource::System,
    }
}
/// Runs `fut` with `source` installed as the task-local audit source for
/// its entire execution, returning the future's output.
pub async fn with_source<F, T>(source: AuditSource, fut: F) -> T
where
    F: std::future::Future<Output = T>,
{
    let scoped = AUDIT_SOURCE.scope(source, fut);
    scoped.await
}
/// An audit-log row that has been assembled in memory but not yet written
/// to the database.
#[derive(Debug, Clone)]
pub struct PendingEntry {
    /// Table the audited entity lives in.
    pub entity_table: &'static str,
    /// Primary key of the audited entity, rendered as a string.
    pub entity_pk: String,
    /// Which kind of mutation occurred.
    pub operation: AuditOp,
    /// Who/what triggered the mutation.
    pub source: AuditSource,
    /// JSON payload describing the change (a diff or a snapshot).
    pub changes: Value,
}
/// Kind of mutation recorded by an audit entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuditOp {
    Create,
    Update,
    Delete,
    SoftDelete,
    Restore,
}

impl AuditOp {
    /// Stable string token stored in the `operation` column.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        use AuditOp::{Create, Delete, Restore, SoftDelete, Update};
        match self {
            Create => "create",
            Update => "update",
            Delete => "delete",
            SoftDelete => "soft_delete",
            Restore => "restore",
        }
    }
}
/// Inserts a single audit row via any Postgres executor (pool, connection,
/// or open transaction). `occurred_at` is filled by the column default.
pub async fn emit_one<'c, E>(executor: E, entry: &PendingEntry) -> Result<(), sqlx::Error>
where
    E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
    // Bind order must mirror the column list above.
    sqlx::query(
        r#"INSERT INTO "rustango_audit_log"
("entity_table", "entity_pk", "operation", "source", "changes")
VALUES ($1, $2, $3, $4, $5)"#,
    )
    .bind(entry.entity_table)
    .bind(&entry.entity_pk)
    .bind(entry.operation.as_str())
    .bind(entry.source.as_token())
    .bind(&entry.changes)
    .execute(executor)
    .await?;
    Ok(())
}
/// Inserts a batch of audit rows with one multi-row `INSERT`; no-op for an
/// empty slice.
///
/// Each row consumes five bind parameters, so very large batches can hit
/// Postgres' 65535 bind-parameter limit (~13k rows); callers should chunk
/// accordingly.
pub async fn emit_many<'c, E>(executor: E, entries: &[PendingEntry]) -> Result<(), sqlx::Error>
where
    E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
    if entries.is_empty() {
        return Ok(());
    }
    // Hoisted out of the loop (was re-declared per iteration).
    use std::fmt::Write as _;
    // Pre-size the statement: head plus roughly one placeholder tuple per row.
    let mut sql = String::with_capacity(96 + entries.len() * 32);
    sql.push_str(
        r#"INSERT INTO "rustango_audit_log"
("entity_table", "entity_pk", "operation", "source", "changes")
VALUES "#,
    );
    for i in 0..entries.len() {
        if i > 0 {
            sql.push_str(", ");
        }
        // First ($1-based) placeholder index for this row; derived from `i`
        // instead of a separate running counter.
        let base = i * 5 + 1;
        let _ = write!(
            sql,
            "(${}, ${}, ${}, ${}, ${})",
            base,
            base + 1,
            base + 2,
            base + 3,
            base + 4,
        );
    }
    // Bind values in the same row/column order the placeholders were written.
    let mut q = sqlx::query(&sql);
    for entry in entries {
        q = q
            .bind(entry.entity_table)
            .bind(&entry.entity_pk)
            .bind(entry.operation.as_str())
            .bind(entry.source.as_token())
            .bind(&entry.changes);
    }
    q.execute(executor).await?;
    Ok(())
}
/// Builds a `{column: {"before": .., "after": ..}}` JSON object holding only
/// the columns whose value actually changed. A column missing from `before`
/// is treated as previously `null`.
#[must_use]
pub fn diff_changes(before: &[(&str, Value)], after: &[(&str, Value)]) -> Value {
    let changed = after.iter().filter_map(|(name, after_val)| {
        let before_val = before
            .iter()
            .find_map(|(n, v)| (n == name).then(|| v.clone()))
            .unwrap_or(Value::Null);
        if before_val == *after_val {
            return None;
        }
        let mut pair = Map::new();
        pair.insert("before".into(), before_val);
        pair.insert("after".into(), after_val.clone());
        Some(((*name).to_string(), Value::Object(pair)))
    });
    Value::Object(changed.collect())
}
/// Serializes all `(column, value)` pairs into one flat JSON object, used
/// for create/delete entries where a full snapshot (not a diff) is stored.
#[must_use]
pub fn snapshot_changes(after: &[(&str, Value)]) -> Value {
    let pairs = after
        .iter()
        .map(|(name, val)| ((*name).to_string(), val.clone()));
    Value::Object(pairs.collect())
}
/// Loads the complete audit history for one entity, newest entries first.
pub async fn fetch_for_entity(
    pool: &PgPool,
    entity_table: &str,
    entity_pk: &str,
) -> Result<Vec<AuditEntry>, sqlx::Error> {
    let rows: Vec<PgRow> = sqlx::query(
        r#"SELECT "id", "entity_table", "entity_pk", "operation",
"source", "changes", "occurred_at"
FROM "rustango_audit_log"
WHERE "entity_table" = $1 AND "entity_pk" = $2
ORDER BY "occurred_at" DESC, "id" DESC"#,
    )
    .bind(entity_table)
    .bind(entity_pk)
    .fetch_all(pool)
    .await?;
    // Decode fallibly; the first bad row short-circuits with its error.
    rows.iter().map(AuditEntry::from_row).collect()
}
/// A persisted audit-log row as read back from the database.
#[derive(Debug, Clone)]
pub struct AuditEntry {
    /// Surrogate primary key (`BIGSERIAL` on Postgres).
    pub id: i64,
    /// Table the audited entity lives in.
    pub entity_table: String,
    /// Primary key of the audited entity, as stored (string form).
    pub entity_pk: String,
    /// Operation token, e.g. `"create"` / `"soft_delete"` (see [`AuditOp`]).
    pub operation: String,
    /// Source token, e.g. `"system"` / `"user:<id>"` (see [`AuditSource`]).
    pub source: String,
    /// JSON diff or snapshot recorded for the change.
    pub changes: Value,
    /// Server-side timestamp of the change (column default `NOW()`).
    pub occurred_at: chrono::DateTime<chrono::Utc>,
}
impl AuditEntry {
    /// Decodes one audit row column-by-column; errors if a column is
    /// missing or has an unexpected type.
    fn from_row(row: &PgRow) -> Result<Self, sqlx::Error> {
        Ok(Self {
            id: row.try_get("id")?,
            entity_table: row.try_get("entity_table")?,
            entity_pk: row.try_get("entity_pk")?,
            operation: row.try_get("operation")?,
            source: row.try_get("source")?,
            changes: row.try_get("changes")?,
            occurred_at: row.try_get("occurred_at")?,
        })
    }
}
/// Postgres DDL for the audit table plus its two lookup indexes (by entity
/// and by time). Contains multiple `;`-separated statements; executed
/// piecewise by [`ensure_table`] / [`ensure_table_pool`].
pub const CREATE_TABLE_SQL: &str = r#"
CREATE TABLE IF NOT EXISTS "rustango_audit_log" (
"id" BIGSERIAL PRIMARY KEY,
"entity_table" TEXT NOT NULL,
"entity_pk" TEXT NOT NULL,
"operation" TEXT NOT NULL,
"source" TEXT NOT NULL,
"changes" JSONB NOT NULL,
"occurred_at" TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS "rustango_audit_log_entity_idx"
ON "rustango_audit_log" ("entity_table", "entity_pk");
CREATE INDEX IF NOT EXISTS "rustango_audit_log_occurred_idx"
ON "rustango_audit_log" ("occurred_at" DESC);
"#;
/// Deletes audit rows older than `cutoff_days` days (negative values are
/// clamped to zero). Returns how many rows were removed.
pub async fn cleanup_older_than(pool: &PgPool, cutoff_days: i64) -> Result<u64, sqlx::Error> {
    let days = cutoff_days.max(0);
    let res = sqlx::query(
        r#"DELETE FROM "rustango_audit_log"
WHERE "occurred_at" < NOW() - ($1::int8 * INTERVAL '1 day')"#,
    )
    .bind(days)
    .execute(pool)
    .await?;
    Ok(res.rows_affected())
}
/// Keeps only the newest `keep` audit rows *per entity* and deletes the
/// rest (negative `keep` is clamped to zero). Returns rows removed.
pub async fn cleanup_keep_last_n(pool: &PgPool, keep: i64) -> Result<u64, sqlx::Error> {
    let keep = keep.max(0);
    // Rank rows within each (entity_table, entity_pk) group, newest first,
    // then delete everything ranked beyond `keep`.
    let result = sqlx::query(
        r#"DELETE FROM "rustango_audit_log" WHERE "id" IN (
SELECT "id" FROM (
SELECT "id",
ROW_NUMBER() OVER (
PARTITION BY "entity_table", "entity_pk"
ORDER BY "occurred_at" DESC, "id" DESC
) AS _rn
FROM "rustango_audit_log"
) ranked
WHERE _rn > $1
)"#,
    )
    .bind(keep)
    .execute(pool)
    .await?;
    Ok(result.rows_affected())
}
/// Creates the Postgres audit table and its indexes if they do not exist.
///
/// [`CREATE_TABLE_SQL`] holds several statements, so it is split on `;`
/// and each non-empty statement is executed in turn.
pub async fn ensure_table(pool: &PgPool) -> Result<(), sqlx::Error> {
    let stmts = CREATE_TABLE_SQL
        .split(';')
        .map(str::trim)
        .filter(|s| !s.is_empty());
    for stmt in stmts {
        sqlx::query(stmt).execute(pool).await?;
    }
    Ok(())
}
/// MySQL DDL for the audit table and indexes. MySQL has no
/// `CREATE INDEX IF NOT EXISTS`, so on re-runs the index statements fail
/// with "Duplicate key name"; `ensure_table_pool` tolerates that error via
/// `is_mysql_dup_index_error`.
pub const CREATE_TABLE_SQL_MYSQL: &str = r#"
CREATE TABLE IF NOT EXISTS `rustango_audit_log` (
`id` BIGINT AUTO_INCREMENT PRIMARY KEY,
`entity_table` VARCHAR(255) NOT NULL,
`entity_pk` VARCHAR(255) NOT NULL,
`operation` VARCHAR(32) NOT NULL,
`source` VARCHAR(255) NOT NULL,
`changes` JSON NOT NULL,
`occurred_at` DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)
);
CREATE INDEX `rustango_audit_log_entity_idx`
ON `rustango_audit_log` (`entity_table`, `entity_pk`);
CREATE INDEX `rustango_audit_log_occurred_idx`
ON `rustango_audit_log` (`occurred_at` DESC);
"#;
/// SQLite DDL for the audit table and indexes. `changes` is stored as TEXT
/// (SQLite has no native JSON column type) and `occurred_at` as TEXT with
/// `CURRENT_TIMESTAMP`.
pub const CREATE_TABLE_SQL_SQLITE: &str = r#"
CREATE TABLE IF NOT EXISTS "rustango_audit_log" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"entity_table" TEXT NOT NULL,
"entity_pk" TEXT NOT NULL,
"operation" TEXT NOT NULL,
"source" TEXT NOT NULL,
"changes" TEXT NOT NULL,
"occurred_at" TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS "rustango_audit_log_entity_idx"
ON "rustango_audit_log" ("entity_table", "entity_pk");
CREATE INDEX IF NOT EXISTS "rustango_audit_log_occurred_idx"
ON "rustango_audit_log" ("occurred_at" DESC);
"#;
/// Creates the audit table and indexes on whichever backend `pool` wraps,
/// picking the dialect-specific DDL by name (unknown dialect names fall
/// back to the Postgres DDL).
pub async fn ensure_table_pool(pool: &crate::sql::Pool) -> Result<(), sqlx::Error> {
    let dialect = pool.dialect();
    let ddl = match dialect.name() {
        "postgres" => CREATE_TABLE_SQL,
        "mysql" => CREATE_TABLE_SQL_MYSQL,
        "sqlite" => CREATE_TABLE_SQL_SQLITE,
        _ => CREATE_TABLE_SQL,
    };
    // The DDL constants contain multiple `;`-separated statements; run each
    // non-empty statement against the matching pool variant.
    for stmt in ddl.split(';') {
        let trimmed = stmt.trim();
        if trimmed.is_empty() {
            continue;
        }
        match pool {
            #[cfg(feature = "postgres")]
            crate::sql::Pool::Postgres(pg) => {
                sqlx::query(trimmed).execute(pg).await?;
            }
            #[cfg(feature = "mysql")]
            crate::sql::Pool::Mysql(my) => {
                // MySQL lacks `CREATE INDEX IF NOT EXISTS`, so a re-run hits
                // "Duplicate key name"; tolerate exactly that error.
                if let Err(e) = sqlx::query(trimmed).execute(my).await {
                    if !is_mysql_dup_index_error(&e) {
                        return Err(e);
                    }
                }
            }
            #[cfg(feature = "sqlite")]
            crate::sql::Pool::Sqlite(sq) => {
                sqlx::query(trimmed).execute(sq).await?;
            }
        }
    }
    Ok(())
}
/// Returns `true` when `e` is MySQL's "Duplicate key name" error
/// (ER_DUP_KEYNAME, 1061), raised by `CREATE INDEX` when the index already
/// exists — MySQL has no `CREATE INDEX IF NOT EXISTS`.
///
/// The previous version also accepted *any* SQLSTATE `42000` error, but
/// `42000` is MySQL's generic syntax-error/access-violation class (e.g.
/// parse errors, 1064, are also SQLSTATE 42000), so genuine DDL failures
/// were silently swallowed. Match on the message only.
#[cfg(feature = "mysql")]
fn is_mysql_dup_index_error(e: &sqlx::Error) -> bool {
    if let sqlx::Error::Database(db) = e {
        return db.message().contains("Duplicate key name");
    }
    false
}
/// Fallback when the `mysql` feature is disabled: never matches, so every
/// error propagates. Kept so callers compile under any feature set.
#[cfg(not(feature = "mysql"))]
#[allow(dead_code)]
fn is_mysql_dup_index_error(_e: &sqlx::Error) -> bool {
    false
}
/// MySQL counterpart of [`emit_one`]: inserts one audit row via any MySQL
/// executor. `changes` is wrapped in `sqlx::types::Json` so it serializes
/// into the `JSON` column.
#[cfg(feature = "mysql")]
pub async fn emit_one_my<'c, E>(executor: E, entry: &PendingEntry) -> Result<(), sqlx::Error>
where
    E: sqlx::Executor<'c, Database = sqlx::MySql>,
{
    sqlx::query(
        r#"INSERT INTO `rustango_audit_log`
(`entity_table`, `entity_pk`, `operation`, `source`, `changes`)
VALUES (?, ?, ?, ?, ?)"#,
    )
    .bind(entry.entity_table)
    .bind(&entry.entity_pk)
    .bind(entry.operation.as_str())
    .bind(entry.source.as_token())
    .bind(sqlx::types::Json(&entry.changes))
    .execute(executor)
    .await?;
    Ok(())
}
/// SQLite counterpart of [`emit_one`]: inserts one audit row via any SQLite
/// executor. `sqlx::types::Json` serializes `changes` into the TEXT column.
#[cfg(feature = "sqlite")]
pub async fn emit_one_sqlite<'c, E>(executor: E, entry: &PendingEntry) -> Result<(), sqlx::Error>
where
    E: sqlx::Executor<'c, Database = sqlx::Sqlite>,
{
    sqlx::query(
        r#"INSERT INTO "rustango_audit_log"
("entity_table", "entity_pk", "operation", "source", "changes")
VALUES (?, ?, ?, ?, ?)"#,
    )
    .bind(entry.entity_table)
    .bind(&entry.entity_pk)
    .bind(entry.operation.as_str())
    .bind(entry.source.as_token())
    .bind(sqlx::types::Json(&entry.changes))
    .execute(executor)
    .await?;
    Ok(())
}
/// Backend-dispatching wrapper: writes `entry` through whichever database
/// the pool wraps, delegating to the backend-specific `emit_one*` helper.
pub async fn emit_one_pool(
    pool: &crate::sql::Pool,
    entry: &PendingEntry,
) -> Result<(), sqlx::Error> {
    match pool {
        #[cfg(feature = "postgres")]
        crate::sql::Pool::Postgres(pg) => emit_one(pg, entry).await,
        #[cfg(feature = "mysql")]
        crate::sql::Pool::Mysql(my) => emit_one_my(my, entry).await,
        #[cfg(feature = "sqlite")]
        crate::sql::Pool::Sqlite(sq) => emit_one_sqlite(sq, entry).await,
    }
}
/// Executes a compiled `DELETE` and writes `entry` to the audit log inside
/// a single transaction, so the delete and its audit row commit or roll
/// back together. Returns the number of rows the delete affected.
pub async fn delete_one_with_audit_pool(
    pool: &crate::sql::Pool,
    query: &crate::core::DeleteQuery,
    entry: &PendingEntry,
) -> Result<u64, crate::sql::ExecError> {
    let stmt = pool.dialect().compile_delete(query)?;
    match pool {
        #[cfg(feature = "postgres")]
        crate::sql::Pool::Postgres(pg) => {
            let mut tx = pg.begin().await?;
            // Bind the compiled parameters in order, then run the delete.
            let mut q: sqlx::query::Query<'_, sqlx::Postgres, sqlx::postgres::PgArguments> =
                sqlx::query(&stmt.sql);
            for v in stmt.params {
                q = bind_value_pg(q, v);
            }
            let affected = q.execute(&mut *tx).await?.rows_affected();
            emit_one(&mut *tx, entry).await?;
            tx.commit().await?;
            Ok(affected)
        }
        #[cfg(feature = "mysql")]
        crate::sql::Pool::Mysql(my) => {
            let mut tx = my.begin().await?;
            let mut q: sqlx::query::Query<'_, sqlx::MySql, sqlx::mysql::MySqlArguments> =
                sqlx::query(&stmt.sql);
            for v in stmt.params {
                q = bind_value_my(q, v);
            }
            let affected = q.execute(&mut *tx).await?.rows_affected();
            emit_one_my(&mut *tx, entry).await?;
            tx.commit().await?;
            Ok(affected)
        }
        #[cfg(feature = "sqlite")]
        crate::sql::Pool::Sqlite(sq) => {
            let mut tx = sq.begin().await?;
            let mut q: sqlx::query::Query<'_, sqlx::Sqlite, sqlx::sqlite::SqliteArguments<'_>> =
                sqlx::query(&stmt.sql);
            for v in stmt.params {
                q = bind_value_sqlite(q, v);
            }
            let affected = q.execute(&mut *tx).await?.rows_affected();
            emit_one_sqlite(&mut *tx, entry).await?;
            tx.commit().await?;
            Ok(affected)
        }
    }
}
/// Executes a compiled `UPDATE` and writes `entry` to the audit log inside
/// a single transaction (mirror of [`delete_one_with_audit_pool`]).
/// Returns the number of rows the update affected.
pub async fn save_one_with_audit_pool(
    pool: &crate::sql::Pool,
    query: &crate::core::UpdateQuery,
    entry: &PendingEntry,
) -> Result<u64, crate::sql::ExecError> {
    let stmt = pool.dialect().compile_update(query)?;
    match pool {
        #[cfg(feature = "postgres")]
        crate::sql::Pool::Postgres(pg) => {
            let mut tx = pg.begin().await?;
            // Bind compiled parameters in order, then run the update.
            let mut q: sqlx::query::Query<'_, sqlx::Postgres, sqlx::postgres::PgArguments> =
                sqlx::query(&stmt.sql);
            for v in stmt.params {
                q = bind_value_pg(q, v);
            }
            let affected = q.execute(&mut *tx).await?.rows_affected();
            emit_one(&mut *tx, entry).await?;
            tx.commit().await?;
            Ok(affected)
        }
        #[cfg(feature = "mysql")]
        crate::sql::Pool::Mysql(my) => {
            let mut tx = my.begin().await?;
            let mut q: sqlx::query::Query<'_, sqlx::MySql, sqlx::mysql::MySqlArguments> =
                sqlx::query(&stmt.sql);
            for v in stmt.params {
                q = bind_value_my(q, v);
            }
            let affected = q.execute(&mut *tx).await?.rows_affected();
            emit_one_my(&mut *tx, entry).await?;
            tx.commit().await?;
            Ok(affected)
        }
        #[cfg(feature = "sqlite")]
        crate::sql::Pool::Sqlite(sq) => {
            let mut tx = sq.begin().await?;
            let mut q: sqlx::query::Query<'_, sqlx::Sqlite, sqlx::sqlite::SqliteArguments<'_>> =
                sqlx::query(&stmt.sql);
            for v in stmt.params {
                q = bind_value_sqlite(q, v);
            }
            let affected = q.execute(&mut *tx).await?.rows_affected();
            emit_one_sqlite(&mut *tx, entry).await?;
            tx.commit().await?;
            Ok(affected)
        }
    }
}
/// Executes a compiled `INSERT ... RETURNING` plus the audit write inside
/// one transaction, returning the backend-specific returned row/id.
///
/// Requires a non-empty `returning` list. Postgres and SQLite execute the
/// query as-is and hand back the returned row; MySQL has no `RETURNING`,
/// so the query is re-issued without it and the generated id is read via
/// `LAST_INSERT_ID()` on the same transaction connection.
pub async fn insert_one_with_audit_pool(
    pool: &crate::sql::Pool,
    query: &crate::core::InsertQuery,
    entry: &PendingEntry,
) -> Result<crate::sql::InsertReturningPool, crate::sql::ExecError> {
    query.validate()?;
    if query.returning.is_empty() {
        return Err(crate::sql::ExecError::EmptyReturning);
    }
    match pool {
        #[cfg(feature = "postgres")]
        crate::sql::Pool::Postgres(pg) => {
            let stmt = pool.dialect().compile_insert(query)?;
            let mut tx = pg.begin().await?;
            let mut q: sqlx::query::Query<'_, sqlx::Postgres, sqlx::postgres::PgArguments> =
                sqlx::query(&stmt.sql);
            for v in stmt.params {
                q = bind_value_pg(q, v);
            }
            use sqlx::Executor as _;
            let row = (&mut *tx).fetch_one(q).await?;
            emit_one(&mut *tx, entry).await?;
            tx.commit().await?;
            Ok(crate::sql::InsertReturningPool::PgRow(row))
        }
        #[cfg(feature = "mysql")]
        crate::sql::Pool::Mysql(my) => {
            // Re-compile without `returning` — MySQL does not support it.
            let plain = crate::core::InsertQuery {
                model: query.model,
                columns: query.columns.clone(),
                values: query.values.clone(),
                returning: ::std::vec::Vec::new(),
                on_conflict: query.on_conflict.clone(),
            };
            let stmt = pool.dialect().compile_insert(&plain)?;
            let mut tx = my.begin().await?;
            let mut q: sqlx::query::Query<'_, sqlx::MySql, sqlx::mysql::MySqlArguments> =
                sqlx::query(&stmt.sql);
            for v in stmt.params {
                q = bind_value_my(q, v);
            }
            q.execute(&mut *tx).await?;
            use sqlx::Row as _;
            // LAST_INSERT_ID() is connection-scoped; running it inside the
            // same transaction guarantees it belongs to our insert.
            let row = sqlx::query("SELECT LAST_INSERT_ID()")
                .fetch_one(&mut *tx)
                .await?;
            let id_u64: u64 = row.try_get::<u64, _>(0)?;
            // NOTE(review): ids above i64::MAX silently saturate here —
            // presumably unreachable in practice, but worth confirming.
            let id = i64::try_from(id_u64).unwrap_or(i64::MAX);
            emit_one_my(&mut *tx, entry).await?;
            tx.commit().await?;
            Ok(crate::sql::InsertReturningPool::MySqlAutoId(id))
        }
        #[cfg(feature = "sqlite")]
        crate::sql::Pool::Sqlite(sq) => {
            let stmt = pool.dialect().compile_insert(query)?;
            let mut tx = sq.begin().await?;
            let mut q: sqlx::query::Query<'_, sqlx::Sqlite, sqlx::sqlite::SqliteArguments<'_>> =
                sqlx::query(&stmt.sql);
            for v in stmt.params {
                q = bind_value_sqlite(q, v);
            }
            use sqlx::Executor as _;
            let row = (&mut *tx).fetch_one(q).await?;
            emit_one_sqlite(&mut *tx, entry).await?;
            tx.commit().await?;
            Ok(crate::sql::InsertReturningPool::SqliteRow(row))
        }
    }
}
/// Hidden public delegate to [`bind_value_pg`] for use by generated code.
#[doc(hidden)]
#[cfg(feature = "postgres")]
pub fn __bind_value_pg(
    q: sqlx::query::Query<'_, sqlx::Postgres, sqlx::postgres::PgArguments>,
    value: crate::core::SqlValue,
) -> sqlx::query::Query<'_, sqlx::Postgres, sqlx::postgres::PgArguments> {
    bind_value_pg(q, value)
}
/// Hidden public delegate to [`bind_value_my`] for use by generated code.
#[doc(hidden)]
#[cfg(feature = "mysql")]
pub fn __bind_value_my(
    q: sqlx::query::Query<'_, sqlx::MySql, sqlx::mysql::MySqlArguments>,
    value: crate::core::SqlValue,
) -> sqlx::query::Query<'_, sqlx::MySql, sqlx::mysql::MySqlArguments> {
    bind_value_my(q, value)
}
/// Hidden public delegate to [`bind_value_sqlite`] for use by generated code.
#[doc(hidden)]
#[cfg(feature = "sqlite")]
pub fn __bind_value_sqlite<'q>(
    q: sqlx::query::Query<'q, sqlx::Sqlite, sqlx::sqlite::SqliteArguments<'q>>,
    value: crate::core::SqlValue,
) -> sqlx::query::Query<'q, sqlx::Sqlite, sqlx::sqlite::SqliteArguments<'q>> {
    bind_value_sqlite(q, value)
}
/// Binds one dynamic [`SqlValue`](crate::core::SqlValue) onto a Postgres
/// query; `Null` binds a typed NULL and `Json` is wrapped for JSONB.
/// `List` must already have been expanded to scalars by the SQL writer.
#[cfg(feature = "postgres")]
fn bind_value_pg(
    q: sqlx::query::Query<'_, sqlx::Postgres, sqlx::postgres::PgArguments>,
    value: crate::core::SqlValue,
) -> sqlx::query::Query<'_, sqlx::Postgres, sqlx::postgres::PgArguments> {
    use crate::core::SqlValue;
    match value {
        SqlValue::Null => q.bind(None::<String>),
        SqlValue::I16(v) => q.bind(v),
        SqlValue::I32(v) => q.bind(v),
        SqlValue::I64(v) => q.bind(v),
        SqlValue::F32(v) => q.bind(v),
        SqlValue::F64(v) => q.bind(v),
        SqlValue::Bool(v) => q.bind(v),
        SqlValue::String(v) => q.bind(v),
        SqlValue::DateTime(v) => q.bind(v),
        SqlValue::Date(v) => q.bind(v),
        SqlValue::Uuid(v) => q.bind(v),
        SqlValue::Json(v) => q.bind(sqlx::types::Json(v)),
        SqlValue::List(_) => unreachable!("List expanded to scalars by SQL writer"),
    }
}
/// MySQL counterpart of [`bind_value_pg`]; same variant-by-variant binding.
#[cfg(feature = "mysql")]
fn bind_value_my(
    q: sqlx::query::Query<'_, sqlx::MySql, sqlx::mysql::MySqlArguments>,
    value: crate::core::SqlValue,
) -> sqlx::query::Query<'_, sqlx::MySql, sqlx::mysql::MySqlArguments> {
    use crate::core::SqlValue;
    match value {
        SqlValue::Null => q.bind(None::<String>),
        SqlValue::I16(v) => q.bind(v),
        SqlValue::I32(v) => q.bind(v),
        SqlValue::I64(v) => q.bind(v),
        SqlValue::F32(v) => q.bind(v),
        SqlValue::F64(v) => q.bind(v),
        SqlValue::Bool(v) => q.bind(v),
        SqlValue::String(v) => q.bind(v),
        SqlValue::DateTime(v) => q.bind(v),
        SqlValue::Date(v) => q.bind(v),
        SqlValue::Uuid(v) => q.bind(v),
        SqlValue::Json(v) => q.bind(sqlx::types::Json(v)),
        SqlValue::List(_) => unreachable!("List expanded to scalars by SQL writer"),
    }
}
/// SQLite counterpart of [`bind_value_pg`]; same variant-by-variant binding.
#[cfg(feature = "sqlite")]
fn bind_value_sqlite<'q>(
    q: sqlx::query::Query<'q, sqlx::Sqlite, sqlx::sqlite::SqliteArguments<'q>>,
    value: crate::core::SqlValue,
) -> sqlx::query::Query<'q, sqlx::Sqlite, sqlx::sqlite::SqliteArguments<'q>> {
    use crate::core::SqlValue;
    match value {
        SqlValue::Null => q.bind(None::<String>),
        SqlValue::I16(v) => q.bind(v),
        SqlValue::I32(v) => q.bind(v),
        SqlValue::I64(v) => q.bind(v),
        SqlValue::F32(v) => q.bind(v),
        SqlValue::F64(v) => q.bind(v),
        SqlValue::Bool(v) => q.bind(v),
        SqlValue::String(v) => q.bind(v),
        SqlValue::DateTime(v) => q.bind(v),
        SqlValue::Date(v) => q.bind(v),
        SqlValue::Uuid(v) => q.bind(v),
        SqlValue::Json(v) => q.bind(sqlx::types::Json(v)),
        SqlValue::List(_) => unreachable!("List expanded to scalars by SQL writer"),
    }
}
/// Runs an `UPDATE` with an audited before/after diff, all in one
/// transaction per backend:
///
/// 1. select the before-image of the row by primary key,
/// 2. execute the compiled update,
/// 3. diff the before-image against `after_pairs` and emit an
///    `AuditOp::Update` entry with the changed columns,
/// 4. commit.
///
/// The `decode_before_*` closures turn the backend row into `(column,
/// JSON value)` pairs; `select_cols_*` are pre-rendered column lists for
/// each dialect.
///
/// NOTE(review): a failed or empty before-image fetch folds into `None`
/// and silently skips the audit entry (`_ => None` below) — presumably a
/// deliberate best-effort policy; confirm errors should not abort the tx.
#[allow(clippy::too_many_arguments)]
pub async fn save_one_with_diff_pool<F1, F2, F3>(
    pool: &crate::sql::Pool,
    update_query: &crate::core::UpdateQuery,
    pk_column: &'static str,
    pk_value: crate::core::SqlValue,
    entity_table: &'static str,
    entity_pk: String,
    after_pairs: Vec<(&'static str, serde_json::Value)>,
    select_cols_pg: &str,
    select_cols_my: &str,
    select_cols_sqlite: &str,
    decode_before_pg: F1,
    decode_before_my: F2,
    decode_before_sqlite: F3,
) -> Result<(), crate::sql::ExecError>
where
    F1: FnOnce(&crate::sql::PgReturningRow) -> Vec<(&'static str, serde_json::Value)>,
    F2: FnOnce(&crate::sql::MyReturningRow) -> Vec<(&'static str, serde_json::Value)>,
    F3: FnOnce(&crate::sql::SqliteReturningRow) -> Vec<(&'static str, serde_json::Value)>,
{
    // Keep all backend-specific arguments "used" so the build stays
    // warning-free when some backend features are compiled out.
    let _ = (&decode_before_pg, &decode_before_my, &decode_before_sqlite);
    let _ = (select_cols_pg, select_cols_my, select_cols_sqlite);
    let stmt = pool.dialect().compile_update(update_query)?;
    match pool {
        #[cfg(feature = "postgres")]
        crate::sql::Pool::Postgres(pg) => {
            let mut tx = pg.begin().await?;
            // Fetch the before-image inside the tx so the diff is
            // consistent with what the update overwrites.
            let select_sql = format!(
                r#"SELECT {} FROM "{}" WHERE "{}" = $1"#,
                select_cols_pg, entity_table, pk_column,
            );
            let pk_q = sqlx::query(&select_sql);
            let pk_q = bind_value_pg(pk_q, pk_value);
            let before_pairs: Option<Vec<(&'static str, serde_json::Value)>> =
                match pk_q.fetch_optional(&mut *tx).await {
                    Ok(Some(row)) => Some(decode_before_pg(&row)),
                    _ => None,
                };
            let mut q = sqlx::query(&stmt.sql);
            for v in stmt.params {
                q = bind_value_pg(q, v);
            }
            q.execute(&mut *tx).await?;
            if let Some(before) = before_pairs {
                let entry = PendingEntry {
                    entity_table,
                    entity_pk,
                    operation: AuditOp::Update,
                    source: current_source(),
                    changes: diff_changes(&before, &after_pairs),
                };
                emit_one(&mut *tx, &entry).await?;
            }
            tx.commit().await?;
            Ok(())
        }
        #[cfg(feature = "mysql")]
        crate::sql::Pool::Mysql(my) => {
            let mut tx = my.begin().await?;
            let select_sql = format!(
                "SELECT {} FROM `{}` WHERE `{}` = ?",
                select_cols_my, entity_table, pk_column,
            );
            let pk_q = sqlx::query(&select_sql);
            let pk_q = bind_value_my(pk_q, pk_value);
            let before_pairs: Option<Vec<(&'static str, serde_json::Value)>> =
                match pk_q.fetch_optional(&mut *tx).await {
                    Ok(Some(row)) => Some(decode_before_my(&row)),
                    _ => None,
                };
            let mut q = sqlx::query(&stmt.sql);
            for v in stmt.params {
                q = bind_value_my(q, v);
            }
            q.execute(&mut *tx).await?;
            if let Some(before) = before_pairs {
                let entry = PendingEntry {
                    entity_table,
                    entity_pk,
                    operation: AuditOp::Update,
                    source: current_source(),
                    changes: diff_changes(&before, &after_pairs),
                };
                emit_one_my(&mut *tx, &entry).await?;
            }
            tx.commit().await?;
            Ok(())
        }
        #[cfg(feature = "sqlite")]
        crate::sql::Pool::Sqlite(sq) => {
            let mut tx = sq.begin().await?;
            let select_sql = format!(
                r#"SELECT {} FROM "{}" WHERE "{}" = ?"#,
                select_cols_sqlite, entity_table, pk_column,
            );
            let pk_q = sqlx::query(&select_sql);
            let pk_q = bind_value_sqlite(pk_q, pk_value);
            let before_pairs: Option<Vec<(&'static str, serde_json::Value)>> =
                match pk_q.fetch_optional(&mut *tx).await {
                    Ok(Some(row)) => Some(decode_before_sqlite(&row)),
                    _ => None,
                };
            let mut q = sqlx::query(&stmt.sql);
            for v in stmt.params {
                q = bind_value_sqlite(q, v);
            }
            q.execute(&mut *tx).await?;
            if let Some(before) = before_pairs {
                let entry = PendingEntry {
                    entity_table,
                    entity_pk,
                    operation: AuditOp::Update,
                    source: current_source(),
                    changes: diff_changes(&before, &after_pairs),
                };
                emit_one_sqlite(&mut *tx, &entry).await?;
            }
            tx.commit().await?;
            Ok(())
        }
    }
}