use nodedb_types::DatabaseId;
use pgwire::api::results::{Response, Tag};
use pgwire::error::PgWireResult;
use crate::control::security::audit::{AuditEvent, UndropAuditDetail, UndropStage};
use crate::control::security::identity::{AuthenticatedIdentity, Role};
use crate::control::state::SharedState;
use super::super::super::types::sqlstate_error;
/// Handles `UNDROP COLLECTION <name>` by re-activating a deactivated collection.
///
/// `parts` is the tokenized statement; the collection name is read from
/// `parts[2]` and lower-cased before lookup. The collection record is fetched
/// from the system catalog for the caller's tenant, flipped to
/// `is_active = true`, and proposed as a `PutCollection` catalog entry. Two
/// `AdminAction` audit records are written: one with stage `Requested`
/// before the catalog is touched, one with stage `Completed` (carrying the
/// log index) after the change has been applied.
///
/// # Errors
///
/// Returns a SQLSTATE-tagged error when:
/// * `42601` — fewer than three tokens were supplied.
/// * `XX000` — no persistent system catalog is available, or the catalog
///   lookup, the metadata proposal, or the direct catalog write failed.
/// * `42P01` — the catalog has no record of the collection.
/// * `42P07` — the collection is already active.
/// * `42501` — the caller is neither the preserved owner, a superuser, nor a
///   tenant admin; or the preserved owner's user record no longer exists and
///   the caller is not an admin.
pub fn undrop_collection(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    parts: &[&str],
) -> PgWireResult<Vec<Response>> {
    // The collection name is the third token of the statement.
    let Some(raw_name) = parts.get(2) else {
        return Err(sqlstate_error("42601", "syntax: UNDROP COLLECTION <name>"));
    };
    let normalized = raw_name.to_lowercase();
    let name = normalized.as_str();
    let tenant_id = identity.tenant_id;

    let catalog = state.credentials.catalog().ok_or_else(|| {
        sqlstate_error(
            "XX000",
            "UNDROP COLLECTION requires a persistent system catalog",
        )
    })?;

    // Fetch the collection record; a missing record cannot be restored.
    let lookup = catalog
        .get_collection(DatabaseId::DEFAULT, tenant_id.as_u64(), name)
        .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
    let Some(mut record) = lookup else {
        return Err(sqlstate_error(
            "42P01",
            &format!(
                "collection '{name}' not found (retention window elapsed or never existed)"
            ),
        ));
    };
    if record.is_active {
        return Err(sqlstate_error(
            "42P07",
            &format!("collection '{name}' is already active"),
        ));
    }

    // Authorization: the recorded owner, a superuser, or a tenant admin.
    let preserved_owner = state.permissions.get_owner("collection", tenant_id, name);
    let caller_is_owner = preserved_owner.as_deref() == Some(&identity.username);
    let caller_is_admin = identity.is_superuser || identity.has_role(&Role::TenantAdmin);
    if !(caller_is_owner || caller_is_admin) {
        return Err(sqlstate_error(
            "42501",
            "permission denied: only the preserved owner, superuser, or tenant_admin may UNDROP",
        ));
    }

    // If an owner is recorded but that user no longer exists, only an admin
    // may proceed.
    let owner_user_missing = match preserved_owner.as_deref() {
        Some(owner) => state.credentials.get_user(owner).is_none(),
        None => false,
    };
    if owner_user_missing && !caller_is_admin {
        return Err(sqlstate_error(
            "42501",
            "preserved-owner user no longer exists — only superuser or tenant_admin may UNDROP",
        ));
    }

    // Audit the request before any catalog mutation; an early return below
    // leaves only this `Requested` record behind.
    let requested_detail =
        UndropAuditDetail::new(name, UndropStage::Requested, owner_user_missing).to_json();
    state.audit_record(
        AuditEvent::AdminAction,
        Some(tenant_id),
        &identity.username,
        &requested_detail,
    );

    record.is_active = true;
    let entry =
        crate::control::catalog_entry::CatalogEntry::PutCollection(Box::new(record.clone()));
    let log_index = match crate::control::metadata_proposer::propose_catalog_entry(state, &entry)
    {
        Ok(index) => index,
        Err(e) => return Err(sqlstate_error("XX000", &e.to_string())),
    };
    // NOTE(review): a log index of 0 presumably means the proposal did not go
    // through a replicated log, so the record is written to the catalog
    // directly — confirm against `propose_catalog_entry`.
    if log_index == 0 {
        catalog
            .put_collection(DatabaseId::DEFAULT, &record)
            .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
    }

    let completed_detail =
        UndropAuditDetail::new(name, UndropStage::Completed, owner_user_missing)
            .with_log_index(log_index)
            .to_json();
    state.audit_record(
        AuditEvent::AdminAction,
        Some(tenant_id),
        &identity.username,
        &completed_detail,
    );

    Ok(vec![Response::Execution(Tag::new("UNDROP COLLECTION"))])
}