use std::sync::Arc;
use futures::stream;
use pgwire::api::results::{DataRowEncoder, QueryResponse, Response};
use pgwire::error::PgWireResult;
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::state::SharedState;
use super::super::types::{int8_field, sqlstate_error, text_field};
pub use super::inspect_audit::{
export_audit_log, show_audit_in_database, show_audit_log, show_audit_where,
};
pub fn show_users(
state: &SharedState,
identity: &AuthenticatedIdentity,
) -> PgWireResult<Vec<Response>> {
let schema = Arc::new(vec![
text_field("username"),
int8_field("tenant_id"),
text_field("roles"),
text_field("is_superuser"),
]);
let users = state.credentials.list_user_details();
let mut rows = Vec::new();
let mut encoder = DataRowEncoder::new(schema.clone());
for user in &users {
if !identity.is_superuser && user.tenant_id != identity.tenant_id {
continue;
}
encoder.encode_field(&user.username)?;
encoder.encode_field(&(user.tenant_id.as_u64() as i64))?;
let roles_str: String = user
.roles
.iter()
.map(|r| r.to_string())
.collect::<Vec<_>>()
.join(", ");
encoder.encode_field(&roles_str)?;
encoder.encode_field(&if user.is_superuser { "t" } else { "f" })?;
rows.push(Ok(encoder.take_row()));
}
Ok(vec![Response::Query(QueryResponse::new(
schema,
stream::iter(rows),
))])
}
pub fn show_tenants(
state: &SharedState,
identity: &AuthenticatedIdentity,
) -> PgWireResult<Vec<Response>> {
if !identity.is_superuser {
return Err(sqlstate_error(
"42501",
"permission denied: only superuser can list tenants",
));
}
let (schema, rows) = tenant_rows(state, |_, _| true)?;
Ok(vec![Response::Query(QueryResponse::new(
schema,
stream::iter(rows),
))])
}
pub fn show_tenant_by_identifier(
state: &SharedState,
identity: &AuthenticatedIdentity,
ident: &str,
) -> PgWireResult<Vec<Response>> {
if !identity.is_superuser {
return Err(sqlstate_error(
"42501",
"permission denied: only superuser can introspect tenants",
));
}
let (schema, rows) = tenant_rows(state, |t_id, t_name| {
if let Ok(n) = ident.parse::<u64>() {
t_id == n
} else {
t_name.eq_ignore_ascii_case(ident)
}
})?;
if rows.is_empty() {
return Err(sqlstate_error(
"42704",
&format!("tenant '{ident}' not found"),
));
}
Ok(vec![Response::Query(QueryResponse::new(
schema,
stream::iter(rows),
))])
}
pub fn show_tenants_filtered_by_name(
state: &SharedState,
identity: &AuthenticatedIdentity,
name: &str,
) -> PgWireResult<Vec<Response>> {
if !identity.is_superuser {
return Err(sqlstate_error(
"42501",
"permission denied: only superuser can list tenants",
));
}
let (schema, rows) = tenant_rows(state, |_t_id, t_name| t_name.eq_ignore_ascii_case(name))?;
if rows.is_empty() {
return Err(sqlstate_error(
"42704",
&format!("tenant '{name}' not found"),
));
}
Ok(vec![Response::Query(QueryResponse::new(
schema,
stream::iter(rows),
))])
}
type TenantRowSet = (
Arc<Vec<pgwire::api::results::FieldInfo>>,
Vec<PgWireResult<pgwire::messages::data::DataRow>>,
);
fn tenant_rows<F>(state: &SharedState, pred: F) -> PgWireResult<TenantRowSet>
where
F: Fn(u64, &str) -> bool,
{
let schema = Arc::new(vec![
int8_field("tenant_id"),
text_field("name"),
int8_field("active_requests"),
int8_field("total_requests"),
int8_field("rejected_requests"),
]);
let tenants = match state.tenants.lock() {
Ok(t) => t,
Err(p) => p.into_inner(),
};
let mut names: std::collections::HashMap<u64, String> = std::collections::HashMap::new();
if let Some(catalog) = state.credentials.catalog()
&& let Ok(all) = catalog.load_all_tenants()
{
for t in all {
names.insert(t.tenant_id, t.name);
}
}
let mut seen: std::collections::BTreeSet<u64> = names.keys().copied().collect();
for user in &state.credentials.list_user_details() {
seen.insert(user.tenant_id.as_u64());
}
let mut rows = Vec::new();
for tid_u64 in seen {
let tid_name = names.get(&tid_u64).map(String::as_str).unwrap_or("");
if !pred(tid_u64, tid_name) {
continue;
}
let tid = crate::types::TenantId::new(tid_u64);
let usage = tenants.usage(tid);
let mut encoder = DataRowEncoder::new(schema.clone());
encoder.encode_field(&(tid_u64 as i64))?;
encoder.encode_field(&tid_name)?;
encoder.encode_field(&(usage.map_or(0, |u| u.active_requests as i64)))?;
encoder.encode_field(&(usage.map_or(0, |u| u.total_requests as i64)))?;
encoder.encode_field(&(usage.map_or(0, |u| u.rejected_requests as i64)))?;
rows.push(Ok(encoder.take_row()));
}
Ok((schema, rows))
}
pub fn show_roles(
state: &SharedState,
identity: &AuthenticatedIdentity,
) -> PgWireResult<Vec<Response>> {
let schema = Arc::new(vec![
text_field("name"),
int8_field("tenant_id"),
text_field("parent"),
int8_field("created_at"),
]);
let roles = state.roles.list_roles();
let mut rows = Vec::new();
for role in &roles {
if !identity.is_superuser && role.tenant_id != identity.tenant_id {
continue;
}
let mut encoder = DataRowEncoder::new(schema.clone());
encoder.encode_field(&role.name)?;
encoder.encode_field(&(role.tenant_id.as_u64() as i64))?;
encoder.encode_field(&role.parent.as_deref().unwrap_or(""))?;
encoder.encode_field(&(role.created_at as i64))?;
rows.push(Ok(encoder.take_row()));
}
Ok(vec![Response::Query(QueryResponse::new(
schema,
stream::iter(rows),
))])
}
pub fn show_session(identity: &AuthenticatedIdentity) -> PgWireResult<Vec<Response>> {
let schema = Arc::new(vec![
text_field("username"),
int8_field("user_id"),
int8_field("tenant_id"),
text_field("roles"),
text_field("auth_method"),
text_field("is_superuser"),
]);
let roles_str: String = identity
.roles
.iter()
.map(|r| r.to_string())
.collect::<Vec<_>>()
.join(", ");
let auth_method = format!("{:?}", identity.auth_method);
let mut encoder = DataRowEncoder::new(schema.clone());
encoder.encode_field(&identity.username)?;
encoder.encode_field(&(identity.user_id as i64))?;
encoder.encode_field(&(identity.tenant_id.as_u64() as i64))?;
encoder.encode_field(&roles_str)?;
encoder.encode_field(&auth_method)?;
encoder.encode_field(&if identity.is_superuser { "t" } else { "f" })?;
let row = encoder.take_row();
Ok(vec![Response::Query(QueryResponse::new(
schema,
stream::iter(vec![Ok(row)]),
))])
}
pub fn show_grants(
state: &SharedState,
identity: &AuthenticatedIdentity,
parts: &[&str],
) -> PgWireResult<Vec<Response>> {
let target_user = if parts.len() >= 4
&& parts[1].eq_ignore_ascii_case("GRANTS")
&& parts[2].eq_ignore_ascii_case("FOR")
{
let target = parts[3];
if target != identity.username
&& !identity.is_superuser
&& !identity.has_role(&crate::control::security::identity::Role::TenantAdmin)
{
return Err(sqlstate_error(
"42501",
"permission denied: can only view your own grants, or be superuser/tenant_admin",
));
}
target.to_string()
} else {
identity.username.clone()
};
let schema = Arc::new(vec![text_field("username"), text_field("role")]);
let user = state.credentials.get_user(&target_user);
let mut rows = Vec::new();
let mut encoder = DataRowEncoder::new(schema.clone());
if let Some(user) = user {
for role in &user.roles {
encoder.encode_field(&user.username)?;
encoder.encode_field(&role.to_string())?;
rows.push(Ok(encoder.take_row()));
}
}
Ok(vec![Response::Query(QueryResponse::new(
schema,
stream::iter(rows),
))])
}
pub fn show_permissions(
state: &SharedState,
identity: &AuthenticatedIdentity,
on_collection: Option<&str>,
for_grantee: Option<&str>,
) -> PgWireResult<Vec<Response>> {
if let Some(grantee) = for_grantee
&& grantee != identity.username
&& !identity.is_superuser
&& !identity.has_role(&crate::control::security::identity::Role::TenantAdmin)
{
return Err(sqlstate_error(
"42501",
"permission denied: can only view your own permissions, or be superuser/tenant_admin",
));
}
let schema = Arc::new(vec![
text_field("grantee"),
text_field("permission"),
text_field("target"),
text_field("type"),
]);
let mut rows = Vec::new();
let mut encoder = DataRowEncoder::new(schema.clone());
if let Some(collection) = on_collection {
let target = format!("collection:{}:{collection}", identity.tenant_id.as_u64());
if for_grantee.is_none()
&& let Some(owner) =
state
.permissions
.get_owner("collection", identity.tenant_id, collection)
{
encoder.encode_field(&owner)?;
encoder.encode_field(&"ALL (owner)")?;
encoder.encode_field(&collection)?;
encoder.encode_field(&"ownership")?;
rows.push(Ok(encoder.take_row()));
}
let grants = state.permissions.grants_on(&target);
for grant in &grants {
if let Some(g) = for_grantee
&& !grant.grantee.eq_ignore_ascii_case(g)
{
continue;
}
encoder.encode_field(&grant.grantee)?;
encoder.encode_field(&format!("{:?}", grant.permission))?;
encoder.encode_field(&collection)?;
encoder.encode_field(&"grant")?;
rows.push(Ok(encoder.take_row()));
}
} else if let Some(grantee) = for_grantee {
let grants = state.permissions.grants_for(grantee);
for grant in &grants {
let display_target = grant
.target
.rsplit(':')
.next()
.unwrap_or(&grant.target)
.to_string();
encoder.encode_field(&grant.grantee)?;
encoder.encode_field(&format!("{:?}", grant.permission))?;
encoder.encode_field(&display_target)?;
encoder.encode_field(&"grant")?;
rows.push(Ok(encoder.take_row()));
}
} else {
let all_grants = if identity.is_superuser
|| identity.has_role(&crate::control::security::identity::Role::TenantAdmin)
{
state.permissions.all_grants(identity.tenant_id)
} else {
state.permissions.grants_for(&identity.username)
};
for grant in &all_grants {
let display_target = grant
.target
.rsplit(':')
.next()
.unwrap_or(&grant.target)
.to_string();
encoder.encode_field(&grant.grantee)?;
encoder.encode_field(&format!("{:?}", grant.permission))?;
encoder.encode_field(&display_target)?;
encoder.encode_field(&"grant")?;
rows.push(Ok(encoder.take_row()));
}
}
Ok(vec![Response::Query(QueryResponse::new(
schema,
stream::iter(rows),
))])
}