use nodedb_types::DatabaseId;
use pgwire::api::results::{Response, Tag};
use pgwire::error::PgWireResult;
use crate::control::security::audit::AuditEvent;
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::server::pgwire::ddl::parse_utils::strip_if_exists;
use crate::control::server::pgwire::types::{require_tenant_admin, sqlstate_error};
use crate::control::state::SharedState;
/// Handles `DROP USER [IF EXISTS] <name>`.
///
/// Processing order:
/// 1. The caller must pass `require_tenant_admin`.
/// 2. `strip_if_exists` removes an optional `IF EXISTS`; the user name is then
///    read from `parts[2]`.
/// 3. Dropping the caller's own user is refused.
/// 4. The target user is looked up once; its tenant is remembered for the
///    ownership reassignment below.
/// 5. A `CatalogEntry::DropUser` is proposed. When the returned log index is
///    `0`, the user is removed from `state.credentials` directly by this
///    function.
/// 6. Every collection referenced by one of the user's grants *and* currently
///    owned by the user is re-owned to `"<tenant>_admin"` (best effort).
/// 7. An `AuditEvent::PrivilegeChange` record is written.
///
/// # Errors
/// * `42601` – fewer than three tokens remain after `IF EXISTS` stripping.
/// * `42501` – the named user is the calling identity.
/// * `42704` – the user does not exist and `IF EXISTS` was not given, or the
///   local credential store reported that nothing was dropped.
/// * `XX000` – the metadata proposal or the local credential drop failed.
///
/// Any error returned by `require_tenant_admin` is propagated unchanged.
pub fn drop_user(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    parts: &[&str],
) -> PgWireResult<Vec<Response>> {
    require_tenant_admin(identity, "drop users")?;

    let (if_exists, parts) = strip_if_exists(parts, 2);
    if parts.len() < 3 {
        return Err(sqlstate_error(
            "42601",
            "syntax: DROP USER [IF EXISTS] <name>",
        ));
    }

    let username = parts[2];
    if username == identity.username {
        return Err(sqlstate_error("42501", "cannot drop your own user"));
    }

    // Single lookup: the tenant used for ownership reassignment always comes
    // from the very record whose existence was checked, never from a fallback.
    let Some(user_tenant) = state.credentials.get_user(username).map(|u| u.tenant_id) else {
        if if_exists {
            return Ok(vec![Response::Execution(Tag::new("DROP USER"))]);
        }
        return Err(sqlstate_error(
            "42704",
            &format!("user '{username}' does not exist"),
        ));
    };

    let drop_entry = crate::control::catalog_entry::CatalogEntry::DropUser {
        username: username.to_string(),
    };
    let log_index = crate::control::metadata_proposer::propose_catalog_entry(state, &drop_entry)
        .map_err(|e| sqlstate_error("XX000", &format!("metadata propose: {e}")))?;

    // NOTE(review): a log index of 0 appears to mean the proposal was not
    // applied through a replicated metadata log, so the change has to be
    // applied locally here — confirm against `propose_catalog_entry`.
    let dropped = if log_index == 0 {
        state
            .credentials
            .drop_user(username)
            .map_err(|e| sqlstate_error("XX000", &e.to_string()))?
    } else {
        true
    };
    if !dropped {
        return Err(sqlstate_error(
            "42704",
            &format!("user '{username}' does not exist"),
        ));
    }

    let admin_name = format!("{}_admin", user_tenant.as_u64());
    let grants = state.permissions.grants_for(&format!("user:{username}"));

    if let Some(catalog) = state.credentials.catalog() {
        for grant in &grants {
            // Only grant targets that name a collection are candidates.
            let Some(collection) = extract_collection_from_target(&grant.target) else {
                continue;
            };

            // Skip collections the dropped user does not own.
            let current_owner = state
                .permissions
                .get_owner("collection", user_tenant, collection);
            if current_owner.as_deref() != Some(username) {
                continue;
            }

            // A missing collection record or a catalog read error is skipped
            // silently; reassignment is best effort.
            let Ok(Some(mut stored)) =
                catalog.get_collection(DatabaseId::DEFAULT, user_tenant.as_u64(), collection)
            else {
                continue;
            };

            stored.owner = admin_name.clone();
            let put_entry = crate::control::catalog_entry::CatalogEntry::PutCollection(Box::new(
                stored.clone(),
            ));

            // Same log-index-0 convention as for the drop itself: only then is
            // the updated record written and the in-memory owner installed here.
            if let Ok(idx) =
                crate::control::metadata_proposer::propose_catalog_entry(state, &put_entry)
                && idx == 0
            {
                // NOTE(review): the persist result is deliberately ignored, so
                // the in-memory owner may diverge from the stored catalog if
                // this write fails.
                let _ = catalog.put_collection(DatabaseId::DEFAULT, &stored);
                state.permissions.install_replicated_owner(
                    &crate::control::security::catalog::StoredOwner {
                        object_type: "collection".into(),
                        object_name: stored.name.clone(),
                        tenant_id: stored.tenant_id,
                        owner_username: stored.owner.clone(),
                    },
                );
            }
        }
    }

    state.audit_record(
        AuditEvent::PrivilegeChange,
        Some(identity.tenant_id),
        &identity.username,
        &format!("dropped user '{username}' (ownership reassigned to '{admin_name}')"),
    );

    Ok(vec![Response::Execution(Tag::new("DROP USER"))])
}
/// Extracts the collection name from a grant target string.
///
/// A target is accepted when it consists of at least three `:`-separated
/// fields and the first field is exactly `collection`. The second field is
/// skipped (presumably a tenant identifier — confirm against the code that
/// builds grant targets) and everything after the second `:` is returned, so
/// names that themselves contain `:` are preserved intact.
///
/// Returns `None` for any other shape of `target`.
fn extract_collection_from_target(target: &str) -> Option<&str> {
    // Drop the leading "collection:" tag, then split the remainder once to
    // discard the middle field and keep the tail.
    let after_kind = target.strip_prefix("collection:")?;
    after_kind.split_once(':').map(|(_, name)| name)
}