use std::str::FromStr;
use chrono::Utc;
use sqlx::Row;
use uuid::Uuid;
use crate::audit_log_store::AuditLogStore;
use crate::entities::{AuditLogEntry, AuditLogFilter, EventKind, NewAuditLogEntry, Page};
use crate::error::StoreError;
use crate::store::StoreFuture;
use super::PostgresStore;
fn row_to_entry(row: sqlx::postgres::PgRow) -> Result<AuditLogEntry, StoreError> {
let event_type_str: String = row.get("event_type");
let event_type =
EventKind::from_str(&event_type_str).map_err(|e| StoreError::Database(e.to_string()))?;
Ok(AuditLogEntry {
id: row.get("id"),
event_type,
payload: row.get("payload"),
run_id: row.get("run_id"),
step_id: row.get("step_id"),
user_id: row.get("user_id"),
created_at: row.get("created_at"),
})
}
impl AuditLogStore for PostgresStore {
    /// Inserts a new audit-log entry and returns it as stored.
    ///
    /// The entry's `id` (a UUID v7) and `created_at` (current UTC time) are
    /// generated here; every other column is taken from `entry`.
    ///
    /// # Errors
    ///
    /// Returns [`StoreError::Database`] if the insert fails or the returned
    /// row cannot be decoded.
    fn append_audit_log(&self, entry: NewAuditLogEntry) -> StoreFuture<'_, AuditLogEntry> {
        Box::pin(async move {
            let id = Uuid::now_v7();
            let now = Utc::now();

            let row = sqlx::query(
                r#"
                INSERT INTO ironflow.audit_logs (id, event_type, payload, run_id, step_id, user_id, created_at)
                VALUES ($1, $2, $3, $4, $5, $6, $7)
                RETURNING id, event_type, payload, run_id, step_id, user_id, created_at
                "#,
            )
            .bind(id)
            .bind(entry.event_type.as_str())
            .bind(&entry.payload)
            .bind(entry.run_id)
            .bind(entry.step_id)
            .bind(entry.user_id)
            .bind(now)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| StoreError::Database(e.to_string()))?;

            row_to_entry(row)
        })
    }

    /// Returns one page of audit-log entries matching `filter`, newest first.
    ///
    /// `page` is 1-based; `0` is treated like `1`. Each set field of `filter`
    /// (`event_type`, `run_id`, `from`, `to`) adds an `AND`-ed condition, with
    /// `from`/`to` being inclusive bounds on `created_at`.
    ///
    /// `total` is the number of rows matching the filter, independent of the
    /// requested page.
    ///
    /// # Errors
    ///
    /// Returns [`StoreError::Database`] if a query fails or a row cannot be
    /// decoded.
    fn list_audit_logs(
        &self,
        filter: AuditLogFilter,
        page: u32,
        per_page: u32,
    ) -> StoreFuture<'_, Page<AuditLogEntry>> {
        Box::pin(async move {
            let limit = i64::from(per_page);
            // Saturate rather than overflow for extreme `page * per_page`
            // values; a wrapped (negative) OFFSET would be rejected by Postgres.
            let offset = i64::from(page.saturating_sub(1)).saturating_mul(limit);

            // Build the WHERE clause with positional placeholders. The order
            // here must match the order of the `bind` calls further down.
            let mut conditions: Vec<String> = Vec::new();
            let mut bind_idx = 1u32;
            if filter.event_type.is_some() {
                conditions.push(format!("event_type = ${bind_idx}"));
                bind_idx += 1;
            }
            if filter.run_id.is_some() {
                conditions.push(format!("run_id = ${bind_idx}"));
                bind_idx += 1;
            }
            if filter.from.is_some() {
                conditions.push(format!("created_at >= ${bind_idx}"));
                bind_idx += 1;
            }
            if filter.to.is_some() {
                conditions.push(format!("created_at <= ${bind_idx}"));
                bind_idx += 1;
            }

            let where_clause = if conditions.is_empty() {
                String::new()
            } else {
                format!("WHERE {}", conditions.join(" AND "))
            };

            // `id DESC` is a tiebreaker: without a unique sort key, rows that
            // share a `created_at` may be returned in a different order on
            // each query, which makes OFFSET pagination skip or repeat rows.
            let sql = format!(
                r#"
                SELECT id, event_type, payload, run_id, step_id, user_id, created_at,
                       COUNT(*) OVER() as total_count
                FROM ironflow.audit_logs
                {where_clause}
                ORDER BY created_at DESC, id DESC
                LIMIT ${bind_idx} OFFSET ${}
                "#,
                bind_idx + 1
            );

            // Filters are bound by reference so they can be bound a second
            // time for the fallback count query below.
            let mut query = sqlx::query(&sql);
            if let Some(ref event_type) = filter.event_type {
                query = query.bind(event_type.as_str());
            }
            if let Some(ref run_id) = filter.run_id {
                query = query.bind(run_id);
            }
            if let Some(ref from) = filter.from {
                query = query.bind(from);
            }
            if let Some(ref to) = filter.to {
                query = query.bind(to);
            }
            query = query.bind(limit).bind(offset);

            let rows = query
                .fetch_all(&self.pool)
                .await
                .map_err(|e| StoreError::Database(e.to_string()))?;

            // The window count is only available when at least one row came back.
            let windowed_total = rows
                .first()
                .map(|row| row.try_get::<i64, _>("total_count"))
                .transpose()
                .map_err(|e| StoreError::Database(e.to_string()))?;

            let items = rows
                .into_iter()
                .map(row_to_entry)
                .collect::<Result<Vec<_>, _>>()?;

            let total = match windowed_total {
                Some(count) => count,
                // An empty result on a page past the end (or with `per_page == 0`)
                // does not mean nothing matched: the window function simply had
                // no row to attach the count to. Count explicitly in that case.
                // This runs as a separate statement, so under concurrent inserts
                // the count may differ slightly from what the page query saw.
                None if offset > 0 || limit == 0 => {
                    let count_sql =
                        format!("SELECT COUNT(*) FROM ironflow.audit_logs {where_clause}");
                    let mut count_query = sqlx::query_scalar::<_, i64>(&count_sql);
                    // Same filters, same order as for the page query above.
                    if let Some(ref event_type) = filter.event_type {
                        count_query = count_query.bind(event_type.as_str());
                    }
                    if let Some(ref run_id) = filter.run_id {
                        count_query = count_query.bind(run_id);
                    }
                    if let Some(ref from) = filter.from {
                        count_query = count_query.bind(from);
                    }
                    if let Some(ref to) = filter.to {
                        count_query = count_query.bind(to);
                    }
                    count_query
                        .fetch_one(&self.pool)
                        .await
                        .map_err(|e| StoreError::Database(e.to_string()))?
                }
                // First page with a positive limit and no rows: nothing matches.
                None => 0,
            };
            let total =
                u64::try_from(total).map_err(|e| StoreError::Database(e.to_string()))?;

            Ok(Page {
                items,
                total,
                page,
                per_page,
            })
        })
    }
}