use nodedb_types::DatabaseId;
use nodedb_types::MirrorStatus;
use nodedb_types::error::sqlstate;
use pgwire::api::results::{Response, Tag};
use pgwire::error::PgWireResult;
use crate::control::catalog_entry::entry::CatalogEntry;
use crate::control::maintenance::clone_materializer::{
CloneMaterializerHandle, force_materialize_blocking,
};
use crate::control::metadata_proposer::propose_catalog_entry;
use crate::control::security::catalog::{StoredCollection, SystemCatalog};
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::state::SharedState;
use super::super::super::types::{require_superuser, sqlstate_error};
/// Handles a `DROP DATABASE <name>` statement.
///
/// The work is split into a validation phase that rejects the statement
/// without touching the target database's own records, and a destructive
/// phase that only starts once the drop is known to be allowed:
///
/// 1. Refuse the built-in `default` database (by name, and again by id).
/// 2. Resolve `name` to a database id. A missing database yields a plain
///    `DROP DATABASE` completion when `if_exists` is set, `3D000` otherwise.
/// 3. Run the `require_superuser` check for the resolved database.
/// 4. Read the database descriptor (needed later for mirror teardown).
/// 5. Look up clone children. Without `cascade` any child rejects the drop;
///    with `cascade` each child is force-materialized first.
/// 6. Load the database's collections. Without `cascade` a non-empty
///    database rejects the drop; with `cascade` every collection is deleted.
/// 7. If the descriptor has a mirror origin whose status is not `Promoted`,
///    remove the mirror collection map and lag record (best effort: failures
///    are logged as warnings and do not abort the drop).
/// 8. Record the audit event, propose the `DeleteDatabase` catalog entry, and
///    drop the per-database metric entries keyed by `name`.
///
/// Mirror teardown deliberately runs after steps 5 and 6 so that a rejected
/// or failed drop never strips mirror bookkeeping from a database that is
/// still present.
///
/// # Errors
///
/// * `sqlstate::CANNOT_DROP_DEFAULT_DATABASE` — the target is the default database.
/// * `3D000` — the database does not exist and `if_exists` is `false`.
/// * `sqlstate::CLONE_DEPENDENCY` — clones depend on it and `cascade` is `false`.
/// * `2BP01` — it still contains collections and `cascade` is `false`.
/// * `0A000` — force-materializing a dependent clone returned `BadRequest`.
/// * `XX000` — the catalog is unavailable, or a catalog read, delete, or
///   proposal failed.
/// * Whatever `require_superuser` returns when the check does not pass.
pub fn handle_drop_database(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    name: &str,
    if_exists: bool,
    cascade: bool,
) -> PgWireResult<Vec<Response>> {
    // Cheap name-based guard before any catalog access.
    if name.eq_ignore_ascii_case("default") {
        return Err(sqlstate_error(
            sqlstate::CANNOT_DROP_DEFAULT_DATABASE,
            "cannot drop the built-in 'default' database",
        ));
    }

    let catalog = state.credentials.catalog();
    let catalog = catalog
        .as_ref()
        .ok_or_else(|| sqlstate_error("XX000", "system catalog unavailable"))?;

    let Some(db_id) = catalog
        .get_database_id_by_name(name)
        .map_err(|e| sqlstate_error("XX000", &format!("catalog lookup failed: {e}")))?
    else {
        // Unknown database: silently succeed for IF EXISTS, otherwise report it.
        if if_exists {
            return Ok(vec![Response::Execution(Tag::new("DROP DATABASE"))]);
        }
        return Err(sqlstate_error(
            "3D000",
            &format!("database '{name}' does not exist"),
        ));
    };

    require_superuser(
        state,
        identity,
        Some(db_id),
        &format!("DROP DATABASE {name}"),
    )?;

    // Second guard on the resolved id, independent of how the name was spelled.
    if db_id == DatabaseId::DEFAULT {
        return Err(sqlstate_error(
            sqlstate::CANNOT_DROP_DEFAULT_DATABASE,
            "cannot drop the built-in 'default' database",
        ));
    }

    // Read the descriptor up front so a catalog read failure is reported
    // before anything is modified. The mirror bookkeeping it describes is
    // only removed further down, once the drop can no longer be rejected.
    let descriptor_for_mirror = catalog
        .get_database(db_id)
        .map_err(|e| sqlstate_error("XX000", &format!("catalog read failed: {e}")))?;

    let dependent_ids = catalog
        .get_clone_children(db_id)
        .map_err(|e| sqlstate_error("XX000", &format!("lineage check failed: {e}")))?;

    if !dependent_ids.is_empty() && !cascade {
        let id_list: Vec<String> = dependent_ids
            .iter()
            .map(|id| id.as_u64().to_string())
            .collect();
        return Err(sqlstate_error(
            sqlstate::CLONE_DEPENDENCY,
            &format!(
                "database '{}' cannot be dropped: {} clone(s) depend on it \
                 (database ids: {}); use FORCE or CASCADE to materialize them first",
                name,
                dependent_ids.len(),
                id_list.join(", ")
            ),
        ));
    }

    // Only reached with dependents when `cascade` is set: materialize each
    // dependent clone before the source database goes away.
    for dep_id in &dependent_ids {
        let handle = CloneMaterializerHandle::new(*dep_id);
        force_materialize_blocking(*dep_id, state, catalog, Some(&handle)).map_err(
            |e| match e {
                crate::Error::BadRequest { detail } => sqlstate_error("0A000", &detail),
                other => sqlstate_error(
                    "XX000",
                    &format!(
                        "force materialization of dependent clone {} failed: {other}",
                        dep_id.as_u64()
                    ),
                ),
            },
        )?;
    }

    let collections = catalog
        .load_all_collections(db_id)
        .map_err(|e| sqlstate_error("XX000", &format!("catalog scan failed: {e}")))?;

    if !cascade && !collections.is_empty() {
        return Err(sqlstate_error(
            "2BP01",
            &format!(
                "database '{name}' has {} collection(s); \
                 use CASCADE to drop all collections automatically",
                collections.len()
            ),
        ));
    }

    if cascade {
        drop_all_collections_in_database(catalog, db_id, &collections)?;
    }

    // Mirror teardown. Runs only after every validation and fallible
    // pre-step has passed, so a rejected DROP leaves the mirror's collection
    // map and lag record intact. Individual failures are logged, not fatal.
    if let Some(descriptor) = descriptor_for_mirror
        && let Some(origin) = descriptor.mirror_origin.as_ref()
        && !matches!(origin.status, MirrorStatus::Promoted)
    {
        if let Err(e) = catalog.delete_mirror_collection_map(db_id) {
            tracing::warn!(
                db = ?db_id, "DROP DATABASE mirror: failed to remove collection map: {e}"
            );
        }
        if let Err(e) = catalog.delete_mirror_lag(db_id) {
            tracing::warn!(
                db = ?db_id, "DROP DATABASE mirror: failed to remove lag record: {e}"
            );
        }
        tracing::info!(
            db = ?db_id,
            source_cluster = %origin.source_cluster,
            "DROP DATABASE mirror: observer subscription teardown complete"
        );
    }

    // The audit record is written before the catalog change is proposed.
    state.audit_record_with_db(
        crate::control::security::audit::AuditEvent::DatabaseDropped,
        None,
        Some(db_id),
        &identity.username,
        &format!("DROP DATABASE {name}"),
    );

    let proposed = propose_catalog_entry(
        state,
        &CatalogEntry::DeleteDatabase {
            db_id: db_id.as_u64(),
        },
    )
    .map_err(|e| sqlstate_error("XX000", &format!("catalog propose failed: {e}")))?;

    // NOTE(review): a result of 0 presumably means the entry was not applied
    // through the proposer, so the delete is applied directly here — confirm
    // against `propose_catalog_entry`.
    if proposed == 0 {
        catalog
            .delete_database(db_id)
            .map_err(|e| sqlstate_error("XX000", &format!("catalog delete failed: {e}")))?;
    }

    // Best-effort cleanup of per-database metric entries; a lock that cannot
    // be acquired is skipped rather than failing the already-completed drop.
    if let Some(m) = &state.system_metrics {
        if let Ok(mut map) = m.database_collections_by_name.write() {
            map.remove(name);
        }
        if let Ok(mut map) = m.database_queries_by_name.write() {
            map.remove(name);
        }
        if let Ok(mut map) = m.database_errors_by_name.write() {
            map.remove(name);
        }
    }

    Ok(vec![Response::Execution(Tag::new("DROP DATABASE"))])
}
/// Deletes every collection in `collections` from database `db_id`.
///
/// Entries are removed one at a time, in slice order, through
/// `catalog.delete_collection`. The first failure stops the loop and is
/// returned as an `XX000` error naming the database id and the collection;
/// collections deleted before that point are not restored by this function.
fn drop_all_collections_in_database(
    catalog: &SystemCatalog,
    db_id: DatabaseId,
    collections: &[StoredCollection],
) -> PgWireResult<()> {
    for stored in collections {
        let outcome = catalog.delete_collection(db_id, stored.tenant_id, &stored.name);
        if let Err(e) = outcome {
            let message = format!(
                "CASCADE DROP DATABASE {}: failed to delete collection '{}': {e}",
                db_id.as_u64(),
                stored.name
            );
            return Err(sqlstate_error("XX000", &message));
        }
    }
    Ok(())
}