use pgwire::api::results::{Response, Tag};
use pgwire::error::PgWireResult;
use crate::control::security::catalog::SystemCatalog;
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::state::SharedState;
use crate::types::{DatabaseId, TenantId};
use super::entry::SNAPSHOT_TIMEOUT;
use super::journal::{self, MovePhase, MoveTenantJournalEntry};
use super::{cutover, drain, snapshot};
use crate::control::server::pgwire::types::sqlstate_error;
use nodedb_types::error::sqlstate;
/// Idempotency check used by MOVE TENANT recovery: reports whether the move
/// already looks applied.
///
/// Returns `Ok(true)` only when the source database has no active collection
/// and the target database has at least one. The target is not loaded at all
/// if the source still has an active collection.
///
/// NOTE(review): `_tenant_id` is unused and collections are loaded per
/// database rather than filtered by tenant — confirm that is intended.
///
/// # Errors
///
/// Propagates any error from loading either database's collections.
pub fn tenant_already_in_target(
    catalog: &SystemCatalog,
    _tenant_id: TenantId,
    source_db_id: DatabaseId,
    target_db_id: DatabaseId,
) -> crate::Result<bool> {
    let has_active_collection = |db_id: DatabaseId| -> crate::Result<bool> {
        let collections = catalog.load_all_collections(db_id)?;
        Ok(collections.iter().any(|coll| coll.is_active))
    };
    // `&&` short-circuits, so the target is only consulted when the source
    // has nothing active left.
    Ok(!has_active_collection(source_db_id)? && has_active_collection(target_db_id)?)
}
pub async fn resume_or_compensate(
state: &SharedState,
catalog: &SystemCatalog,
entry: MoveTenantJournalEntry,
identity: &AuthenticatedIdentity,
) -> PgWireResult<Vec<Response>> {
let tenant_id = TenantId::new(entry.tenant_id);
let source_db_id = DatabaseId::new(entry.source_db_id);
let target_db_id = DatabaseId::new(entry.target_db_id);
match entry.phase {
MovePhase::Preflight | MovePhase::Drain => {
journal::delete_journal_entry_logged(catalog, tenant_id);
Err(sqlstate_error(
sqlstate::MOVE_TENANT_DRAIN_TIMEOUT,
&format!(
"MOVE TENANT '{}' was interrupted during drain and has been rolled back; \
please retry the operation",
entry.tenant_name
),
))
}
MovePhase::Snapshot => {
drain::release(state, tenant_id, source_db_id);
journal::delete_journal_entry_logged(catalog, tenant_id);
Err(sqlstate_error(
sqlstate::MOVE_TENANT_SNAPSHOT_FAILED,
&format!(
"MOVE TENANT '{}' was interrupted during snapshot and has been rolled back; \
please retry the operation",
entry.tenant_name
),
))
}
MovePhase::Cutover => {
let already_moved =
tenant_already_in_target(catalog, tenant_id, source_db_id, target_db_id)
.map_err(|e| sqlstate_error("XX000", &format!("idempotency check: {e}")))?;
if already_moved {
if let Some(ref key) = entry.temp_snapshot_key {
let _ = snapshot::delete_temp(state, key).await;
}
journal::delete_journal_entry_logged(catalog, tenant_id);
state.audit_record(
crate::control::security::audit::AuditEvent::AdminAction,
Some(tenant_id),
&identity.username,
&format!(
"MOVE TENANT {} FROM {} TO {} recovered (cutover was already complete)",
entry.tenant_name, entry.source_db_name, entry.target_db_name
),
);
return Ok(vec![Response::Execution(Tag::new("MOVE TENANT"))]);
}
let snapshot_result = snapshot::run(state, tenant_id, SNAPSHOT_TIMEOUT).await;
let snapshot_bytes = match snapshot_result {
Ok(b) => b,
Err(ref e) => {
drain::release(state, tenant_id, source_db_id);
journal::delete_journal_entry_logged(catalog, tenant_id);
return Err(sqlstate_error(
sqlstate::MOVE_TENANT_SNAPSHOT_FAILED,
e.message(),
));
}
};
let cutover_result = cutover::run(
state,
catalog,
tenant_id,
source_db_id,
target_db_id,
&snapshot_bytes,
)
.await;
if let Err(ref e) = cutover_result {
drain::release(state, tenant_id, source_db_id);
if let Some(ref key) = entry.temp_snapshot_key {
let _ = snapshot::delete_temp(state, key).await;
}
journal::delete_journal_entry_logged(catalog, tenant_id);
return Err(sqlstate_error(
sqlstate::MOVE_TENANT_CUTOVER_FAILED,
e.message(),
));
}
if let Some(ref key) = entry.temp_snapshot_key {
let _ = snapshot::delete_temp(state, key).await;
}
journal::delete_journal_entry_logged(catalog, tenant_id);
state.audit_record(
crate::control::security::audit::AuditEvent::AdminAction,
Some(tenant_id),
&identity.username,
&format!(
"MOVE TENANT {} FROM {} TO {} recovered (cutover re-applied)",
entry.tenant_name, entry.source_db_name, entry.target_db_name
),
);
Ok(vec![Response::Execution(Tag::new("MOVE TENANT"))])
}
MovePhase::Resumed => {
journal::delete_journal_entry_logged(catalog, tenant_id);
Ok(vec![Response::Execution(Tag::new("MOVE TENANT"))])
}
}
}
pub async fn recover_all(state: &SharedState) {
let catalog = match state.credentials.catalog().as_ref() {
Some(c) => c,
None => return,
};
let entries = match journal::scan_all_journal_entries(catalog) {
Ok(e) => e,
Err(err) => {
tracing::error!(
error = %err,
"move_tenant recovery: failed to scan journal; skipping"
);
return;
}
};
for entry in entries {
tracing::info!(
tenant = entry.tenant_id,
phase = ?entry.phase,
"move_tenant recovery: found in-progress entry"
);
let tenant_id = TenantId::new(entry.tenant_id);
let source_db_id = DatabaseId::new(entry.source_db_id);
let target_db_id = DatabaseId::new(entry.target_db_id);
match entry.phase {
MovePhase::Preflight | MovePhase::Drain | MovePhase::Snapshot => {
drain::release(state, tenant_id, source_db_id);
if let Some(ref key) = entry.temp_snapshot_key {
let _ = snapshot::delete_temp(state, key).await;
}
journal::delete_journal_entry_logged(catalog, tenant_id);
tracing::info!(
tenant = entry.tenant_id,
"move_tenant recovery: compensated (no data moved)"
);
}
MovePhase::Cutover => {
let already_moved = match catalog.load_all_collections(source_db_id) {
Ok(colls) => colls.iter().all(|col| !col.is_active),
Err(err) => {
tracing::warn!(
tenant = entry.tenant_id,
source_db = entry.source_db_id,
error = %err,
"move_tenant recovery: failed to read source collections; \
treating as not-moved and will retry cutover"
);
false
}
};
if already_moved {
tracing::info!(
tenant = entry.tenant_id,
"move_tenant recovery: cutover already complete; cleaning journal"
);
} else {
if let Ok(snap_bytes) = snapshot::run(state, tenant_id, SNAPSHOT_TIMEOUT).await
{
let _ = cutover::run(
state,
catalog,
tenant_id,
source_db_id,
target_db_id,
&snap_bytes,
)
.await;
}
drain::release(state, tenant_id, source_db_id);
}
if let Some(ref key) = entry.temp_snapshot_key {
let _ = snapshot::delete_temp(state, key).await;
}
journal::delete_journal_entry_logged(catalog, tenant_id);
}
MovePhase::Resumed => {
journal::delete_journal_entry_logged(catalog, tenant_id);
}
}
}
}