use std::time::Duration;
use pgwire::api::results::{Response, Tag};
use pgwire::error::PgWireResult;
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::server::pgwire::types::{require_superuser, sqlstate_error};
use crate::control::state::SharedState;
use crate::types::TenantId;
use nodedb_types::error::sqlstate;
use super::journal::{self, MovePhase, MoveTenantJournalEntry};
use super::{cutover, drain, preflight, recovery, snapshot};
/// Time budget handed to `drain::run` by `handle_move_tenant` (30 seconds).
const DRAIN_TIMEOUT: Duration = Duration::from_secs(30);

/// Time budget handed to `snapshot::run` by `handle_move_tenant` (two minutes).
///
/// Crate-visible so other modules in the crate can use the same budget.
pub(crate) const SNAPSHOT_TIMEOUT: Duration = Duration::from_secs(2 * 60);
/// Executes `MOVE TENANT <tenant_name> FROM <from_db> TO <to_db>`.
///
/// The move is journaled and walks through the phases named in [`MovePhase`]:
/// `Drain`, `Snapshot`, then `Cutover`. The journal entry is rewritten before
/// each phase starts and removed once the move finishes or is abandoned.
///
/// If a journal entry for the tenant already exists, no new move is started;
/// the call is delegated to `recovery::resume_or_compensate` instead.
///
/// On success an `AdminAction` audit record is written and a single
/// `MOVE TENANT` execution tag is returned.
///
/// # Errors
///
/// * `XX000` — system catalog unavailable, or a catalog / journal read or
///   write failed.
/// * `42P01` — the source database, the tenant, or the target database does
///   not exist.
/// * Whatever `require_superuser` returns when the privilege check fails.
/// * `MOVE_TENANT_ALREADY_AT_TARGET` — `recovery::tenant_already_in_target`
///   reported `true`.
/// * `MOVE_TENANT_PREFLIGHT_FAILED`, `MOVE_TENANT_DRAIN_TIMEOUT`,
///   `MOVE_TENANT_SNAPSHOT_FAILED`, `MOVE_TENANT_CUTOVER_FAILED` — the
///   corresponding step failed. Every drain error is reported with the
///   drain-timeout code.
pub async fn handle_move_tenant(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    tenant_name: &str,
    from_db: &str,
    to_db: &str,
) -> PgWireResult<Vec<Response>> {
    let catalog = state
        .credentials
        .catalog()
        .as_ref()
        .ok_or_else(|| sqlstate_error("XX000", "system catalog unavailable"))?;

    // The source database is resolved first because the privilege check below
    // is scoped to its id.
    let source_db_id = catalog
        .get_database_id_by_name(from_db)
        .map_err(|e| sqlstate_error("XX000", &format!("catalog lookup: {e}")))?
        .ok_or_else(|| {
            sqlstate_error("42P01", &format!("source database '{from_db}' not found"))
        })?;

    require_superuser(
        state,
        identity,
        Some(source_db_id),
        &format!("MOVE TENANT {tenant_name} FROM {from_db} TO {to_db}"),
    )?;

    // Tenants are looked up by name with a scan over all catalog records.
    let tenant_record = catalog
        .load_all_tenants()
        .map_err(|e| sqlstate_error("XX000", &format!("catalog lookup: {e}")))?
        .into_iter()
        .find(|t| t.name == tenant_name)
        .ok_or_else(|| sqlstate_error("42P01", &format!("tenant '{tenant_name}' not found")))?;
    let tenant_id = TenantId::new(tenant_record.tenant_id);

    let target_db_id = catalog
        .get_database_id_by_name(to_db)
        .map_err(|e| sqlstate_error("XX000", &format!("catalog lookup: {e}")))?
        .ok_or_else(|| sqlstate_error("42P01", &format!("target database '{to_db}' not found")))?;

    // Idempotency guard: a tenant that already sits in the target is rejected
    // with a dedicated SQLSTATE rather than moved again.
    let already_moved =
        recovery::tenant_already_in_target(catalog, tenant_id, source_db_id, target_db_id)
            .map_err(|e| sqlstate_error("XX000", &format!("idempotency check: {e}")))?;
    if already_moved {
        return Err(sqlstate_error(
            sqlstate::MOVE_TENANT_ALREADY_AT_TARGET,
            nodedb_types::NodeDbError::move_tenant_already_at_target(tenant_name, to_db).message(),
        ));
    }

    // A leftover journal entry means an earlier move for this tenant did not
    // finish; hand it to the recovery module instead of starting a new one.
    let pending = journal::load_journal_entry(catalog, tenant_id)
        .map_err(|e| sqlstate_error("XX000", &format!("journal read: {e}")))?;
    if let Some(entry) = pending {
        return recovery::resume_or_compensate(state, catalog, entry, identity).await;
    }

    preflight::run(catalog, source_db_id, target_db_id, tenant_name, to_db)
        .map_err(|e| sqlstate_error(sqlstate::MOVE_TENANT_PREFLIGHT_FAILED, e.message()))?;

    // Persist the journal entry before the first side effect (the drain).
    let journal_entry = MoveTenantJournalEntry {
        tenant_id: tenant_id.as_u64(),
        tenant_name: tenant_name.to_string(),
        source_db_id: source_db_id.as_u64(),
        source_db_name: from_db.to_string(),
        target_db_id: target_db_id.as_u64(),
        target_db_name: to_db.to_string(),
        phase: MovePhase::Drain,
        last_durable_lsn: state.wal.next_lsn().as_u64(),
        temp_snapshot_key: None,
    };
    journal::save_journal_entry(catalog, &journal_entry)
        .map_err(|e| sqlstate_error("XX000", &format!("journal write: {e}")))?;

    // Phase 1: drain.
    // NOTE(review): only the journal entry is removed on failure and
    // `drain::release` is not called — presumably `drain::run` undoes its own
    // partial work when it fails; confirm.
    if let Err(e) = drain::run(state, tenant_id, source_db_id, DRAIN_TIMEOUT).await {
        journal::delete_journal_entry_logged(catalog, tenant_id);
        return Err(sqlstate_error(
            sqlstate::MOVE_TENANT_DRAIN_TIMEOUT,
            e.message(),
        ));
    }

    // The drain is now held, so every failure from here on has to release it.
    // A plain `?` on this journal write would return with the drain still in
    // place, unlike the snapshot and cutover failure paths below.
    let journal_entry = journal_entry.with_phase(MovePhase::Snapshot);
    if let Err(e) = journal::save_journal_entry(catalog, &journal_entry) {
        drain::release(state, tenant_id, source_db_id);
        journal::delete_journal_entry_logged(catalog, tenant_id);
        return Err(sqlstate_error("XX000", &format!("journal update: {e}")));
    }

    // Phase 2: snapshot.
    let snapshot_bytes = match snapshot::run(state, tenant_id, SNAPSHOT_TIMEOUT).await {
        Ok(bytes) => bytes,
        Err(e) => {
            drain::release(state, tenant_id, source_db_id);
            journal::delete_journal_entry_logged(catalog, tenant_id);
            return Err(sqlstate_error(
                sqlstate::MOVE_TENANT_SNAPSHOT_FAILED,
                e.message(),
            ));
        }
    };

    // Record the temp snapshot key in the journal before cutover begins.
    let temp_key = snapshot::temp_key(tenant_id);
    let journal_entry = journal_entry
        .with_phase(MovePhase::Cutover)
        .with_temp_snapshot_key(temp_key.clone());

    // Same compensation as a cutover failure: release the drain, drop the temp
    // snapshot, drop the journal entry. The error is rendered to a `String`
    // up front so no error value is held across the `.await` below.
    let journal_update_failure = journal::save_journal_entry(catalog, &journal_entry)
        .err()
        .map(|e| format!("journal update: {e}"));
    if let Some(message) = journal_update_failure {
        drain::release(state, tenant_id, source_db_id);
        // Best-effort cleanup; a failure here must not mask the journal error.
        let _ = snapshot::delete_temp(state, &temp_key).await;
        journal::delete_journal_entry_logged(catalog, tenant_id);
        return Err(sqlstate_error("XX000", &message));
    }

    // Phase 3: cutover.
    let cutover_result = cutover::run(
        state,
        catalog,
        tenant_id,
        source_db_id,
        target_db_id,
        &snapshot_bytes,
    )
    .await;
    if let Err(ref e) = cutover_result {
        drain::release(state, tenant_id, source_db_id);
        // Best-effort cleanup; the cutover error is what the caller needs.
        let _ = snapshot::delete_temp(state, &temp_key).await;
        journal::delete_journal_entry_logged(catalog, tenant_id);
        return Err(sqlstate_error(
            sqlstate::MOVE_TENANT_CUTOVER_FAILED,
            e.message(),
        ));
    }

    // Success: drop the temp snapshot (best-effort) and the journal entry.
    // NOTE(review): `drain::release` is not called on this path — presumably a
    // successful cutover ends the drain itself; confirm.
    let _ = snapshot::delete_temp(state, &temp_key).await;
    journal::delete_journal_entry_logged(catalog, tenant_id);

    state.audit_record(
        crate::control::security::audit::AuditEvent::AdminAction,
        Some(tenant_id),
        &identity.username,
        &format!("MOVE TENANT {tenant_name} FROM {from_db} TO {to_db} completed"),
    );

    Ok(vec![Response::Execution(Tag::new("MOVE TENANT"))])
}