use pgwire::error::PgWireResult;
use crate::control::security::identity::{AuthenticatedIdentity, Role};
use crate::control::server::pgwire::pg_catalog::vquery::VTable;
use crate::control::server::pgwire::pg_catalog::vquery::value::{VColumn, VType, VValue};
use crate::control::state::SharedState;
const MATERIALIZE_LIMIT: usize = 100_000;
pub fn audit_log(state: &SharedState, identity: &AuthenticatedIdentity) -> PgWireResult<VTable> {
if !identity.is_superuser && !identity.has_role(&Role::Monitor) {
return Err(pgwire::error::PgWireError::UserError(Box::new(
pgwire::error::ErrorInfo::new(
"ERROR".to_string(),
"42501".to_string(),
"permission denied: audit_log:read requires superuser or monitor role".to_string(),
),
)));
}
let mut table = VTable::new(vec![
VColumn::new("seq", VType::Int8),
VColumn::new("timestamp_us", VType::Int8),
VColumn::new("event", VType::Text),
VColumn::new("tenant_id", VType::Int8),
VColumn::new("source", VType::Text),
VColumn::new("detail", VType::Text),
VColumn::new("prev_hash", VType::Text),
]);
let mut seen: std::collections::HashSet<u64> = std::collections::HashSet::new();
if let Some(catalog) = state.credentials.catalog() {
let entries = catalog
.load_audit_entries_ranged(1, u64::MAX, 0, u64::MAX, MATERIALIZE_LIMIT)
.map_err(|e| pgwire::error::PgWireError::ApiError(Box::new(e)))?;
for e in entries {
if !seen.insert(e.seq) {
continue;
}
table.push(vec![
VValue::Int8(e.seq as i64),
VValue::Int8(e.timestamp_us as i64),
VValue::Text(e.event),
VValue::Int8(e.tenant_id.unwrap_or(0) as i64),
VValue::Text(e.source),
VValue::Text(e.detail),
if e.prev_hash.is_empty() {
VValue::Null
} else {
VValue::Text(e.prev_hash)
},
]);
}
}
let log = match state.audit.lock() {
Ok(l) => l,
Err(p) => p.into_inner(),
};
let all = log.all();
let skip = all.len().saturating_sub(MATERIALIZE_LIMIT);
for entry in all.iter().skip(skip) {
if !seen.insert(entry.seq) {
continue;
}
if table.rows.len() >= MATERIALIZE_LIMIT {
break;
}
table.push(vec![
VValue::Int8(entry.seq as i64),
VValue::Int8(entry.timestamp_us as i64),
VValue::Text(format!("{:?}", entry.event)),
VValue::Int8(entry.tenant_id.map_or(0i64, |t| t.as_u64() as i64)),
VValue::Text(entry.source.clone()),
VValue::Text(entry.detail.clone()),
if entry.prev_hash.is_empty() {
VValue::Null
} else {
VValue::Text(entry.prev_hash.clone())
},
]);
}
Ok(table)
}