use pgwire::api::results::{Response, Tag};
use pgwire::error::PgWireResult;
use nodedb_sql::ddl_ast::CloneAsOf;
use nodedb_types::{DatabaseId, MAX_CLONE_DEPTH};
use crate::control::catalog_entry::entry::CatalogEntry;
use crate::control::clone::lsn_resolve::wall_ms_to_lsn;
use crate::control::metadata_proposer::propose_catalog_entry;
use crate::control::security::catalog::database_types::{
DatabaseDescriptor, DatabaseStatus, ParentCloneRef,
};
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::state::SharedState;
use super::super::super::types::{require_superuser, sqlstate_error};
/// Borrowed arguments for [`handle_clone_database`].
pub struct CloneDatabaseParams<'a> {
/// Name of the database to create; rejected if a database with this name
/// already exists in the catalog.
pub new_name: &'a str,
/// Name of the existing database to clone from.
pub source_name: &'a str,
/// Point in time the clone is pinned to: `CloneAsOf::Latest` or
/// `CloneAsOf::SystemTimeMs(ms)`.
pub as_of: &'a CloneAsOf,
}
/// Handles `CLONE DATABASE`: registers `params.new_name` as a clone of
/// `params.source_name`, pinned at the point in time given by `params.as_of`.
///
/// Steps, in order:
/// 1. Resolve the source database by name and require superuser rights on it.
/// 2. Reject sources in `Mirroring` status, sources whose clone chain is
///    already `MAX_CLONE_DEPTH` deep, and target names that already exist.
/// 3. Resolve the as-of point to an LSN plus a wall-clock millisecond value.
/// 4. Allocate the target database id, build its descriptor in `Cloning`
///    status and propose a `CatalogEntry::CloneDatabase`.
/// 5. When the proposer returns `0`, write the lineage edge, the database
///    descriptor and one shadow descriptor per active source collection
///    directly into the local catalog.
/// 6. Flush the database-id high-water mark if the registry asks for it and
///    emit a `DatabaseCloned` audit record.
///
/// # Errors
/// * `XX000` — catalog unavailable, catalog read/write/propose failure, clock
///   read failure, or a failed lineage rollback.
/// * `42P01` — source database (or its descriptor) not found.
/// * `42P04` — target database name already exists.
/// * `22023` — `AS OF SYSTEM TIME` is negative (before the Unix epoch).
/// * `CANNOT_CLONE_MIRROR` / `CLONE_DEPTH_EXCEEDED` — source is a mirror or
///   the clone chain is too deep.
pub fn handle_clone_database(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    params: CloneDatabaseParams<'_>,
) -> PgWireResult<Vec<Response>> {
    let catalog = state.credentials.catalog();
    let catalog = catalog
        .as_ref()
        .ok_or_else(|| sqlstate_error("XX000", "system catalog unavailable"))?;

    // --- Resolve and authorize the source -------------------------------
    let source_db_id = catalog
        .get_database_id_by_name(params.source_name)
        .map_err(|e| sqlstate_error("XX000", &format!("catalog lookup failed: {e}")))?
        .ok_or_else(|| {
            sqlstate_error(
                "42P01",
                &format!("source database '{}' not found", params.source_name),
            )
        })?;

    require_superuser(state, identity, Some(source_db_id), "CLONE DATABASE")?;

    let source_descriptor = catalog
        .get_database(source_db_id)
        .map_err(|e| sqlstate_error("XX000", &format!("catalog read failed: {e}")))?
        .ok_or_else(|| {
            sqlstate_error(
                "42P01",
                &format!(
                    "source database '{}' descriptor missing",
                    params.source_name
                ),
            )
        })?;

    // --- Preconditions on source and target -----------------------------
    if is_mirror_database(&source_descriptor) {
        return Err(sqlstate_error(
            nodedb_types::error::sqlstate::CANNOT_CLONE_MIRROR,
            &format!(
                "database '{}' is a mirror and cannot be cloned; \
                 promote it with ALTER DATABASE {} PROMOTE first",
                params.source_name, params.source_name,
            ),
        ));
    }

    let depth = clone_chain_depth(state, source_db_id)
        .map_err(|e| sqlstate_error("XX000", &format!("clone depth check failed: {e}")))?;
    if depth >= MAX_CLONE_DEPTH {
        return Err(sqlstate_error(
            nodedb_types::error::sqlstate::CLONE_DEPTH_EXCEEDED,
            &format!(
                "clone chain depth {} equals the maximum of {}; \
                 materialize a clone to flatten the chain before cloning again",
                depth, MAX_CLONE_DEPTH,
            ),
        ));
    }

    match catalog.get_database_id_by_name(params.new_name) {
        Ok(Some(_)) => {
            return Err(sqlstate_error(
                "42P04",
                &format!("database '{}' already exists", params.new_name),
            ));
        }
        Ok(None) => {}
        Err(e) => {
            return Err(sqlstate_error(
                "XX000",
                &format!("catalog lookup failed: {e}"),
            ));
        }
    }

    // --- Resolve the as-of point ----------------------------------------
    let now_ms = current_wall_ms()
        .map_err(|e| sqlstate_error("XX000", &format!("clock read failed: {e}")))?;
    let (as_of_lsn, as_of_ms) = match params.as_of {
        CloneAsOf::Latest => (state.wal.next_lsn(), now_ms),
        CloneAsOf::SystemTimeMs(ms) => {
            // The descriptor stores the timestamp as `u64`. A negative value
            // would otherwise wrap to an enormous timestamp, so reject it
            // up front, before any id is allocated or LSN is resolved.
            if *ms < 0 {
                return Err(sqlstate_error(
                    "22023",
                    &format!(
                        "AS OF SYSTEM TIME {} is before the Unix epoch; \
                         expected a non-negative millisecond timestamp",
                        *ms
                    ),
                ));
            }
            let lsn = wall_ms_to_lsn(state, *ms);
            (lsn, *ms)
        }
    };
    // Both arms yield a non-negative value (`now_ms` is a duration since the
    // epoch, the explicit timestamp was checked above), so this conversion
    // is lossless.
    let as_of_ms_unsigned: u64 = as_of_ms.unsigned_abs();

    // --- Build and propose the target descriptor ------------------------
    let clone_created_at = state.wal.next_lsn();
    let target_db_id = state.database_registry.alloc_one();
    // Sample the surrogate high-water mark exactly once so the database-level
    // `ParentCloneRef` and every per-collection `CloneOrigin` record the same
    // ceiling, even if surrogates are allocated while the clone is written.
    let kv_surrogate_ceiling = Some(state.surrogate_assigner.current_hwm());

    let target_descriptor = DatabaseDescriptor {
        id: target_db_id,
        name: params.new_name.to_string(),
        status: DatabaseStatus::Cloning,
        created_at_lsn: clone_created_at.as_u64(),
        quota_ref: source_descriptor.quota_ref,
        parent_clone: Some(ParentCloneRef {
            source_db_id,
            as_of_lsn: as_of_lsn.as_u64(),
            as_of_ms: as_of_ms_unsigned,
            kv_surrogate_ceiling,
        }),
        mirror_origin: None,
        audit_dml: nodedb_types::AuditDmlMode::None,
        idle_session_timeout_secs: 0,
    };

    let entry = CatalogEntry::CloneDatabase {
        target_descriptor: Box::new(target_descriptor.clone()),
        source_db_id: source_db_id.as_u64(),
    };
    let proposed = propose_catalog_entry(state, &entry)
        .map_err(|e| sqlstate_error("XX000", &format!("catalog propose failed: {e}")))?;

    // NOTE(review): a return of 0 presumably means the entry was not applied
    // through the proposer, so the catalog must be written here — confirm
    // against `propose_catalog_entry`.
    if proposed == 0 {
        // Lineage edge first, then the descriptor; if the descriptor write
        // fails the edge is rolled back so no orphan edge is left behind.
        catalog
            .add_clone_child(source_db_id, target_db_id)
            .map_err(|e| sqlstate_error("XX000", &format!("lineage write failed: {e}")))?;
        if let Err(put_err) = catalog.put_database(&target_descriptor) {
            if let Err(rb_err) = catalog.remove_clone_child(source_db_id, target_db_id) {
                return Err(sqlstate_error(
                    "XX000",
                    &format!(
                        "catalog write failed: {put_err}; \
                         lineage rollback ALSO failed: {rb_err} — \
                         catalog left with orphan lineage edge \
                         (source={source_db_id}, target={target_db_id})",
                    ),
                ));
            }
            return Err(sqlstate_error(
                "XX000",
                &format!("catalog write failed: {put_err}"),
            ));
        }

        let source_colls = catalog.load_all_collections(source_db_id).map_err(|e| {
            sqlstate_error(
                "XX000",
                &format!("clone: enumerate source collections: {e}"),
            )
        })?;

        // Re-home each active source collection under the target database as
        // a `Shadowed` descriptor that points back at its origin.
        for mut coll in source_colls.into_iter().filter(|c| c.is_active) {
            coll.database_id = target_db_id;
            coll.cloned_from = Some(nodedb_types::CloneOrigin {
                source_database: source_db_id,
                source_collection: coll.name.clone(),
                as_of_lsn,
                clone_created_at,
                kv_surrogate_ceiling,
            });
            coll.clone_status = nodedb_types::CloneStatus::Shadowed;
            coll.descriptor_version = 0;
            // Best effort: a failed stamp is logged and the remaining
            // collections are still processed.
            if let Err(e) = catalog.put_collection(target_db_id, &coll) {
                tracing::warn!(
                    target_db_id = target_db_id.as_u64(),
                    collection = %coll.name,
                    error = %e,
                    "clone: failed to stamp shadow collection descriptor"
                );
            }
        }
    }

    // --- Bookkeeping ------------------------------------------------------
    if state.database_registry.should_flush() {
        let hwm = state.database_registry.current_hwm();
        // A failed flush is logged only; it does not fail the statement.
        if let Err(e) = catalog.put_database_hwm(hwm) {
            tracing::warn!("database hwm flush failed after clone: {e}");
        }
    }

    state.audit_record_with_db(
        crate::control::security::audit::AuditEvent::DatabaseCloned,
        None,
        Some(target_db_id),
        &identity.username,
        &format!(
            "CLONE DATABASE {} FROM {} AS OF SYSTEM TIME {}",
            params.new_name, params.source_name, as_of_ms
        ),
    );

    Ok(vec![Response::Execution(Tag::new("CLONE DATABASE"))])
}
/// Reports whether `descriptor` is in `DatabaseStatus::Mirroring`.
fn is_mirror_database(descriptor: &DatabaseDescriptor) -> bool {
    let status = &descriptor.status;
    matches!(status, DatabaseStatus::Mirroring)
}
fn clone_chain_depth(state: &SharedState, start_db_id: DatabaseId) -> crate::Result<u32> {
let catalog = state.credentials.catalog();
let catalog = catalog.as_ref().ok_or(crate::Error::Storage {
engine: "catalog".into(),
detail: "system catalog unavailable for depth check".into(),
})?;
let mut current = start_db_id;
let mut depth: u32 = 0;
loop {
if depth > MAX_CLONE_DEPTH {
return Ok(depth);
}
let desc = catalog
.get_database(current)
.map_err(|e| crate::Error::Storage {
engine: "catalog".into(),
detail: format!("depth walk get_database failed: {e}"),
})?;
match desc.and_then(|d| d.parent_clone) {
None => return Ok(depth),
Some(parent) => {
current = parent.source_db_id;
depth += 1;
}
}
}
}
fn current_wall_ms() -> crate::Result<i64> {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as i64)
.map_err(|e| crate::Error::Internal {
detail: format!("clone_database: system clock predates Unix epoch: {e}"),
})
}