use serde_json::{Map, Value};
use crate::sql::sqlx;
#[cfg(feature = "postgres")]
use crate::sql::sqlx::{postgres::PgRow, PgPool, Row};
#[derive(Debug, Clone)]
pub enum AuditSource {
System,
User { id: String },
Custom(String),
}
impl AuditSource {
#[must_use]
pub fn as_token(&self) -> String {
match self {
Self::System => "system".to_owned(),
Self::User { id } => format!("user:{id}"),
Self::Custom(s) => s.clone(),
}
}
}
impl Default for AuditSource {
fn default() -> Self {
Self::System
}
}
tokio::task_local! {
pub static AUDIT_SOURCE: AuditSource;
}
#[must_use]
pub fn current_source() -> AuditSource {
AUDIT_SOURCE
.try_with(Clone::clone)
.unwrap_or(AuditSource::System)
}
pub async fn with_source<F, T>(source: AuditSource, fut: F) -> T
where
F: std::future::Future<Output = T>,
{
AUDIT_SOURCE.scope(source, fut).await
}
#[derive(Debug, Clone)]
pub struct PendingEntry {
pub entity_table: &'static str,
pub entity_pk: String,
pub operation: AuditOp,
pub source: AuditSource,
pub changes: Value,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuditOp {
Create,
Update,
Delete,
SoftDelete,
Restore,
Action,
}
impl AuditOp {
#[must_use]
pub fn as_str(self) -> &'static str {
match self {
Self::Create => "create",
Self::Update => "update",
Self::Delete => "delete",
Self::SoftDelete => "soft_delete",
Self::Restore => "restore",
Self::Action => "action",
}
}
}
#[cfg(feature = "postgres")]
pub async fn emit_one<'c, E>(executor: E, entry: &PendingEntry) -> Result<(), sqlx::Error>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
sqlx::query(
r#"INSERT INTO "rustango_audit_log"
("entity_table", "entity_pk", "operation", "source", "changes")
VALUES ($1, $2, $3, $4, $5)"#,
)
.bind(entry.entity_table)
.bind(&entry.entity_pk)
.bind(entry.operation.as_str())
.bind(entry.source.as_token())
.bind(&entry.changes)
.execute(executor)
.await?;
Ok(())
}
#[cfg(feature = "postgres")]
pub async fn emit_many<'c, E>(executor: E, entries: &[PendingEntry]) -> Result<(), sqlx::Error>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
if entries.is_empty() {
return Ok(());
}
let mut sql = String::from(
r#"INSERT INTO "rustango_audit_log"
("entity_table", "entity_pk", "operation", "source", "changes")
VALUES "#,
);
let mut bind_idx = 1usize;
for (i, _) in entries.iter().enumerate() {
if i > 0 {
sql.push_str(", ");
}
use std::fmt::Write as _;
let _ = write!(
sql,
"(${}, ${}, ${}, ${}, ${})",
bind_idx,
bind_idx + 1,
bind_idx + 2,
bind_idx + 3,
bind_idx + 4,
);
bind_idx += 5;
}
let mut q = sqlx::query(&sql);
for entry in entries {
q = q
.bind(entry.entity_table)
.bind(&entry.entity_pk)
.bind(entry.operation.as_str())
.bind(entry.source.as_token())
.bind(&entry.changes);
}
q.execute(executor).await?;
Ok(())
}
#[must_use]
pub fn diff_changes(before: &[(&str, Value)], after: &[(&str, Value)]) -> Value {
let mut out = Map::new();
for (name, after_val) in after {
let before_val = before
.iter()
.find(|(n, _)| n == name)
.map(|(_, v)| v.clone())
.unwrap_or(Value::Null);
if &before_val != after_val {
let mut entry = Map::new();
entry.insert("before".into(), before_val);
entry.insert("after".into(), after_val.clone());
out.insert((*name).into(), Value::Object(entry));
}
}
Value::Object(out)
}
#[must_use]
pub fn snapshot_changes(after: &[(&str, Value)]) -> Value {
let mut out = Map::new();
for (name, val) in after {
out.insert((*name).to_string(), val.clone());
}
Value::Object(out)
}
fn audit_select_sql(dialect: &dyn crate::sql::Dialect) -> String {
use std::fmt::Write as _;
let t = dialect.quote_ident("rustango_audit_log");
let id = dialect.quote_ident("id");
let et = dialect.quote_ident("entity_table");
let ek = dialect.quote_ident("entity_pk");
let op = dialect.quote_ident("operation");
let src = dialect.quote_ident("source");
let ch = dialect.quote_ident("changes");
let oa = dialect.quote_ident("occurred_at");
let p1 = dialect.placeholder(1);
let p2 = dialect.placeholder(2);
let mut sql = String::new();
let _ = write!(
sql,
"SELECT {id}, {et}, {ek}, {op}, {src}, {ch}, {oa} \
FROM {t} \
WHERE {et} = {p1} AND {ek} = {p2} \
ORDER BY {oa} DESC, {id} DESC",
);
sql
}
fn audit_cleanup_older_than_sql(dialect: &dyn crate::sql::Dialect) -> String {
let t = dialect.quote_ident("rustango_audit_log");
let oa = dialect.quote_ident("occurred_at");
let p1 = dialect.placeholder(1);
format!("DELETE FROM {t} WHERE {oa} < {p1}")
}
fn audit_cleanup_keep_last_n_sql(dialect: &dyn crate::sql::Dialect) -> String {
let t = dialect.quote_ident("rustango_audit_log");
let id = dialect.quote_ident("id");
let et = dialect.quote_ident("entity_table");
let ek = dialect.quote_ident("entity_pk");
let oa = dialect.quote_ident("occurred_at");
let p1 = dialect.placeholder(1);
format!(
"DELETE FROM {t} WHERE {id} IN ( \
SELECT {id} FROM ( \
SELECT {id}, \
ROW_NUMBER() OVER ( \
PARTITION BY {et}, {ek} \
ORDER BY {oa} DESC, {id} DESC \
) AS _rn \
FROM {t} \
) ranked \
WHERE _rn > {p1} \
)"
)
}
#[cfg(feature = "postgres")]
pub async fn fetch_for_entity(
pool: &PgPool,
entity_table: &str,
entity_pk: &str,
) -> Result<Vec<AuditEntry>, sqlx::Error> {
fetch_for_entity_pool(
&crate::sql::Pool::from(pool.clone()),
entity_table,
entity_pk,
)
.await
}
#[derive(Debug, Clone)]
pub struct AuditEntry {
pub id: i64,
pub entity_table: String,
pub entity_pk: String,
pub operation: String,
pub source: String,
pub changes: Value,
pub occurred_at: chrono::DateTime<chrono::Utc>,
}
#[cfg(feature = "postgres")]
impl AuditEntry {
fn from_row(row: &PgRow) -> Result<Self, sqlx::Error> {
Ok(Self {
id: row.try_get("id")?,
entity_table: row.try_get("entity_table")?,
entity_pk: row.try_get("entity_pk")?,
operation: row.try_get("operation")?,
source: row.try_get("source")?,
changes: row.try_get("changes")?,
occurred_at: row.try_get("occurred_at")?,
})
}
}
#[cfg(feature = "mysql")]
impl AuditEntry {
fn from_my_row(row: &sqlx::mysql::MySqlRow) -> Result<Self, sqlx::Error> {
use sqlx::Row as _;
let changes: sqlx::types::Json<Value> = row.try_get("changes")?;
Ok(Self {
id: row.try_get("id")?,
entity_table: row.try_get("entity_table")?,
entity_pk: row.try_get("entity_pk")?,
operation: row.try_get("operation")?,
source: row.try_get("source")?,
changes: changes.0,
occurred_at: row.try_get("occurred_at")?,
})
}
}
#[cfg(feature = "sqlite")]
impl AuditEntry {
fn from_sq_row(row: &sqlx::sqlite::SqliteRow) -> Result<Self, sqlx::Error> {
use sqlx::Row as _;
let changes_text: String = row.try_get("changes")?;
let changes: Value = serde_json::from_str(&changes_text).map_err(|e| {
sqlx::Error::Decode(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("audit `changes` is not valid JSON: {e}"),
)))
})?;
Ok(Self {
id: row.try_get("id")?,
entity_table: row.try_get("entity_table")?,
entity_pk: row.try_get("entity_pk")?,
operation: row.try_get("operation")?,
source: row.try_get("source")?,
changes,
occurred_at: row.try_get("occurred_at")?,
})
}
}
pub const CREATE_TABLE_SQL: &str = r#"
CREATE TABLE IF NOT EXISTS "rustango_audit_log" (
"id" BIGSERIAL PRIMARY KEY,
"entity_table" TEXT NOT NULL,
"entity_pk" TEXT NOT NULL,
"operation" TEXT NOT NULL,
"source" TEXT NOT NULL,
"changes" JSONB NOT NULL,
"occurred_at" TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS "rustango_audit_log_entity_idx"
ON "rustango_audit_log" ("entity_table", "entity_pk");
CREATE INDEX IF NOT EXISTS "rustango_audit_log_occurred_idx"
ON "rustango_audit_log" ("occurred_at" DESC);
"#;
#[cfg(feature = "postgres")]
pub async fn cleanup_older_than(pool: &PgPool, cutoff_days: i64) -> Result<u64, sqlx::Error> {
let cutoff = cutoff_days.max(0);
let result = sqlx::query(
r#"DELETE FROM "rustango_audit_log"
WHERE "occurred_at" < NOW() - ($1::int8 * INTERVAL '1 day')"#,
)
.bind(cutoff)
.execute(pool)
.await?;
Ok(result.rows_affected())
}
#[cfg(feature = "postgres")]
pub async fn cleanup_keep_last_n(pool: &PgPool, keep: i64) -> Result<u64, sqlx::Error> {
let keep = keep.max(0);
let result = sqlx::query(
r#"DELETE FROM "rustango_audit_log" WHERE "id" IN (
SELECT "id" FROM (
SELECT "id",
ROW_NUMBER() OVER (
PARTITION BY "entity_table", "entity_pk"
ORDER BY "occurred_at" DESC, "id" DESC
) AS _rn
FROM "rustango_audit_log"
) ranked
WHERE _rn > $1
)"#,
)
.bind(keep)
.execute(pool)
.await?;
Ok(result.rows_affected())
}
#[cfg(feature = "postgres")]
pub async fn ensure_table(pool: &PgPool) -> Result<(), sqlx::Error> {
for stmt in CREATE_TABLE_SQL.split(';') {
let trimmed = stmt.trim();
if trimmed.is_empty() {
continue;
}
sqlx::query(trimmed).execute(pool).await?;
}
Ok(())
}
pub const CREATE_TABLE_SQL_MYSQL: &str = r#"
CREATE TABLE IF NOT EXISTS `rustango_audit_log` (
`id` BIGINT AUTO_INCREMENT PRIMARY KEY,
`entity_table` VARCHAR(255) NOT NULL,
`entity_pk` VARCHAR(255) NOT NULL,
`operation` VARCHAR(32) NOT NULL,
`source` VARCHAR(255) NOT NULL,
`changes` JSON NOT NULL,
`occurred_at` DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)
);
CREATE INDEX `rustango_audit_log_entity_idx`
ON `rustango_audit_log` (`entity_table`, `entity_pk`);
CREATE INDEX `rustango_audit_log_occurred_idx`
ON `rustango_audit_log` (`occurred_at` DESC);
"#;
pub const CREATE_TABLE_SQL_SQLITE: &str = r#"
CREATE TABLE IF NOT EXISTS "rustango_audit_log" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"entity_table" TEXT NOT NULL,
"entity_pk" TEXT NOT NULL,
"operation" TEXT NOT NULL,
"source" TEXT NOT NULL,
"changes" TEXT NOT NULL,
"occurred_at" TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS "rustango_audit_log_entity_idx"
ON "rustango_audit_log" ("entity_table", "entity_pk");
CREATE INDEX IF NOT EXISTS "rustango_audit_log_occurred_idx"
ON "rustango_audit_log" ("occurred_at" DESC);
"#;
pub async fn ensure_table_pool(pool: &crate::sql::Pool) -> Result<(), sqlx::Error> {
let ddl = match pool.dialect().name() {
"postgres" => CREATE_TABLE_SQL,
"mysql" => CREATE_TABLE_SQL_MYSQL,
"sqlite" => CREATE_TABLE_SQL_SQLITE,
_ => CREATE_TABLE_SQL,
};
crate::sql::run_ddl_idempotent(pool, ddl).await
}
#[cfg(feature = "mysql")]
pub async fn emit_one_my<'c, E>(executor: E, entry: &PendingEntry) -> Result<(), sqlx::Error>
where
E: sqlx::Executor<'c, Database = sqlx::MySql>,
{
sqlx::query(
r#"INSERT INTO `rustango_audit_log`
(`entity_table`, `entity_pk`, `operation`, `source`, `changes`)
VALUES (?, ?, ?, ?, ?)"#,
)
.bind(entry.entity_table)
.bind(&entry.entity_pk)
.bind(entry.operation.as_str())
.bind(entry.source.as_token())
.bind(sqlx::types::Json(&entry.changes))
.execute(executor)
.await?;
Ok(())
}
#[cfg(feature = "sqlite")]
pub async fn emit_one_sqlite<'c, E>(executor: E, entry: &PendingEntry) -> Result<(), sqlx::Error>
where
E: sqlx::Executor<'c, Database = sqlx::Sqlite>,
{
sqlx::query(
r#"INSERT INTO "rustango_audit_log"
("entity_table", "entity_pk", "operation", "source", "changes")
VALUES (?, ?, ?, ?, ?)"#,
)
.bind(entry.entity_table)
.bind(&entry.entity_pk)
.bind(entry.operation.as_str())
.bind(entry.source.as_token())
.bind(sqlx::types::Json(&entry.changes))
.execute(executor)
.await?;
Ok(())
}
pub async fn emit_one_pool(
pool: &crate::sql::Pool,
entry: &PendingEntry,
) -> Result<(), sqlx::Error> {
match pool {
#[cfg(feature = "postgres")]
crate::sql::Pool::Postgres(pg) => emit_one(pg, entry).await,
#[cfg(feature = "mysql")]
crate::sql::Pool::Mysql(my) => emit_one_my(my, entry).await,
#[cfg(feature = "sqlite")]
crate::sql::Pool::Sqlite(sq) => emit_one_sqlite(sq, entry).await,
}
}
#[derive(Debug, Clone, Default)]
pub struct AuditFilter {
pub entity_table: Option<String>,
pub entity_pk: Option<String>,
pub operation: Option<String>,
pub source: Option<String>,
}
impl AuditFilter {
fn active_pairs(&self) -> Vec<(&'static str, &str)> {
let mut out = Vec::with_capacity(4);
if let Some(v) = self.entity_table.as_deref() {
if !v.is_empty() {
out.push(("entity_table", v));
}
}
if let Some(v) = self.entity_pk.as_deref() {
if !v.is_empty() {
out.push(("entity_pk", v));
}
}
if let Some(v) = self.operation.as_deref() {
if !v.is_empty() {
out.push(("operation", v));
}
}
if let Some(v) = self.source.as_deref() {
if !v.is_empty() {
out.push(("source", v));
}
}
out
}
}
pub async fn list(
pool: &crate::sql::Pool,
filter: &AuditFilter,
page_size: i64,
offset: i64,
) -> Result<Vec<AuditEntry>, sqlx::Error> {
let pairs = filter.active_pairs();
let sql = audit_list_sql(pool.dialect(), &pairs);
let binds: Vec<&str> = pairs.iter().map(|(_, v)| *v).collect();
match pool {
#[cfg(feature = "postgres")]
crate::sql::Pool::Postgres(pg) => {
let mut q = sqlx::query(&sql);
for v in &binds {
q = q.bind(*v);
}
let rows = q.bind(page_size).bind(offset).fetch_all(pg).await?;
rows.iter().map(AuditEntry::from_row).collect()
}
#[cfg(feature = "mysql")]
crate::sql::Pool::Mysql(my) => {
let mut q = sqlx::query(&sql);
for v in &binds {
q = q.bind(*v);
}
let rows = q.bind(page_size).bind(offset).fetch_all(my).await?;
rows.iter().map(AuditEntry::from_my_row).collect()
}
#[cfg(feature = "sqlite")]
crate::sql::Pool::Sqlite(sq) => {
let mut q = sqlx::query(&sql);
for v in &binds {
q = q.bind(*v);
}
let rows = q.bind(page_size).bind(offset).fetch_all(sq).await?;
rows.iter().map(AuditEntry::from_sq_row).collect()
}
}
}
pub async fn count(pool: &crate::sql::Pool, filter: &AuditFilter) -> Result<i64, sqlx::Error> {
use crate::core::SqlValue;
let pairs = filter.active_pairs();
let sql = audit_count_sql(pool.dialect(), &pairs);
let binds: Vec<SqlValue> = pairs
.iter()
.map(|(_, v)| SqlValue::String((*v).to_owned()))
.collect();
let rows: Vec<(i64,)> = crate::sql::raw_query_pool(&sql, binds, pool)
.await
.map_err(|e| match e {
crate::sql::ExecError::Driver(err) => err,
other => sqlx::Error::Protocol(format!("{other}")),
})?;
Ok(rows.into_iter().next().map_or(0, |t| t.0))
}
pub async fn facet_counts(
pool: &crate::sql::Pool,
column: &str,
) -> Result<Vec<(String, i64)>, sqlx::Error> {
if !matches!(column, "entity_table" | "operation" | "source") {
return Err(sqlx::Error::ColumnNotFound(column.to_owned()));
}
let sql = audit_facet_sql(pool.dialect(), column);
crate::sql::raw_query_pool::<(String, i64)>(&sql, Vec::new(), pool)
.await
.map_err(|e| match e {
crate::sql::ExecError::Driver(err) => err,
other => sqlx::Error::Protocol(format!("{other}")),
})
}
fn audit_list_sql(dialect: &dyn crate::sql::Dialect, pairs: &[(&'static str, &str)]) -> String {
use std::fmt::Write as _;
let t = dialect.quote_ident("rustango_audit_log");
let id = dialect.quote_ident("id");
let et = dialect.quote_ident("entity_table");
let ek = dialect.quote_ident("entity_pk");
let op = dialect.quote_ident("operation");
let src = dialect.quote_ident("source");
let ch = dialect.quote_ident("changes");
let oa = dialect.quote_ident("occurred_at");
let mut sql = String::new();
let _ = write!(
sql,
"SELECT {id}, {et}, {ek}, {op}, {src}, {ch}, {oa} FROM {t}",
);
let mut bind_idx = 1usize;
for (i, (col, _)) in pairs.iter().enumerate() {
let prefix = if i == 0 { " WHERE " } else { " AND " };
let col_q = dialect.quote_ident(col);
let ph = dialect.placeholder(bind_idx);
let _ = write!(sql, "{prefix}{col_q} = {ph}");
bind_idx += 1;
}
let p_limit = dialect.placeholder(bind_idx);
let p_offset = dialect.placeholder(bind_idx + 1);
let _ = write!(
sql,
" ORDER BY {oa} DESC, {id} DESC LIMIT {p_limit} OFFSET {p_offset}"
);
sql
}
fn audit_count_sql(dialect: &dyn crate::sql::Dialect, pairs: &[(&'static str, &str)]) -> String {
use std::fmt::Write as _;
let t = dialect.quote_ident("rustango_audit_log");
let mut sql = format!("SELECT COUNT(*) FROM {t}");
for (i, (col, _)) in pairs.iter().enumerate() {
let prefix = if i == 0 { " WHERE " } else { " AND " };
let col_q = dialect.quote_ident(col);
let ph = dialect.placeholder(i + 1);
let _ = write!(sql, "{prefix}{col_q} = {ph}");
}
sql
}
fn audit_facet_sql(dialect: &dyn crate::sql::Dialect, column: &str) -> String {
let t = dialect.quote_ident("rustango_audit_log");
let col = dialect.quote_ident(column);
format!(
"SELECT {col} AS facet_value, COUNT(*) AS facet_count \
FROM {t} GROUP BY {col} ORDER BY facet_count DESC, {col}"
)
}
pub async fn emit_many_pool(
pool: &crate::sql::Pool,
entries: &[PendingEntry],
) -> Result<(), sqlx::Error> {
if entries.is_empty() {
return Ok(());
}
match pool {
#[cfg(feature = "postgres")]
crate::sql::Pool::Postgres(pg) => emit_many(pg, entries).await,
#[cfg(feature = "mysql")]
crate::sql::Pool::Mysql(my) => {
let mut tx = my.begin().await?;
for entry in entries {
emit_one_my(&mut *tx, entry).await?;
}
tx.commit().await
}
#[cfg(feature = "sqlite")]
crate::sql::Pool::Sqlite(sq) => {
let mut tx = sq.begin().await?;
for entry in entries {
emit_one_sqlite(&mut *tx, entry).await?;
}
tx.commit().await
}
}
}
pub async fn fetch_for_entity_pool(
pool: &crate::sql::Pool,
entity_table: &str,
entity_pk: &str,
) -> Result<Vec<AuditEntry>, sqlx::Error> {
let sql = audit_select_sql(pool.dialect());
match pool {
#[cfg(feature = "postgres")]
crate::sql::Pool::Postgres(pg) => {
let rows = sqlx::query(&sql)
.bind(entity_table)
.bind(entity_pk)
.fetch_all(pg)
.await?;
rows.iter().map(AuditEntry::from_row).collect()
}
#[cfg(feature = "mysql")]
crate::sql::Pool::Mysql(my) => {
let rows = sqlx::query(&sql)
.bind(entity_table)
.bind(entity_pk)
.fetch_all(my)
.await?;
rows.iter().map(AuditEntry::from_my_row).collect()
}
#[cfg(feature = "sqlite")]
crate::sql::Pool::Sqlite(sq) => {
let rows = sqlx::query(&sql)
.bind(entity_table)
.bind(entity_pk)
.fetch_all(sq)
.await?;
rows.iter().map(AuditEntry::from_sq_row).collect()
}
}
}
pub async fn cleanup_older_than_pool(
pool: &crate::sql::Pool,
cutoff_days: i64,
) -> Result<u64, sqlx::Error> {
use crate::core::SqlValue;
let cutoff = cutoff_days.max(0);
let cutoff_ts = chrono::Utc::now() - chrono::Duration::days(cutoff);
let sql = audit_cleanup_older_than_sql(pool.dialect());
let bind = if pool.dialect().name() == "sqlite" {
SqlValue::String(cutoff_ts.format("%Y-%m-%d %H:%M:%S").to_string())
} else {
SqlValue::DateTime(cutoff_ts)
};
crate::sql::raw_execute_pool(pool, &sql, vec![bind])
.await
.map_err(|e| match e {
crate::sql::ExecError::Driver(err) => err,
other => sqlx::Error::Protocol(format!("{other}")),
})
}
pub async fn cleanup_keep_last_n_pool(
pool: &crate::sql::Pool,
keep: i64,
) -> Result<u64, sqlx::Error> {
use crate::core::SqlValue;
let keep = keep.max(0);
let sql = audit_cleanup_keep_last_n_sql(pool.dialect());
crate::sql::raw_execute_pool(pool, &sql, vec![SqlValue::I64(keep)])
.await
.map_err(|e| match e {
crate::sql::ExecError::Driver(err) => err,
other => sqlx::Error::Protocol(format!("{other}")),
})
}
pub async fn delete_one_with_audit(
pool: &crate::sql::Pool,
query: &crate::core::DeleteQuery,
entry: &PendingEntry,
) -> Result<u64, crate::sql::ExecError> {
let stmt = pool.dialect().compile_delete(query)?;
let mut tx = crate::sql::transaction_pool(pool).await?;
let affected = crate::sql::raw_execute_tx(&mut tx, &stmt.sql, stmt.params).await?;
emit_one_tx(&mut tx, entry).await?;
tx.commit().await?;
Ok(affected)
}
async fn emit_one_tx(
tx: &mut crate::sql::PoolTx<'_>,
entry: &PendingEntry,
) -> Result<(), sqlx::Error> {
match tx {
#[cfg(feature = "postgres")]
crate::sql::PoolTx::Postgres(t) => emit_one(&mut **t, entry).await,
#[cfg(feature = "mysql")]
crate::sql::PoolTx::Mysql(t) => emit_one_my(&mut **t, entry).await,
#[cfg(feature = "sqlite")]
crate::sql::PoolTx::Sqlite(t) => emit_one_sqlite(&mut **t, entry).await,
}
}
pub async fn save_one_with_audit(
pool: &crate::sql::Pool,
query: &crate::core::UpdateQuery,
entry: &PendingEntry,
) -> Result<u64, crate::sql::ExecError> {
let stmt = pool.dialect().compile_update(query)?;
let mut tx = crate::sql::transaction_pool(pool).await?;
let affected = crate::sql::raw_execute_tx(&mut tx, &stmt.sql, stmt.params).await?;
emit_one_tx(&mut tx, entry).await?;
tx.commit().await?;
Ok(affected)
}
pub async fn insert_one_with_audit(
pool: &crate::sql::Pool,
query: &crate::core::InsertQuery,
entry: &PendingEntry,
) -> Result<crate::sql::InsertReturningPool, crate::sql::ExecError> {
let mut tx = crate::sql::transaction_pool(pool).await?;
let returning = crate::sql::insert_returning_tx(&mut tx, query).await?;
emit_one_tx(&mut tx, entry).await?;
tx.commit().await?;
Ok(returning)
}
#[doc(hidden)]
#[cfg(feature = "postgres")]
pub fn __bind_value_pg(
q: sqlx::query::Query<'_, sqlx::Postgres, sqlx::postgres::PgArguments>,
value: crate::core::SqlValue,
) -> sqlx::query::Query<'_, sqlx::Postgres, sqlx::postgres::PgArguments> {
bind_value_pg(q, value)
}
#[doc(hidden)]
#[cfg(feature = "mysql")]
pub fn __bind_value_my(
q: sqlx::query::Query<'_, sqlx::MySql, sqlx::mysql::MySqlArguments>,
value: crate::core::SqlValue,
) -> sqlx::query::Query<'_, sqlx::MySql, sqlx::mysql::MySqlArguments> {
bind_value_my(q, value)
}
#[doc(hidden)]
#[cfg(feature = "sqlite")]
pub fn __bind_value_sqlite<'q>(
q: sqlx::query::Query<'q, sqlx::Sqlite, sqlx::sqlite::SqliteArguments<'q>>,
value: crate::core::SqlValue,
) -> sqlx::query::Query<'q, sqlx::Sqlite, sqlx::sqlite::SqliteArguments<'q>> {
bind_value_sqlite(q, value)
}
#[cfg(feature = "postgres")]
fn bind_value_pg(
q: sqlx::query::Query<'_, sqlx::Postgres, sqlx::postgres::PgArguments>,
value: crate::core::SqlValue,
) -> sqlx::query::Query<'_, sqlx::Postgres, sqlx::postgres::PgArguments> {
use crate::core::SqlValue;
match value {
SqlValue::Null => q.bind(None::<String>),
SqlValue::I16(v) => q.bind(v),
SqlValue::I32(v) => q.bind(v),
SqlValue::I64(v) => q.bind(v),
SqlValue::F32(v) => q.bind(v),
SqlValue::F64(v) => q.bind(v),
SqlValue::Bool(v) => q.bind(v),
SqlValue::String(v) => q.bind(v),
SqlValue::DateTime(v) => q.bind(v),
SqlValue::Date(v) => q.bind(v),
SqlValue::Time(v) => q.bind(v),
SqlValue::Uuid(v) => q.bind(v),
SqlValue::Json(v) => q.bind(sqlx::types::Json(v)),
SqlValue::Decimal(v) => q.bind(v),
SqlValue::Binary(v) => q.bind(v),
SqlValue::List(_) => unreachable!("List expanded to scalars by SQL writer"),
SqlValue::Array(_) => unreachable!("Array values never reach audited-save bind path"),
SqlValue::RangeLiteral(_) => {
unreachable!("RangeLiteral values never reach audited-save bind path")
}
SqlValue::HStore(_) => {
unreachable!("HStore values never reach audited-save bind path")
}
SqlValue::Vector(_) => {
unreachable!("Vector values never reach audited-save bind path")
}
SqlValue::Geometry { .. } => {
unreachable!("Geometry values never reach audited-save bind path")
}
}
}
#[cfg(feature = "mysql")]
fn bind_value_my(
q: sqlx::query::Query<'_, sqlx::MySql, sqlx::mysql::MySqlArguments>,
value: crate::core::SqlValue,
) -> sqlx::query::Query<'_, sqlx::MySql, sqlx::mysql::MySqlArguments> {
use crate::core::SqlValue;
match value {
SqlValue::Null => q.bind(None::<String>),
SqlValue::I16(v) => q.bind(v),
SqlValue::I32(v) => q.bind(v),
SqlValue::I64(v) => q.bind(v),
SqlValue::F32(v) => q.bind(v),
SqlValue::F64(v) => q.bind(v),
SqlValue::Bool(v) => q.bind(v),
SqlValue::String(v) => q.bind(v),
SqlValue::DateTime(v) => q.bind(v),
SqlValue::Date(v) => q.bind(v),
SqlValue::Time(v) => q.bind(v),
SqlValue::Uuid(v) => q.bind(v),
SqlValue::Json(v) => q.bind(sqlx::types::Json(v)),
SqlValue::Decimal(v) => q.bind(v),
SqlValue::Binary(v) => q.bind(v),
SqlValue::List(_) => unreachable!("List expanded to scalars by SQL writer"),
SqlValue::Array(_) => unreachable!("Array values never reach audited-save bind path"),
SqlValue::RangeLiteral(_) => {
unreachable!("RangeLiteral values never reach audited-save bind path")
}
SqlValue::HStore(_) => {
unreachable!("HStore values never reach audited-save bind path")
}
SqlValue::Vector(_) => {
unreachable!("Vector values never reach audited-save bind path")
}
SqlValue::Geometry { .. } => {
unreachable!("Geometry values never reach audited-save bind path")
}
}
}
#[cfg(feature = "sqlite")]
fn bind_value_sqlite<'q>(
q: sqlx::query::Query<'q, sqlx::Sqlite, sqlx::sqlite::SqliteArguments<'q>>,
value: crate::core::SqlValue,
) -> sqlx::query::Query<'q, sqlx::Sqlite, sqlx::sqlite::SqliteArguments<'q>> {
use crate::core::SqlValue;
match value {
SqlValue::Null => q.bind(None::<String>),
SqlValue::I16(v) => q.bind(v),
SqlValue::I32(v) => q.bind(v),
SqlValue::I64(v) => q.bind(v),
SqlValue::F32(v) => q.bind(v),
SqlValue::F64(v) => q.bind(v),
SqlValue::Bool(v) => q.bind(v),
SqlValue::String(v) => q.bind(v),
SqlValue::DateTime(v) => q.bind(v),
SqlValue::Date(v) => q.bind(v),
SqlValue::Time(v) => q.bind(v),
SqlValue::Uuid(v) => q.bind(v),
SqlValue::Json(v) => q.bind(sqlx::types::Json(v)),
SqlValue::Decimal(v) => q.bind(v.to_string()),
SqlValue::Binary(v) => q.bind(v),
SqlValue::List(_) => unreachable!("List expanded to scalars by SQL writer"),
SqlValue::Array(_) => unreachable!("Array values never reach audited-save bind path"),
SqlValue::RangeLiteral(_) => {
unreachable!("RangeLiteral values never reach audited-save bind path")
}
SqlValue::HStore(_) => {
unreachable!("HStore values never reach audited-save bind path")
}
SqlValue::Vector(_) => {
unreachable!("Vector values never reach audited-save bind path")
}
SqlValue::Geometry { .. } => {
unreachable!("Geometry values never reach audited-save bind path")
}
}
}
#[allow(clippy::too_many_arguments)]
pub async fn save_one_with_diff<F1, F2, F3>(
pool: &crate::sql::Pool,
update_query: &crate::core::UpdateQuery,
pk_column: &'static str,
pk_value: crate::core::SqlValue,
entity_table: &'static str,
entity_pk: String,
after_pairs: Vec<(&'static str, serde_json::Value)>,
select_cols_pg: &str,
select_cols_my: &str,
select_cols_sqlite: &str,
decode_before_pg: F1,
decode_before_my: F2,
decode_before_sqlite: F3,
) -> Result<(), crate::sql::ExecError>
where
F1: FnOnce(&crate::sql::PgReturningRow) -> Vec<(&'static str, serde_json::Value)>,
F2: FnOnce(&crate::sql::MyReturningRow) -> Vec<(&'static str, serde_json::Value)>,
F3: FnOnce(&crate::sql::SqliteReturningRow) -> Vec<(&'static str, serde_json::Value)>,
{
let _ = (&decode_before_pg, &decode_before_my, &decode_before_sqlite);
let _ = (select_cols_pg, select_cols_my, select_cols_sqlite);
let stmt = pool.dialect().compile_update(update_query)?;
match pool {
#[cfg(feature = "postgres")]
crate::sql::Pool::Postgres(pg) => {
let mut tx = pg.begin().await?;
let select_sql = format!(
r#"SELECT {} FROM "{}" WHERE "{}" = $1"#,
select_cols_pg, entity_table, pk_column,
);
let pk_q = sqlx::query(&select_sql);
let pk_q = bind_value_pg(pk_q, pk_value);
let before_pairs: Option<Vec<(&'static str, serde_json::Value)>> =
match pk_q.fetch_optional(&mut *tx).await {
Ok(Some(row)) => Some(decode_before_pg(&row)),
_ => None,
};
let mut wrapped = crate::sql::PoolTx::Postgres(tx);
finish_update_with_audit_diff(
&mut wrapped,
&stmt,
before_pairs,
&after_pairs,
entity_table,
&entity_pk,
)
.await?;
wrapped.commit().await?;
Ok(())
}
#[cfg(feature = "mysql")]
crate::sql::Pool::Mysql(my) => {
let mut tx = my.begin().await?;
let select_sql = format!(
"SELECT {} FROM `{}` WHERE `{}` = ?",
select_cols_my, entity_table, pk_column,
);
let pk_q = sqlx::query(&select_sql);
let pk_q = bind_value_my(pk_q, pk_value);
let before_pairs: Option<Vec<(&'static str, serde_json::Value)>> =
match pk_q.fetch_optional(&mut *tx).await {
Ok(Some(row)) => Some(decode_before_my(&row)),
_ => None,
};
let mut wrapped = crate::sql::PoolTx::Mysql(tx);
finish_update_with_audit_diff(
&mut wrapped,
&stmt,
before_pairs,
&after_pairs,
entity_table,
&entity_pk,
)
.await?;
wrapped.commit().await?;
Ok(())
}
#[cfg(feature = "sqlite")]
crate::sql::Pool::Sqlite(sq) => {
let mut tx = sq.begin().await?;
let select_sql = format!(
r#"SELECT {} FROM "{}" WHERE "{}" = ?"#,
select_cols_sqlite, entity_table, pk_column,
);
let pk_q = sqlx::query(&select_sql);
let pk_q = bind_value_sqlite(pk_q, pk_value);
let before_pairs: Option<Vec<(&'static str, serde_json::Value)>> =
match pk_q.fetch_optional(&mut *tx).await {
Ok(Some(row)) => Some(decode_before_sqlite(&row)),
_ => None,
};
let mut wrapped = crate::sql::PoolTx::Sqlite(tx);
finish_update_with_audit_diff(
&mut wrapped,
&stmt,
before_pairs,
&after_pairs,
entity_table,
&entity_pk,
)
.await?;
wrapped.commit().await?;
Ok(())
}
}
}
async fn finish_update_with_audit_diff(
tx: &mut crate::sql::PoolTx<'_>,
stmt: &crate::sql::CompiledStatement,
before_pairs: Option<Vec<(&'static str, serde_json::Value)>>,
after_pairs: &[(&'static str, serde_json::Value)],
entity_table: &'static str,
entity_pk: &str,
) -> Result<(), crate::sql::ExecError> {
crate::sql::raw_execute_tx(tx, &stmt.sql, stmt.params.clone()).await?;
if let Some(before) = before_pairs {
let entry = PendingEntry {
entity_table,
entity_pk: entity_pk.to_owned(),
operation: AuditOp::Update,
source: current_source(),
changes: diff_changes(&before, after_pairs),
};
emit_one_tx(tx, &entry).await?;
}
Ok(())
}