use nodedb_types::DatabaseId;
use pgwire::api::results::{Response, Tag};
use pgwire::error::PgWireResult;
use crate::control::security::audit::AuditEvent;
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::state::SharedState;
use super::super::super::types::sqlstate_error;
pub fn drop_collection(
state: &SharedState,
identity: &AuthenticatedIdentity,
name: &str,
if_exists: bool,
purge: bool,
cascade: bool,
cascade_force: bool,
) -> PgWireResult<Vec<Response>> {
let name_lower = name.to_lowercase();
let name = name_lower.as_str();
let tenant_id = identity.tenant_id;
let dependents: Vec<crate::control::cascade::Dependent> = if let Some(catalog) =
state.credentials.catalog().as_ref()
{
let mut visited = std::collections::HashSet::new();
crate::control::cascade::collect_dependents(catalog, tenant_id.as_u64(), name, &mut visited)
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?
} else {
Vec::new()
};
let blocking_dependents: Vec<&crate::control::cascade::Dependent> = dependents
.iter()
.filter(|d| d.kind != crate::control::cascade::DependentKind::Sequence)
.collect();
if !blocking_dependents.is_empty() && !cascade {
let deps_list: Vec<String> = blocking_dependents
.iter()
.map(|d| format!("{}:{}", d.kind.as_str(), d.name))
.collect();
return Err(sqlstate_error(
"2BP01",
&format!(
"cannot drop collection '{name}': {} dependent object(s) exist ({}); \
drop them individually or retry with CASCADE (batched-cascade propose \
not yet implemented — CASCADE currently rejected to avoid orphaned rows)",
blocking_dependents.len(),
deps_list.join(", ")
),
));
}
if cascade {
return Err(sqlstate_error(
"0A000",
"DROP COLLECTION ... CASCADE requires atomic batched Delete* + PurgeCollection \
in one metadata-raft commit — that proposer surface has not landed yet. \
Drop dependents individually in the meantime.",
));
}
let _ = cascade_force;
let is_owner = state
.permissions
.get_owner("collection", tenant_id, name)
.as_deref()
== Some(&identity.username);
let is_admin = identity.is_superuser
|| identity.has_role(&crate::control::security::identity::Role::TenantAdmin);
if !is_owner && !is_admin {
return Err(sqlstate_error(
"42501",
"permission denied: only owner, superuser, or tenant_admin can drop collections",
));
}
if purge && !is_admin {
return Err(sqlstate_error(
"42501",
"permission denied: only superuser or tenant_admin may DROP COLLECTION ... PURGE",
));
}
if let Some(catalog) = state.credentials.catalog().as_ref() {
match catalog.get_collection(DatabaseId::DEFAULT, tenant_id.as_u64(), name) {
Ok(Some(coll)) if coll.is_active => {}
Ok(Some(_)) if purge => {}
Ok(Some(_)) => {
return Ok(vec![Response::Execution(Tag::new("DROP COLLECTION"))]);
}
Ok(None) if purge || if_exists => {
return Ok(vec![Response::Execution(Tag::new("DROP COLLECTION"))]);
}
_ => {
return Err(sqlstate_error(
"42P01",
&format!("collection '{name}' does not exist"),
));
}
}
}
let action = if purge {
format!("requested purge of collection '{name}'")
} else {
format!("requested drop of collection '{name}'")
};
state.audit_record(
AuditEvent::AdminAction,
Some(tenant_id),
&identity.username,
&action,
);
let entry = if purge {
crate::control::catalog_entry::CatalogEntry::PurgeCollection {
tenant_id: tenant_id.as_u64(),
name: name.to_string(),
}
} else {
crate::control::catalog_entry::CatalogEntry::DeactivateCollection {
tenant_id: tenant_id.as_u64(),
name: name.to_string(),
}
};
let log_index = crate::control::metadata_proposer::propose_catalog_entry(state, &entry)
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
if log_index == 0
&& let Some(catalog) = state.credentials.catalog().as_ref()
{
if purge {
catalog
.delete_collection(DatabaseId::DEFAULT, tenant_id.as_u64(), name)
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
} else if let Ok(Some(mut coll)) =
catalog.get_collection(DatabaseId::DEFAULT, tenant_id.as_u64(), name)
{
coll.is_active = false;
catalog
.put_collection(DatabaseId::DEFAULT, &coll)
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
}
}
if let Some(catalog) = state.credentials.catalog().as_ref()
&& let Ok(seqs) = catalog.load_sequences_for_tenant(tenant_id.as_u64())
{
let prefix = format!("{name}_");
let suffix = "_seq";
for seq in &seqs {
if seq.name.starts_with(&prefix) && seq.name.ends_with(suffix) {
catalog
.delete_sequence(tenant_id.as_u64(), &seq.name)
.map_err(|e| {
sqlstate_error(
"XX000",
&format!("failed to drop sequence '{}': {e}", seq.name),
)
})?;
let _ = state
.sequence_registry
.remove(tenant_id.as_u64(), &seq.name);
}
}
}
let completion = if purge {
format!("purged collection '{name}' (log_index={log_index})")
} else {
format!("dropped collection '{name}' (log_index={log_index})")
};
state.audit_record(
AuditEvent::AdminAction,
Some(tenant_id),
&identity.username,
&completion,
);
Ok(vec![Response::Execution(Tag::new("DROP COLLECTION"))])
}