use nodedb_types::DatabaseId;
use std::collections::HashSet;
use crate::control::security::catalog::SystemCatalog;
use crate::control::security::catalog::auth_types::object_type;
use super::divergence::{Divergence, DivergenceKind};
pub fn verify_redb_integrity(catalog: &SystemCatalog) -> Vec<Divergence> {
let mut violations: Vec<Divergence> = Vec::new();
macro_rules! load_table {
($table:literal, $expr:expr) => {
match $expr {
Ok(v) => v,
Err(e) => {
violations.push(Divergence::new(DivergenceKind::TableLoadError {
table: $table,
detail: e.to_string(),
}));
return violations;
}
}
};
}
let collections = load_table!(
"collections",
catalog.load_all_collections(DatabaseId::DEFAULT)
);
let owners = load_table!("owners", catalog.load_all_owners());
let users = load_table!("users", catalog.load_all_users());
let roles = load_table!("roles", catalog.load_all_roles());
let permissions = load_table!("permissions", catalog.load_all_permissions());
let triggers = load_table!("triggers", catalog.load_all_triggers());
let functions = load_table!("functions", catalog.load_all_functions());
let procedures = load_table!("procedures", catalog.load_all_procedures());
let materialized_views =
load_table!("materialized_views", catalog.load_all_materialized_views());
let sequences = load_table!("sequences", catalog.load_all_sequences());
let schedules = load_table!("schedules", catalog.load_all_schedules());
let change_streams = load_table!("change_streams", catalog.load_all_change_streams());
let continuous_aggregates = load_table!(
"continuous_aggregates",
catalog.load_all_continuous_aggregates()
);
let rls = load_table!("rls_policies", catalog.load_all_rls_policies());
let collection_keys: HashSet<(u64, String)> = collections
.iter()
.map(|c| (c.tenant_id, c.name.clone()))
.collect();
let user_names: HashSet<String> = users.iter().map(|u| u.username.clone()).collect();
let role_names: HashSet<String> = roles.iter().map(|r| r.name.clone()).collect();
let owner_keys: HashSet<(String, u64, String)> = owners
.iter()
.map(|o| (o.object_type.clone(), o.tenant_id, o.object_name.clone()))
.collect();
let parent_replicated: [(&'static str, Vec<(u64, String)>); 9] = [
(
object_type::COLLECTION,
collections
.iter()
.map(|c| (c.tenant_id, c.name.clone()))
.collect(),
),
(
object_type::FUNCTION,
functions
.iter()
.map(|f| (f.tenant_id, f.name.clone()))
.collect(),
),
(
object_type::PROCEDURE,
procedures
.iter()
.map(|p| (p.tenant_id, p.name.clone()))
.collect(),
),
(
object_type::TRIGGER,
triggers
.iter()
.map(|t| (t.tenant_id, t.name.clone()))
.collect(),
),
(
object_type::MATERIALIZED_VIEW,
materialized_views
.iter()
.map(|m| (m.tenant_id, m.name.clone()))
.collect(),
),
(
object_type::SEQUENCE,
sequences
.iter()
.map(|s| (s.tenant_id, s.name.clone()))
.collect(),
),
(
object_type::SCHEDULE,
schedules
.iter()
.map(|s| (s.tenant_id, s.name.clone()))
.collect(),
),
(
object_type::CHANGE_STREAM,
change_streams
.iter()
.map(|c| (c.tenant_id, c.name.clone()))
.collect(),
),
(
object_type::CONTINUOUS_AGGREGATE,
continuous_aggregates
.iter()
.map(|c| (c.tenant_id, c.name.clone()))
.collect(),
),
];
for (kind, rows) in &parent_replicated {
for (tenant, name) in rows {
let key = ((*kind).to_string(), *tenant, name.clone());
if !owner_keys.contains(&key) {
violations.push(Divergence::new(DivergenceKind::OrphanRow {
kind,
key: format!("{tenant}:{name}"),
expected_parent_kind: "owner",
}));
}
}
}
for o in &owners {
if !user_names.contains(&o.owner_username) {
violations.push(Divergence::new(DivergenceKind::DanglingReference {
from_kind: "owner",
from_key: format!("{}:{}:{}", o.object_type, o.tenant_id, o.object_name),
to_kind: "user",
to_key: o.owner_username.clone(),
}));
}
}
for p in &permissions {
if let Some(username) = p.grantee.strip_prefix("user:") {
if !user_names.contains(username) {
violations.push(Divergence::new(DivergenceKind::DanglingReference {
from_kind: "permission",
from_key: format!("{}:{}", p.target, p.grantee),
to_kind: "user",
to_key: username.to_string(),
}));
}
} else {
if !role_names.contains(&p.grantee) && !is_builtin_role(&p.grantee) {
violations.push(Divergence::new(DivergenceKind::DanglingReference {
from_kind: "permission",
from_key: format!("{}:{}", p.target, p.grantee),
to_kind: "role",
to_key: p.grantee.clone(),
}));
}
}
}
for t in &triggers {
let key = (t.tenant_id, t.collection.clone());
if !collection_keys.contains(&key) {
violations.push(Divergence::new(DivergenceKind::DanglingReference {
from_kind: "trigger",
from_key: format!("{}:{}", t.tenant_id, t.name),
to_kind: "collection",
to_key: format!("{}:{}", t.tenant_id, t.collection),
}));
}
}
for p in &rls {
let key = (p.tenant_id, p.collection.clone());
if !collection_keys.contains(&key) {
violations.push(Divergence::new(DivergenceKind::DanglingReference {
from_kind: "rls_policy",
from_key: format!("{}:{}", p.tenant_id, p.name),
to_kind: "collection",
to_key: format!("{}:{}", p.tenant_id, p.collection),
}));
}
}
for mv in &materialized_views {
let key = (mv.tenant_id, mv.source.clone());
if !collection_keys.contains(&key) {
violations.push(Divergence::new(DivergenceKind::DanglingReference {
from_kind: "materialized_view",
from_key: format!("{}:{}", mv.tenant_id, mv.name),
to_kind: "collection",
to_key: format!("{}:{}", mv.tenant_id, mv.source),
}));
}
}
for cs in &change_streams {
if cs.collection == "*" {
continue;
}
let key = (cs.tenant_id, cs.collection.clone());
if !collection_keys.contains(&key) {
violations.push(Divergence::new(DivergenceKind::DanglingReference {
from_kind: "change_stream",
from_key: format!("{}:{}", cs.tenant_id, cs.name),
to_kind: "collection",
to_key: format!("{}:{}", cs.tenant_id, cs.collection),
}));
}
}
for sch in &schedules {
let Some(target) = &sch.target_collection else {
continue;
};
let key = (sch.tenant_id, target.clone());
if !collection_keys.contains(&key) {
violations.push(Divergence::new(DivergenceKind::DanglingReference {
from_kind: "schedule",
from_key: format!("{}:{}", sch.tenant_id, sch.name),
to_kind: "collection",
to_key: format!("{}:{}", sch.tenant_id, target),
}));
}
}
let _ = (functions, procedures, sequences);
violations
}
fn is_builtin_role(name: &str) -> bool {
matches!(
name,
"superuser" | "tenant_admin" | "readwrite" | "readonly" | "monitor"
)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn builtin_role_detection() {
assert!(is_builtin_role("superuser"));
assert!(is_builtin_role("readonly"));
assert!(is_builtin_role("monitor"));
assert!(!is_builtin_role("admin"));
assert!(!is_builtin_role("custom_auditor"));
}
}