use pgwire::api::results::{Response, Tag};
use pgwire::error::PgWireResult;
use crate::control::catalog_entry::entry::CatalogEntry;
use crate::control::metadata_proposer::propose_catalog_entry;
use crate::control::security::catalog::database_types::{DatabaseDescriptor, DatabaseStatus};
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::state::SharedState;
use super::super::super::types::{require_cluster_admin, sqlstate_error};
/// Options collected from the `WITH (...)` clause of `CREATE DATABASE`.
///
/// The derived `Default` leaves every option at its zero value, which is
/// what a statement without a `WITH` clause ends up with.
#[derive(Debug, Default)]
struct CreateDatabaseOptions {
    /// Value supplied via the `quota_id` / `quota` option; `0` when the
    /// option was not given.
    quota_id: u64,
}
/// Turns the `(name, value)` option pairs of a `CREATE DATABASE` statement
/// into a [`CreateDatabaseOptions`].
///
/// Option names are compared ASCII-case-insensitively. `quota_id` and its
/// alias `quota` must parse as a `u64`; if either appears several times the
/// last occurrence wins. An empty slice yields the default options.
///
/// # Errors
///
/// * An error built by `sqlstate_error` with code `22023` when a quota value
///   is not an unsigned integer.
/// * An error built by `sqlstate_error` with code `0A000` for any other
///   option name.
fn parse_create_options(options: &[(String, String)]) -> PgWireResult<CreateDatabaseOptions> {
    let mut parsed = CreateDatabaseOptions::default();

    for (key, value) in options {
        let is_quota = key.eq_ignore_ascii_case("quota_id") || key.eq_ignore_ascii_case("quota");

        if !is_quota {
            // The rejection message reports the lowercased option name.
            let lowered = key.to_ascii_lowercase();
            return Err(sqlstate_error(
                "0A000",
                &format!("CREATE DATABASE: unsupported WITH option '{lowered}'"),
            ));
        }

        parsed.quota_id = match value.parse::<u64>() {
            Ok(id) => id,
            Err(_) => {
                // The message echoes the option name exactly as the caller spelled it.
                return Err(sqlstate_error(
                    "22023",
                    &format!(
                        "CREATE DATABASE: invalid {key}='{value}' (expected unsigned integer)"
                    ),
                ));
            }
        };
    }

    Ok(parsed)
}
/// Executes `CREATE DATABASE <name> [IF NOT EXISTS] [WITH (...)]`.
///
/// Steps, in order:
/// 1. Authorize the caller via `require_cluster_admin`.
/// 2. Parse the `WITH` options.
/// 3. Look the name up in the system catalog; an existing database either
///    short-circuits successfully (`if_not_exists`) or is rejected.
/// 4. Allocate a database id, build the descriptor and hand it to
///    `propose_catalog_entry`.
/// 5. Flush the id high-water mark when the registry asks for it
///    (failure is only logged).
/// 6. Zero the per-database metrics and write an audit record.
///
/// # Errors
///
/// * Whatever `require_cluster_admin` or the option parser returns.
/// * `XX000` when the system catalog is unavailable, or when the catalog
///   lookup, the propose step, or the direct catalog write fails.
/// * `42P04` when the database already exists and `if_not_exists` is false.
pub fn handle_create_database(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    name: &str,
    if_not_exists: bool,
    options: &[(String, String)],
) -> PgWireResult<Vec<Response>> {
    // Same text is used for the authorization check and the audit trail.
    let statement = format!("CREATE DATABASE {name}");

    require_cluster_admin(state, identity, None, &statement)?;
    let opts = parse_create_options(options)?;

    // Keep the handle alive in its own binding so the borrow below is valid.
    let catalog_handle = state.credentials.catalog();
    let catalog = catalog_handle
        .as_ref()
        .ok_or_else(|| sqlstate_error("XX000", "system catalog unavailable"))?;

    let existing = catalog
        .get_database_id_by_name(name)
        .map_err(|e| sqlstate_error("XX000", &format!("catalog lookup failed: {e}")))?;
    if existing.is_some() {
        return if if_not_exists {
            // IF NOT EXISTS: report success without touching anything.
            Ok(vec![Response::Execution(Tag::new("CREATE DATABASE"))])
        } else {
            Err(sqlstate_error(
                "42P04",
                &format!("database '{name}' already exists"),
            ))
        };
    }

    let db_id = state.database_registry.alloc_one();
    let created_at_lsn = state.wal.next_lsn().as_u64();
    let descriptor = DatabaseDescriptor {
        id: db_id,
        name: name.to_string(),
        status: DatabaseStatus::Active,
        created_at_lsn,
        quota_ref: opts.quota_id,
        parent_clone: None,
        mirror_origin: None,
        audit_dml: nodedb_types::AuditDmlMode::None,
        idle_session_timeout_secs: 0,
    };

    let entry = CatalogEntry::PutDatabase(Box::new(descriptor.clone()));
    let proposed = propose_catalog_entry(state, &entry)
        .map_err(|e| sqlstate_error("XX000", &format!("catalog propose failed: {e}")))?;
    // NOTE(review): a result of 0 presumably means the proposer did not
    // apply the entry itself, so it is written to the catalog directly —
    // confirm against `propose_catalog_entry`.
    if proposed == 0 {
        catalog
            .put_database(&descriptor)
            .map_err(|e| sqlstate_error("XX000", &format!("catalog write failed: {e}")))?;
    }

    // Best effort: a failed high-water-mark flush is logged, not returned.
    if state.database_registry.should_flush() {
        let hwm = state.database_registry.current_hwm();
        if let Err(e) = catalog.put_database_hwm(hwm) {
            tracing::warn!("database hwm flush failed: {e}");
        }
    }

    // Set every per-database metric for the new name to zero.
    if let Some(metrics) = &state.system_metrics {
        metrics.set_database_collections(name, 0);
        metrics.set_database_tenants(name, 0);
        metrics.set_database_memory_bytes(name, 0);
        metrics.set_database_storage_bytes(name, 0);
    }

    state.audit_record_with_db(
        crate::control::security::audit::AuditEvent::DatabaseCreated,
        None,
        Some(db_id),
        &identity.username,
        &statement,
    );

    Ok(vec![Response::Execution(Tag::new("CREATE DATABASE"))])
}