use nodedb_types::DatabaseId;
use tracing::{info, warn};
use crate::control::security::catalog::SystemCatalog;
use crate::control::security::catalog::auth_types::{StoredOwner, object_type};
use super::divergence::{Divergence, DivergenceKind};
pub fn heal_orphan_rows(
catalog: &SystemCatalog,
violations: Vec<Divergence>,
) -> (Vec<Divergence>, usize) {
let mut remaining = Vec::with_capacity(violations.len());
let mut healed = 0usize;
for d in violations {
if let DivergenceKind::OrphanRow {
kind,
key,
expected_parent_kind: "owner",
} = &d.kind
&& let Some((tenant_id, name)) = parse_key(key)
&& let Some(owner_username) = primary_row_owner(catalog, kind, tenant_id, &name)
{
let stored = StoredOwner {
object_type: (*kind).to_string(),
object_name: name.clone(),
tenant_id,
owner_username: owner_username.clone(),
};
match catalog.put_owner(&stored) {
Ok(()) => {
info!(
kind,
tenant_id,
object = %name,
owner = %owner_username,
"catalog sanity check: healed orphan row by \
reconstructing StoredOwner from primary row's \
in-band owner field"
);
healed += 1;
continue;
}
Err(e) => warn!(
kind,
tenant_id,
object = %name,
error = %e,
"catalog sanity check: could not heal orphan row — \
leaving divergence in integrity_violations"
),
}
}
remaining.push(d);
}
(remaining, healed)
}
fn parse_key(key: &str) -> Option<(u64, String)> {
let (tenant, name) = key.split_once(':')?;
let tenant_id = tenant.parse().ok()?;
Some((tenant_id, name.to_string()))
}
fn primary_row_owner(
catalog: &SystemCatalog,
kind: &str,
tenant_id: u64,
name: &str,
) -> Option<String> {
match kind {
object_type::COLLECTION => catalog
.get_collection(DatabaseId::DEFAULT, tenant_id, name)
.ok()
.flatten()
.map(|c| c.owner),
object_type::FUNCTION => catalog
.get_function(tenant_id, name)
.ok()
.flatten()
.map(|f| f.owner),
object_type::PROCEDURE => catalog
.get_procedure(tenant_id, name)
.ok()
.flatten()
.map(|p| p.owner),
object_type::TRIGGER => catalog
.get_trigger(tenant_id, name)
.ok()
.flatten()
.map(|t| t.owner),
object_type::MATERIALIZED_VIEW => catalog
.get_materialized_view(tenant_id, name)
.ok()
.flatten()
.map(|m| m.owner),
object_type::SEQUENCE => catalog
.get_sequence(tenant_id, name)
.ok()
.flatten()
.map(|s| s.owner),
object_type::SCHEDULE => catalog
.load_all_schedules()
.ok()
.and_then(|all| {
all.into_iter()
.find(|s| s.tenant_id == tenant_id && s.name == name)
})
.map(|s| s.owner),
object_type::CHANGE_STREAM => catalog
.get_change_stream(tenant_id, name)
.ok()
.flatten()
.map(|c| c.owner),
object_type::CONTINUOUS_AGGREGATE => catalog
.get_continuous_aggregate(tenant_id, name)
.ok()
.flatten()
.map(|c| c.owner),
_ => None,
}
}