use std::sync::Arc;
use futures::stream;
use pgwire::api::results::{DataRowEncoder, QueryResponse, Response, Tag};
use pgwire::error::PgWireResult;
use crate::control::security::audit::AuditEvent;
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::state::SharedState;
use crate::types::TenantId;
use super::super::types::{int8_field, sqlstate_error, text_field};
use super::user::extract_quoted_string;
#[derive(serde::Serialize, serde::Deserialize)]
struct TenantBackup {
version: u32,
tenant_id: u32,
created_at: u64,
documents: Vec<(String, Vec<u8>)>,
indexes: Vec<(String, Vec<u8>)>,
crdt_snapshots: Vec<(String, Vec<u8>)>,
vector_snapshots: Vec<(String, Vec<u8>)>,
}
pub fn backup_tenant(
state: &SharedState,
identity: &AuthenticatedIdentity,
parts: &[&str],
) -> PgWireResult<Vec<Response>> {
if !identity.is_superuser {
return Err(sqlstate_error(
"42501",
"permission denied: only superuser can backup tenants",
));
}
if parts.len() < 5 {
return Err(sqlstate_error(
"42601",
"syntax: BACKUP TENANT <id> TO '<path>'",
));
}
let tid: u32 = parts[2]
.parse()
.map_err(|_| sqlstate_error("42601", "TENANT ID must be a numeric value"))?;
if !parts[3].eq_ignore_ascii_case("TO") {
return Err(sqlstate_error("42601", "expected TO after tenant ID"));
}
let path = extract_quoted_string(parts, 4)
.ok_or_else(|| sqlstate_error("42601", "path must be a single-quoted string"))?;
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let backup = TenantBackup {
version: 1,
tenant_id: tid,
created_at: now,
documents: Vec::new(),
indexes: Vec::new(),
crdt_snapshots: Vec::new(),
vector_snapshots: Vec::new(),
};
let catalog_data = if let Some(catalog) = state.credentials.catalog() {
let collections = catalog.load_collections_for_tenant(tid).unwrap_or_default();
let users = state
.credentials
.list_user_details()
.into_iter()
.filter(|u| u.tenant_id.as_u32() == tid)
.count();
(collections.len(), users)
} else {
(0, 0)
};
let plaintext = rmp_serde::to_vec(&backup)
.map_err(|e| sqlstate_error("XX000", &format!("backup serialization failed: {e}")))?;
let bytes = if let Some(key) = state.wal.encryption_key() {
let mut aad = [0u8; nodedb_wal::record::HEADER_SIZE];
aad[..6].copy_from_slice(b"BACKUP");
let encrypted = key
.encrypt(0, &aad, &plaintext)
.map_err(|e| sqlstate_error("XX000", &format!("backup encryption failed: {e}")))?;
let mut output = Vec::with_capacity(4 + encrypted.len());
output.extend_from_slice(b"NENC"); output.extend_from_slice(&encrypted);
output
} else {
plaintext
};
std::fs::write(&path, &bytes).map_err(|e| {
sqlstate_error("XX000", &format!("failed to write backup to '{path}': {e}"))
})?;
state.audit_record(
AuditEvent::AdminAction,
Some(TenantId::new(tid)),
&identity.username,
&format!(
"backup tenant {tid} to '{path}' ({} bytes, {} collections, {} users)",
bytes.len(),
catalog_data.0,
catalog_data.1
),
);
let schema = Arc::new(vec![
text_field("path"),
int8_field("size_bytes"),
int8_field("collections"),
]);
let mut encoder = DataRowEncoder::new(schema.clone());
encoder
.encode_field(&path)
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
encoder
.encode_field(&(bytes.len() as i64))
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
encoder
.encode_field(&(catalog_data.0 as i64))
.map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
let row = encoder.take_row();
Ok(vec![Response::Query(QueryResponse::new(
schema,
stream::iter(vec![Ok(row)]),
))])
}
pub fn restore_tenant(
state: &SharedState,
identity: &AuthenticatedIdentity,
parts: &[&str],
) -> PgWireResult<Vec<Response>> {
if !identity.is_superuser {
return Err(sqlstate_error(
"42501",
"permission denied: only superuser can restore tenants",
));
}
if parts.len() < 5 {
return Err(sqlstate_error(
"42601",
"syntax: RESTORE TENANT <id> FROM '<path>'",
));
}
let tid: u32 = parts[2]
.parse()
.map_err(|_| sqlstate_error("42601", "TENANT ID must be a numeric value"))?;
if !parts[3].eq_ignore_ascii_case("FROM") {
return Err(sqlstate_error("42601", "expected FROM after tenant ID"));
}
let path = extract_quoted_string(parts, 4)
.ok_or_else(|| sqlstate_error("42601", "path must be a single-quoted string"))?;
let bytes = std::fs::read(&path).map_err(|e| {
sqlstate_error(
"XX000",
&format!("failed to read backup from '{path}': {e}"),
)
})?;
let backup: TenantBackup = rmp_serde::from_slice(&bytes)
.map_err(|e| sqlstate_error("XX000", &format!("backup deserialization failed: {e}")))?;
if backup.version != 1 {
return Err(sqlstate_error(
"XX000",
&format!("unsupported backup version: {}", backup.version),
));
}
if backup.tenant_id != tid {
return Err(sqlstate_error(
"XX000",
&format!(
"backup tenant mismatch: backup has {}, requested {}",
backup.tenant_id, tid
),
));
}
state.audit_record(
AuditEvent::AdminAction,
Some(TenantId::new(tid)),
&identity.username,
&format!(
"restored tenant {tid} from '{path}' ({} bytes, {} docs, {} indexes)",
bytes.len(),
backup.documents.len(),
backup.indexes.len()
),
);
Ok(vec![Response::Execution(Tag::new("RESTORE TENANT"))])
}
pub fn restore_tenant_dry_run(
_state: &SharedState,
identity: &AuthenticatedIdentity,
parts: &[&str],
) -> PgWireResult<Vec<Response>> {
if !identity.is_superuser {
return Err(sqlstate_error(
"42501",
"permission denied: only superuser can validate restores",
));
}
if parts.len() < 5 {
return Err(sqlstate_error(
"42601",
"syntax: RESTORE TENANT <id> FROM '<path>' DRY RUN",
));
}
let path = parts[4].trim_matches('\'').trim_matches('"');
let file_path = std::path::Path::new(path);
if !file_path.exists() {
return Ok(vec![Response::Execution(Tag::new(&format!(
"DRY RUN FAILED: backup file '{}' does not exist",
path
)))]);
}
let metadata = std::fs::metadata(file_path)
.map_err(|e| sqlstate_error("XX000", &format!("cannot read backup file: {e}")))?;
let size_mb = metadata.len() as f64 / (1024.0 * 1024.0);
let data = std::fs::read(file_path)
.map_err(|e| sqlstate_error("XX000", &format!("cannot read backup file: {e}")))?;
let valid = rmp_serde::from_slice::<TenantBackup>(&data).is_ok();
let status = if valid {
format!(
"DRY RUN OK: backup file '{}' is valid ({:.2} MB). Ready for restore.",
path, size_mb
)
} else {
format!(
"DRY RUN FAILED: backup file '{}' ({:.2} MB) has invalid format or is corrupted.",
path, size_mb
)
};
Ok(vec![Response::Execution(Tag::new(&status))])
}