use pgwire::api::results::{Response, Tag};
use pgwire::error::PgWireResult;
use crate::control::catalog_entry::CatalogEntry;
use crate::control::metadata_proposer::propose_catalog_entry;
use crate::control::security::audit::AuditEvent;
use crate::control::security::identity::{AuthenticatedIdentity, Permission, Role};
use crate::control::security::permission::{
format_permission, function_target, parse_permission, procedure_target, tenant_target,
};
use crate::control::state::SharedState;
use crate::types::TenantId;
use super::super::super::types::{require_tenant_admin, sqlstate_error};
fn canonicalize_grantee(state: &SharedState, raw: &str) -> PgWireResult<String> {
if state.credentials.get_user(raw).is_some() {
return Ok(format!("user:{raw}"));
}
let parsed: Role = match raw.parse() {
Ok(r) => r,
Err(e) => match e {},
};
let is_known_role = match &parsed {
Role::Custom(name) => state.roles.get_role(name).is_some(),
_ => true,
};
if is_known_role {
return Ok(parsed.to_string());
}
Err(sqlstate_error(
"42704",
&format!("grantee '{raw}' is neither a user nor a role"),
))
}
fn propose_grant(
state: &SharedState,
target: &str,
grantee: &str,
perm: Permission,
granted_by: &str,
) -> PgWireResult<()> {
let stored = state
.permissions
.prepare_permission(target, grantee, perm, granted_by);
let entry = CatalogEntry::PutPermission(Box::new(stored.clone()));
let log_index = propose_catalog_entry(state, &entry)
.map_err(|e| sqlstate_error("XX000", &format!("metadata propose: {e}")))?;
if log_index == 0 {
if let Some(catalog) = state.credentials.catalog() {
catalog
.put_permission(&stored)
.map_err(|e| sqlstate_error("XX000", &format!("catalog write: {e}")))?;
}
state.permissions.install_replicated_permission(&stored);
}
Ok(())
}
fn propose_revoke(
state: &SharedState,
target: &str,
grantee: &str,
perm: Permission,
) -> PgWireResult<()> {
let perm_str = format_permission(perm);
let entry = CatalogEntry::DeletePermission {
target: target.to_string(),
grantee: grantee.to_string(),
permission: perm_str.clone(),
};
let log_index = propose_catalog_entry(state, &entry)
.map_err(|e| sqlstate_error("XX000", &format!("metadata propose: {e}")))?;
if log_index == 0 {
if let Some(catalog) = state.credentials.catalog() {
catalog
.delete_permission(target, grantee, &perm_str)
.map_err(|e| sqlstate_error("XX000", &format!("catalog write: {e}")))?;
}
state
.permissions
.install_replicated_revoke(target, grantee, &perm_str);
}
Ok(())
}
fn resolve_tenant_id(state: &SharedState, name: &str) -> PgWireResult<TenantId> {
if let Ok(id) = name.parse::<u64>() {
return Ok(TenantId::new(id));
}
let catalog = state.credentials.catalog().as_ref().ok_or_else(|| {
sqlstate_error(
"XX000",
"tenant catalog unavailable — cannot resolve tenant name",
)
})?;
let tenants = catalog
.load_all_tenants()
.map_err(|e| sqlstate_error("XX000", &format!("tenant lookup: {e}")))?;
tenants
.into_iter()
.find(|t| t.name.eq_ignore_ascii_case(name))
.map(|t| TenantId::new(t.tenant_id))
.ok_or_else(|| sqlstate_error("42704", &format!("tenant '{name}' does not exist")))
}
fn resolve_target(
state: &SharedState,
identity: &AuthenticatedIdentity,
target_type: &str,
target_name: &str,
) -> PgWireResult<(String, String)> {
if target_type.eq_ignore_ascii_case("FUNCTION") {
let name = target_name.to_lowercase();
Ok((
function_target(identity.tenant_id, &name),
format!("function '{name}'"),
))
} else if target_type.eq_ignore_ascii_case("PROCEDURE") {
let name = target_name.to_lowercase();
Ok((
procedure_target(identity.tenant_id, &name),
format!("procedure '{name}'"),
))
} else if target_type.eq_ignore_ascii_case("TENANT") {
let tenant_id = resolve_tenant_id(state, target_name)?;
if tenant_id != identity.tenant_id && !identity.is_superuser {
return Err(sqlstate_error(
"42501",
"permission denied: managing permissions on another tenant requires superuser",
));
}
Ok((tenant_target(tenant_id), format!("tenant '{target_name}'")))
} else {
Ok((
format!("collection:{}:{target_name}", identity.tenant_id.as_u64()),
format!("collection '{target_name}'"),
))
}
}
fn resolve_permissions(permissions: &[String]) -> PgWireResult<Vec<Permission>> {
let mut out = Vec::new();
for p in permissions {
if p.eq_ignore_ascii_case("ALL") {
out.extend([
Permission::Read,
Permission::Write,
Permission::Create,
Permission::Drop,
Permission::Alter,
]);
} else {
let perm = parse_permission(p)
.ok_or_else(|| sqlstate_error("42601", &format!("unknown permission: {p}")))?;
out.push(perm);
}
}
Ok(out)
}
pub fn grant_permission(
state: &SharedState,
identity: &AuthenticatedIdentity,
permissions: &[String],
target_type: &str,
target_name: &str,
grantee: &str,
) -> PgWireResult<Vec<Response>> {
let (target, object_desc) = resolve_target(state, identity, target_type, target_name)?;
require_tenant_admin(identity, "grant permissions")?;
let perms = resolve_permissions(permissions)?;
let canonical = canonicalize_grantee(state, grantee)?;
for perm in &perms {
propose_grant(state, &target, &canonical, *perm, &identity.username)?;
}
state.audit_record(
AuditEvent::PrivilegeChange,
Some(identity.tenant_id),
&identity.username,
&format!(
"granted {} on {object_desc} to '{grantee}'",
permissions.join(", ")
),
);
Ok(vec![Response::Execution(Tag::new("GRANT"))])
}
pub fn revoke_permission(
state: &SharedState,
identity: &AuthenticatedIdentity,
permissions: &[String],
target_type: &str,
target_name: &str,
grantee: &str,
) -> PgWireResult<Vec<Response>> {
let (target, object_desc) = resolve_target(state, identity, target_type, target_name)?;
require_tenant_admin(identity, "revoke permissions")?;
let perms = resolve_permissions(permissions)?;
let canonical = canonicalize_grantee(state, grantee)?;
for perm in &perms {
propose_revoke(state, &target, &canonical, *perm)?;
}
state.audit_record(
AuditEvent::PrivilegeChange,
Some(identity.tenant_id),
&identity.username,
&format!(
"revoked {} on {object_desc} from '{grantee}'",
permissions.join(", ")
),
);
Ok(vec![Response::Execution(Tag::new("REVOKE"))])
}