use nodedb_types::MirrorStatus;
use pgwire::api::results::{Response, Tag};
use pgwire::error::PgWireResult;
use crate::control::catalog_entry::entry::CatalogEntry;
use crate::control::metadata_proposer::propose_catalog_entry;
use crate::control::security::catalog::database_types::DatabaseStatus;
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::state::SharedState;
use super::super::super::super::types::{require_superuser, sqlstate_error};
pub fn handle_promote_database(
state: &SharedState,
identity: &AuthenticatedIdentity,
name: &str,
) -> PgWireResult<Vec<Response>> {
let catalog = state.credentials.catalog();
let catalog = catalog
.as_ref()
.ok_or_else(|| sqlstate_error("XX000", "system catalog unavailable"))?;
let db_id = catalog
.get_database_id_by_name(name)
.map_err(|e| sqlstate_error("XX000", &format!("catalog lookup failed: {e}")))?
.ok_or_else(|| sqlstate_error("3D000", &format!("database '{name}' does not exist")))?;
require_superuser(
state,
identity,
Some(db_id),
&format!("ALTER DATABASE {name} PROMOTE"),
)?;
let mut descriptor = catalog
.get_database(db_id)
.map_err(|e| sqlstate_error("XX000", &format!("catalog read failed: {e}")))?
.ok_or_else(|| sqlstate_error("XX000", &format!("database '{name}' descriptor missing")))?;
let already_promoted = match &descriptor.mirror_origin {
Some(origin) => matches!(origin.status, MirrorStatus::Promoted),
None => descriptor.status == DatabaseStatus::Active,
};
if already_promoted {
return Ok(vec![Response::Execution(Tag::new("ALTER DATABASE"))]);
}
if descriptor.mirror_origin.is_none() && descriptor.status != DatabaseStatus::Mirroring {
return Err(sqlstate_error(
"0A000",
&format!("database '{name}' is not a mirror database"),
));
}
state.mirror_link_registry.teardown_link(db_id);
if let Some(ref mut origin) = descriptor.mirror_origin {
origin.status = MirrorStatus::Promoted;
}
descriptor.status = DatabaseStatus::Active;
let proposed = propose_catalog_entry(
state,
&CatalogEntry::PutDatabase(Box::new(descriptor.clone())),
)
.map_err(|e| sqlstate_error("XX000", &format!("catalog propose failed: {e}")))?;
if proposed == 0 {
catalog
.put_database(&descriptor)
.map_err(|e| sqlstate_error("XX000", &format!("catalog write failed: {e}")))?;
}
if let Err(e) = catalog.delete_mirror_collection_map(db_id) {
return Err(sqlstate_error(
"XX000",
&format!("PROMOTE: failed to clear mirror_collection_map: {e}"),
));
}
if let Err(e) = catalog.delete_mirror_lag(db_id) {
return Err(sqlstate_error(
"XX000",
&format!("PROMOTE: failed to clear mirror_lag: {e}"),
));
}
state.audit_record_with_db(
crate::control::security::audit::AuditEvent::DatabasePromoted,
None,
Some(db_id),
&identity.username,
&format!("ALTER DATABASE {name} PROMOTE"),
);
Ok(vec![Response::Execution(Tag::new("ALTER DATABASE"))])
}