use std::time::Duration;
use bytes::Bytes;
use crate::bridge::envelope::PhysicalPlan;
use crate::bridge::physical_plan::MetaOp;
use crate::control::catalog_entry::CatalogEntry;
use crate::control::catalog_entry::apply::apply_to;
use crate::control::metadata_proposer::propose_catalog_entry;
use crate::control::planner::sql_plan_convert::convert::db_qualified;
use crate::control::security::catalog::{StoredCollection, SystemCatalog};
use crate::control::server::pgwire::ddl::sync_dispatch;
use crate::control::state::SharedState;
use crate::types::{DatabaseId, TenantId};
use nodedb_types::NodeDbError;
/// Timeout value (30 seconds) handed to `sync_dispatch::dispatch_async` for
/// each per-collection `RenameCollection` dispatch issued by
/// `dispatch_rename_ops`.
const RENAME_DISPATCH_TIMEOUT: Duration = Duration::from_secs(30);
/// Runs the cutover step of a tenant move from `source_db_id` to `target_db_id`.
///
/// The steps, in order:
/// 1. Load every collection of the source database from `catalog` and keep
///    only those whose `is_active` flag is set.
/// 2. Build a `CatalogEntry::MoveTenantCutover` carrying those collections and
///    submit it through `propose_catalog_entry`.
/// 3. If the proposal returns `0`, apply the entry directly to the catalog
///    obtained from `state.credentials.catalog()`.
///    NOTE(review): `0` presumably means the entry was not replicated through
///    Raft (e.g. no cluster) — confirm against `propose_catalog_entry`.
/// 4. Dispatch one rename operation per collection via `dispatch_rename_ops`.
///
/// `_snapshot_bytes` is accepted but not read by this function.
///
/// # Errors
///
/// Returns the error built by `NodeDbError::move_tenant_cutover_failed` when
/// the source collections cannot be enumerated, the proposal fails, the
/// system catalog is unavailable for the direct apply, or a rename dispatch
/// fails.
pub async fn run(
    state: &SharedState,
    catalog: &SystemCatalog,
    tenant_id: TenantId,
    source_db_id: DatabaseId,
    target_db_id: DatabaseId,
    _snapshot_bytes: &Bytes,
) -> Result<(), NodeDbError> {
    // Single place that stamps every failure with this tenant's id.
    let cutover_failed = move |detail: String| {
        NodeDbError::move_tenant_cutover_failed(tenant_id.as_u64().to_string(), detail)
    };

    let loaded = catalog
        .load_all_collections(source_db_id)
        .map_err(|e| cutover_failed(format!("failed to enumerate source collections: {e}")))?;
    let active: Vec<_> = loaded
        .into_iter()
        .filter(|stored| stored.is_active)
        .collect();

    // The entry takes its own copy; `active` is still needed for the renames below.
    let cutover_entry = CatalogEntry::MoveTenantCutover {
        tenant_id: tenant_id.as_u64(),
        source_db_id: source_db_id.as_u64(),
        target_db_id: target_db_id.as_u64(),
        collections: active.clone(),
    };

    let proposal = propose_catalog_entry(state, &cutover_entry)
        .map_err(|e| cutover_failed(format!("Raft proposal failed: {e}")))?;

    if proposal == 0 {
        // Direct-apply path: uses the catalog held by `state`, not the
        // `catalog` argument.
        let catalog_slot = state.credentials.catalog();
        let live_catalog = catalog_slot.as_ref().ok_or_else(|| {
            cutover_failed("system catalog unavailable for direct apply".to_string())
        })?;
        apply_to(&cutover_entry, live_catalog);
    }

    dispatch_rename_ops(state, tenant_id, source_db_id, target_db_id, &active).await
}
/// Dispatches one `MetaOp::RenameCollection` per entry in `collections`,
/// renaming each from its `source_db_id`-qualified name to its
/// `target_db_id`-qualified name (both produced by `db_qualified`).
///
/// Dispatches are awaited one at a time, in slice order, each with
/// `RENAME_DISPATCH_TIMEOUT`. The first failure stops the loop; renames that
/// already completed are not undone here.
///
/// # Errors
///
/// Returns the error built by `NodeDbError::move_tenant_cutover_failed`,
/// naming the old and new collection, when a dispatch fails.
async fn dispatch_rename_ops(
    state: &SharedState,
    tenant_id: TenantId,
    source_db_id: DatabaseId,
    target_db_id: DatabaseId,
    collections: &[StoredCollection],
) -> Result<(), NodeDbError> {
    // Single place that stamps every failure with this tenant's id.
    let cutover_failed = move |detail: String| {
        NodeDbError::move_tenant_cutover_failed(tenant_id.as_u64().to_string(), detail)
    };

    for stored in collections.iter() {
        let from_name = db_qualified(source_db_id, &stored.name);
        let to_name = db_qualified(target_db_id, &stored.name);

        // The op owns its copies; the originals are kept for the error message.
        let rename_op = MetaOp::RenameCollection {
            tenant_id: stored.tenant_id,
            old_collection: from_name.clone(),
            new_collection: to_name.clone(),
        };

        let outcome = sync_dispatch::dispatch_async(
            state,
            tenant_id,
            "__system",
            PhysicalPlan::Meta(rename_op),
            RENAME_DISPATCH_TIMEOUT,
        )
        .await;

        outcome.map_err(|e| {
            cutover_failed(format!(
                "rename_collection dispatch ({} -> {}): {e}",
                from_name, to_name
            ))
        })?;
    }

    Ok(())
}