use nodedb_sql::ddl_ast::AlterDatabaseOperation;
use nodedb_types::QuotaRecord;
use pgwire::api::results::{Response, Tag};
use pgwire::error::PgWireResult;
use crate::control::catalog_entry::entry::CatalogEntry;
use crate::control::metadata_proposer::propose_catalog_entry;
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::state::SharedState;
use super::super::super::types::{require_cluster_admin, require_database_owner, sqlstate_error};
/// Executes an `ALTER DATABASE <name> ...` statement.
///
/// The handler first resolves `name` to a database id and loads the matching
/// descriptor from the system catalog, then dispatches on `operation`:
///
/// - `Rename`: gated by `require_database_owner`. Rejects the rename when a
///   *different* database already owns `new_name`, persists the renamed
///   descriptor and records a `DatabaseRenamed` audit event.
/// - `SetQuota`: gated by `require_cluster_admin`. Merges `spec` onto the stored
///   quota record (or `QuotaRecord::DEFAULT` when none is stored), writes it
///   together with the current quota ceiling snapshot, then updates the
///   maintenance budget cap and the governor's per-database budget (when a
///   governor is configured) before recording a `DatabaseQuotaChanged` event.
/// - `SetDefault`: gated by `require_cluster_admin`, then always rejected as
///   not implemented.
/// - `SetAuditDml` / `SetIdleTimeout`: gated by `require_cluster_admin`. Update
///   the descriptor, persist it, refresh the matching in-memory cache and
///   record an audit event.
/// - `Materialize` / `Promote`: delegated to their dedicated handlers, whose
///   result is returned unchanged.
///
/// On success, the operations handled here return a single
/// `Response::Execution` tagged `ALTER DATABASE`.
///
/// # Errors
///
/// - `XX000`: the system catalog is unavailable, a catalog lookup/read/
///   propose/write fails, or the descriptor for a resolved id is missing.
/// - `3D000`: no database named `name` exists.
/// - `42P04`: the rename target is already used by another database.
/// - `53400`: `put_database_quota` rejected the merged quota record.
/// - `0A000`: `SET DEFAULT` was requested.
/// - Any error returned by `require_database_owner` / `require_cluster_admin`
///   or by the delegated handlers.
pub fn handle_alter_database(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    name: &str,
    operation: &AlterDatabaseOperation,
) -> PgWireResult<Vec<Response>> {
    let Some(catalog) = state.credentials.catalog() else {
        return Err(sqlstate_error("XX000", "system catalog unavailable"));
    };

    // Resolve the target database: name -> id -> descriptor.
    let db_id = match catalog.get_database_id_by_name(name) {
        Ok(Some(id)) => id,
        Ok(None) => {
            return Err(sqlstate_error(
                "3D000",
                &format!("database '{name}' does not exist"),
            ));
        }
        Err(e) => {
            return Err(sqlstate_error(
                "XX000",
                &format!("catalog lookup failed: {e}"),
            ));
        }
    };
    let mut descriptor = match catalog.get_database(db_id) {
        Ok(Some(found)) => found,
        Ok(None) => {
            return Err(sqlstate_error(
                "XX000",
                &format!("database '{name}' descriptor missing"),
            ));
        }
        Err(e) => {
            return Err(sqlstate_error(
                "XX000",
                &format!("catalog read failed: {e}"),
            ));
        }
    };

    match operation {
        // These two operations are implemented by dedicated handlers.
        AlterDatabaseOperation::Materialize => {
            return super::materialize::handle_alter_database_materialize(state, identity, name);
        }
        AlterDatabaseOperation::Promote => {
            return super::mirror::promote::handle_promote_database(state, identity, name);
        }

        AlterDatabaseOperation::Rename { new_name } => {
            let action = format!("ALTER DATABASE {name} RENAME");
            require_database_owner(state, identity, db_id, &action)?;

            // The new name may only resolve to this very database (a no-op
            // rename) or to nothing at all.
            let current_holder = catalog
                .get_database_id_by_name(new_name)
                .map_err(|e| sqlstate_error("XX000", &format!("catalog lookup failed: {e}")))?;
            if matches!(current_holder, Some(other_id) if other_id != db_id) {
                return Err(sqlstate_error(
                    "42P04",
                    &format!("database '{new_name}' already exists"),
                ));
            }

            descriptor.name = new_name.clone();

            // NOTE(review): a zero return from `propose_catalog_entry` triggers
            // a direct catalog write — presumably the path without a metadata
            // proposer applying the entry; confirm against its documentation.
            let entry = CatalogEntry::PutDatabase(Box::new(descriptor.clone()));
            let proposal = propose_catalog_entry(state, &entry)
                .map_err(|e| sqlstate_error("XX000", &format!("catalog propose failed: {e}")))?;
            if proposal == 0 {
                catalog
                    .put_database(&descriptor)
                    .map_err(|e| sqlstate_error("XX000", &format!("catalog write failed: {e}")))?;
            }

            let detail = format!("ALTER DATABASE {name} RENAME TO {new_name}");
            state.audit_record_with_db(
                crate::control::security::audit::AuditEvent::DatabaseRenamed,
                None,
                Some(db_id),
                &identity.username,
                &detail,
            );
        }

        AlterDatabaseOperation::SetQuota(spec) => {
            let action = format!("ALTER DATABASE {name} SET QUOTA");
            require_cluster_admin(state, identity, Some(db_id), &action)?;

            // Start from the stored record (or the default) and overlay the
            // requested changes; the untouched copy is kept for the audit log.
            let previous = catalog
                .get_database_quota(db_id)
                .map_err(|e| sqlstate_error("XX000", &format!("quota read failed: {e}")))?
                .unwrap_or(QuotaRecord::DEFAULT);
            let mut updated = previous.clone();
            updated.merge(spec);

            let ceiling = state.quota_ceiling_snapshot();
            catalog
                .put_database_quota(db_id, &updated, &ceiling)
                .map_err(|e| sqlstate_error("53400", &format!("{e}")))?;

            // Push the new limits into the in-memory enforcement structures.
            state
                .maintenance_budget
                .set_cap(db_id, updated.maintenance_cpu_pct);
            match &state.governor {
                Some(gov) if updated.max_memory_bytes > 0 => {
                    gov.set_database_budget(db_id, updated.max_memory_bytes as usize);
                }
                // A non-positive memory limit removes the per-database budget.
                Some(gov) => {
                    gov.clear_database_budget(db_id);
                }
                None => {}
            }

            let detail = format!(
                "ALTER DATABASE {name} SET QUOTA — before: [{}] — after: [{}]",
                previous.audit_summary(),
                updated.audit_summary()
            );
            state.audit_record_with_db(
                crate::control::security::audit::AuditEvent::DatabaseQuotaChanged,
                None,
                Some(db_id),
                &identity.username,
                &detail,
            );
        }

        AlterDatabaseOperation::SetDefault => {
            // The permission check runs first so that an unauthorized caller
            // gets the permission error rather than the "not implemented" one.
            let action = format!("ALTER DATABASE {name} SET DEFAULT");
            require_cluster_admin(state, identity, Some(db_id), &action)?;
            return Err(sqlstate_error(
                "0A000",
                "ALTER DATABASE SET DEFAULT is not yet implemented; \
                 use ALTER USER <name> SET DEFAULT DATABASE <db>",
            ));
        }

        AlterDatabaseOperation::SetAuditDml(mode) => {
            let action = format!("ALTER DATABASE {name} SET AUDIT_DML");
            require_cluster_admin(state, identity, Some(db_id), &action)?;

            let mode = *mode;
            descriptor.audit_dml = mode;

            // Same propose-then-fallback persistence as for RENAME.
            let entry = CatalogEntry::PutDatabase(Box::new(descriptor.clone()));
            let proposal = propose_catalog_entry(state, &entry)
                .map_err(|e| sqlstate_error("XX000", &format!("catalog propose failed: {e}")))?;
            if proposal == 0 {
                catalog
                    .put_database(&descriptor)
                    .map_err(|e| sqlstate_error("XX000", &format!("catalog write failed: {e}")))?;
            }

            // Only refresh the cache once the descriptor has been persisted.
            state.audit_dml_cache.set(db_id, mode);

            let detail = format!("ALTER DATABASE {name} SET AUDIT_DML = {mode}",);
            state.audit_record_with_db(
                crate::control::security::audit::AuditEvent::DatabaseAuditDmlChanged,
                None,
                Some(db_id),
                &identity.username,
                &detail,
            );
        }

        AlterDatabaseOperation::SetIdleTimeout(secs) => {
            let action = format!("ALTER DATABASE {name} SET IDLE_TIMEOUT");
            require_cluster_admin(state, identity, Some(db_id), &action)?;

            let secs = *secs;
            // Remember the old value so the audit entry can show the change.
            let before = descriptor.idle_session_timeout_secs;
            descriptor.idle_session_timeout_secs = secs;

            // Same propose-then-fallback persistence as for RENAME.
            let entry = CatalogEntry::PutDatabase(Box::new(descriptor.clone()));
            let proposal = propose_catalog_entry(state, &entry)
                .map_err(|e| sqlstate_error("XX000", &format!("catalog propose failed: {e}")))?;
            if proposal == 0 {
                catalog
                    .put_database(&descriptor)
                    .map_err(|e| sqlstate_error("XX000", &format!("catalog write failed: {e}")))?;
            }

            // Only refresh the cache once the descriptor has been persisted.
            state.idle_timeout_cache.set(db_id, secs);

            let detail = format!("ALTER DATABASE {name} SET IDLE_TIMEOUT = {secs} (was {before})");
            state.audit_record_with_db(
                crate::control::security::audit::AuditEvent::DatabaseIdleTimeoutChanged,
                None,
                Some(db_id),
                &identity.username,
                &detail,
            );
        }
    }

    let completed = Response::Execution(Tag::new("ALTER DATABASE"));
    Ok(vec![completed])
}