use nodedb_types::DatabaseId;
use std::collections::HashSet;
use crate::control::security::catalog::SystemCatalog;
use crate::control::security::catalog::auth_types::object_type;
use super::divergence::{Divergence, DivergenceKind};
pub fn verify_redb_integrity(catalog: &SystemCatalog) -> Vec<Divergence> {
let mut violations: Vec<Divergence> = Vec::new();
let collections = match catalog.load_all_collections(DatabaseId::DEFAULT) {
Ok(v) => v,
Err(e) => {
tracing::error!(error = %e, "integrity: failed to load collections");
return violations;
}
};
let owners = match catalog.load_all_owners() {
Ok(v) => v,
Err(e) => {
tracing::error!(error = %e, "integrity: failed to load owners");
Vec::new()
}
};
let users = match catalog.load_all_users() {
Ok(v) => v,
Err(e) => {
tracing::error!(error = %e, "integrity: failed to load users");
Vec::new()
}
};
let roles = match catalog.load_all_roles() {
Ok(v) => v,
Err(e) => {
tracing::error!(error = %e, "integrity: failed to load roles");
Vec::new()
}
};
let permissions = match catalog.load_all_permissions() {
Ok(v) => v,
Err(e) => {
tracing::error!(error = %e, "integrity: failed to load permissions");
Vec::new()
}
};
let triggers = match catalog.load_all_triggers() {
Ok(v) => v,
Err(e) => {
tracing::error!(error = %e, "integrity: failed to load triggers");
Vec::new()
}
};
let functions = match catalog.load_all_functions() {
Ok(v) => v,
Err(e) => {
tracing::error!(error = %e, "integrity: failed to load functions");
Vec::new()
}
};
let procedures = match catalog.load_all_procedures() {
Ok(v) => v,
Err(e) => {
tracing::error!(error = %e, "integrity: failed to load procedures");
Vec::new()
}
};
let materialized_views = match catalog.load_all_materialized_views() {
Ok(v) => v,
Err(e) => {
tracing::error!(error = %e, "integrity: failed to load materialized_views");
Vec::new()
}
};
let sequences = match catalog.load_all_sequences() {
Ok(v) => v,
Err(e) => {
tracing::error!(error = %e, "integrity: failed to load sequences");
Vec::new()
}
};
let schedules = match catalog.load_all_schedules() {
Ok(v) => v,
Err(e) => {
tracing::error!(error = %e, "integrity: failed to load schedules");
Vec::new()
}
};
let change_streams = match catalog.load_all_change_streams() {
Ok(v) => v,
Err(e) => {
tracing::error!(error = %e, "integrity: failed to load change_streams");
Vec::new()
}
};
let continuous_aggregates = match catalog.load_all_continuous_aggregates() {
Ok(v) => v,
Err(e) => {
tracing::error!(error = %e, "integrity: failed to load continuous_aggregates");
Vec::new()
}
};
let rls = match catalog.load_all_rls_policies() {
Ok(v) => v,
Err(e) => {
tracing::error!(error = %e, "integrity: failed to load rls policies");
Vec::new()
}
};
let collection_keys: HashSet<(u64, String)> = collections
.iter()
.map(|c| (c.tenant_id, c.name.clone()))
.collect();
let user_names: HashSet<String> = users.iter().map(|u| u.username.clone()).collect();
let role_names: HashSet<String> = roles.iter().map(|r| r.name.clone()).collect();
let owner_keys: HashSet<(String, u64, String)> = owners
.iter()
.map(|o| (o.object_type.clone(), o.tenant_id, o.object_name.clone()))
.collect();
let parent_replicated: [(&'static str, Vec<(u64, String)>); 9] = [
(
object_type::COLLECTION,
collections
.iter()
.map(|c| (c.tenant_id, c.name.clone()))
.collect(),
),
(
object_type::FUNCTION,
functions
.iter()
.map(|f| (f.tenant_id, f.name.clone()))
.collect(),
),
(
object_type::PROCEDURE,
procedures
.iter()
.map(|p| (p.tenant_id, p.name.clone()))
.collect(),
),
(
object_type::TRIGGER,
triggers
.iter()
.map(|t| (t.tenant_id, t.name.clone()))
.collect(),
),
(
object_type::MATERIALIZED_VIEW,
materialized_views
.iter()
.map(|m| (m.tenant_id, m.name.clone()))
.collect(),
),
(
object_type::SEQUENCE,
sequences
.iter()
.map(|s| (s.tenant_id, s.name.clone()))
.collect(),
),
(
object_type::SCHEDULE,
schedules
.iter()
.map(|s| (s.tenant_id, s.name.clone()))
.collect(),
),
(
object_type::CHANGE_STREAM,
change_streams
.iter()
.map(|c| (c.tenant_id, c.name.clone()))
.collect(),
),
(
object_type::CONTINUOUS_AGGREGATE,
continuous_aggregates
.iter()
.map(|c| (c.tenant_id, c.name.clone()))
.collect(),
),
];
for (kind, rows) in &parent_replicated {
for (tenant, name) in rows {
let key = ((*kind).to_string(), *tenant, name.clone());
if !owner_keys.contains(&key) {
violations.push(Divergence::new(DivergenceKind::OrphanRow {
kind,
key: format!("{tenant}:{name}"),
expected_parent_kind: "owner",
}));
}
}
}
for o in &owners {
if !user_names.contains(&o.owner_username) {
violations.push(Divergence::new(DivergenceKind::DanglingReference {
from_kind: "owner",
from_key: format!("{}:{}:{}", o.object_type, o.tenant_id, o.object_name),
to_kind: "user",
to_key: o.owner_username.clone(),
}));
}
}
for p in &permissions {
if let Some(username) = p.grantee.strip_prefix("user:") {
if !user_names.contains(username) {
violations.push(Divergence::new(DivergenceKind::DanglingReference {
from_kind: "permission",
from_key: format!("{}:{}", p.target, p.grantee),
to_kind: "user",
to_key: username.to_string(),
}));
}
} else {
if !role_names.contains(&p.grantee) && !is_builtin_role(&p.grantee) {
violations.push(Divergence::new(DivergenceKind::DanglingReference {
from_kind: "permission",
from_key: format!("{}:{}", p.target, p.grantee),
to_kind: "role",
to_key: p.grantee.clone(),
}));
}
}
}
for t in &triggers {
let key = (t.tenant_id, t.collection.clone());
if !collection_keys.contains(&key) {
violations.push(Divergence::new(DivergenceKind::DanglingReference {
from_kind: "trigger",
from_key: format!("{}:{}", t.tenant_id, t.name),
to_kind: "collection",
to_key: format!("{}:{}", t.tenant_id, t.collection),
}));
}
}
for p in &rls {
let key = (p.tenant_id, p.collection.clone());
if !collection_keys.contains(&key) {
violations.push(Divergence::new(DivergenceKind::DanglingReference {
from_kind: "rls_policy",
from_key: format!("{}:{}", p.tenant_id, p.name),
to_kind: "collection",
to_key: format!("{}:{}", p.tenant_id, p.collection),
}));
}
}
for mv in &materialized_views {
let key = (mv.tenant_id, mv.source.clone());
if !collection_keys.contains(&key) {
violations.push(Divergence::new(DivergenceKind::DanglingReference {
from_kind: "materialized_view",
from_key: format!("{}:{}", mv.tenant_id, mv.name),
to_kind: "collection",
to_key: format!("{}:{}", mv.tenant_id, mv.source),
}));
}
}
for cs in &change_streams {
if cs.collection == "*" {
continue;
}
let key = (cs.tenant_id, cs.collection.clone());
if !collection_keys.contains(&key) {
violations.push(Divergence::new(DivergenceKind::DanglingReference {
from_kind: "change_stream",
from_key: format!("{}:{}", cs.tenant_id, cs.name),
to_kind: "collection",
to_key: format!("{}:{}", cs.tenant_id, cs.collection),
}));
}
}
for sch in &schedules {
let Some(target) = &sch.target_collection else {
continue;
};
let key = (sch.tenant_id, target.clone());
if !collection_keys.contains(&key) {
violations.push(Divergence::new(DivergenceKind::DanglingReference {
from_kind: "schedule",
from_key: format!("{}:{}", sch.tenant_id, sch.name),
to_kind: "collection",
to_key: format!("{}:{}", sch.tenant_id, target),
}));
}
}
let _ = (functions, procedures, sequences);
violations
}
fn is_builtin_role(name: &str) -> bool {
matches!(
name,
"superuser" | "tenant_admin" | "readwrite" | "readonly" | "monitor"
)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn builtin_role_detection() {
assert!(is_builtin_role("superuser"));
assert!(is_builtin_role("readonly"));
assert!(is_builtin_role("monitor"));
assert!(!is_builtin_role("admin"));
assert!(!is_builtin_role("custom_auditor"));
}
}