use nodedb_types::{DatabaseId, Lsn, MirrorMode, MirrorOrigin, MirrorStatus};
use pgwire::api::results::{Response, Tag};
use pgwire::error::PgWireResult;
use crate::control::catalog_entry::entry::CatalogEntry;
use crate::control::metadata_proposer::propose_catalog_entry;
use crate::control::security::catalog::database_types::{DatabaseDescriptor, DatabaseStatus};
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::state::SharedState;
use super::super::super::super::types::{require_superuser, sqlstate_error};
pub fn handle_mirror_database(
state: &SharedState,
identity: &AuthenticatedIdentity,
local_name: &str,
source_cluster: &str,
source_database: &str,
mode: MirrorMode,
) -> PgWireResult<Vec<Response>> {
require_superuser(state, identity, None, "MIRROR DATABASE")?;
let catalog = state.credentials.catalog();
let catalog = catalog
.as_ref()
.ok_or_else(|| sqlstate_error("XX000", "system catalog unavailable"))?;
match catalog.get_database_id_by_name(local_name) {
Ok(Some(_)) => {
return Err(sqlstate_error(
"42P04",
&format!("database '{local_name}' already exists"),
));
}
Ok(None) => {}
Err(e) => {
return Err(sqlstate_error(
"XX000",
&format!("catalog lookup failed: {e}"),
));
}
}
let own_node_id = state.node_id;
if source_cluster.parse::<u64>().ok() == Some(own_node_id) {
return Err(sqlstate_error(
"0A000",
&format!(
"MIRROR DATABASE: source cluster '{source_cluster}' matches this node's id; \
self-mirroring is not supported"
),
));
}
let db_id = state.database_registry.alloc_one();
let created_at_lsn = state.wal.next_lsn().as_u64();
let source_db_id = DatabaseId::new(0);
let mirror_origin = MirrorOrigin {
source_cluster: source_cluster.to_string(),
source_database: source_db_id,
mode,
last_applied: Lsn::new(0),
status: MirrorStatus::Bootstrapping {
bytes_done: 0,
bytes_total: 0,
},
};
let descriptor = DatabaseDescriptor {
id: db_id,
name: local_name.to_string(),
status: DatabaseStatus::Mirroring,
created_at_lsn,
quota_ref: 0,
parent_clone: None,
mirror_origin: Some(mirror_origin),
audit_dml: nodedb_types::AuditDmlMode::None,
idle_session_timeout_secs: 0,
};
let proposed = propose_catalog_entry(
state,
&CatalogEntry::PutDatabase(Box::new(descriptor.clone())),
)
.map_err(|e| sqlstate_error("XX000", &format!("catalog propose failed: {e}")))?;
if proposed == 0 {
catalog
.put_database(&descriptor)
.map_err(|e| sqlstate_error("XX000", &format!("catalog write failed: {e}")))?;
}
if state.database_registry.should_flush() {
let hwm = state.database_registry.current_hwm();
if let Err(e) = catalog.put_database_hwm(hwm) {
tracing::warn!("database hwm flush failed after MIRROR DATABASE: {e}");
}
}
state.audit_record_with_db(
crate::control::security::audit::AuditEvent::DatabaseMirrored,
None,
Some(db_id),
&identity.username,
&format!(
"MIRROR DATABASE {local_name} FROM {source_cluster}.{source_database} MODE={mode:?}"
),
);
Ok(vec![Response::Execution(Tag::new("MIRROR DATABASE"))])
}