use super::action_catalog::{lookup, ActionCategory};
use super::Role;
/// Configuration key string `"red.config.policy.enforcement_mode"`.
///
/// NOTE(review): presumably the value stored under this key is one of the
/// strings accepted by `PolicyEnforcementMode::parse` — confirm against the
/// code that reads the configuration.
pub const ENFORCEMENT_MODE_CONFIG_KEY: &str = "red.config.policy.enforcement_mode";
/// Version string tied to policy-only enforcement.
///
/// The unit tests in this file require it to contain at least one `.` and to
/// consist solely of ASCII digits and dots.
///
/// NOTE(review): the name suggests this is the release from which
/// `policy_only` is mandatory — confirm with the code that consumes it.
pub const POLICY_ONLY_HARD_VERSION: &str = "1.0.0";
/// Selects which authorization mechanism is in force.
///
/// Each variant has a canonical lowercase string form, produced by
/// [`PolicyEnforcementMode::as_str`] and accepted by
/// [`PolicyEnforcementMode::parse`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PolicyEnforcementMode {
    /// Canonical string: `"legacy_rbac"`.
    LegacyRbac,
    /// Canonical string: `"policy_only"`.
    PolicyOnly,
}

impl PolicyEnforcementMode {
    /// Returns the canonical string form of this mode.
    pub fn as_str(self) -> &'static str {
        let label = match self {
            Self::PolicyOnly => "policy_only",
            Self::LegacyRbac => "legacy_rbac",
        };
        label
    }

    /// Parses a canonical mode string.
    ///
    /// The comparison is exact: no trimming and no case folding. Any input
    /// that is not precisely one of the canonical strings yields `None`.
    pub fn parse(value: &str) -> Option<Self> {
        // Search the variants by their canonical spelling so that `parse`
        // and `as_str` cannot drift apart.
        [Self::LegacyRbac, Self::PolicyOnly]
            .iter()
            .copied()
            .find(|mode| mode.as_str() == value)
    }

    /// Default mode for a fresh bootstrap: [`Self::PolicyOnly`].
    pub const fn default_fresh_bootstrap() -> Self {
        Self::PolicyOnly
    }

    /// Default mode for an existing install: [`Self::LegacyRbac`].
    pub const fn default_existing_install() -> Self {
        Self::LegacyRbac
    }
}

impl std::fmt::Display for PolicyEnforcementMode {
    /// Writes the canonical string form verbatim (formatter width and
    /// padding flags are not applied).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text: &'static str = (*self).as_str();
        f.write_str(text)
    }
}
/// Decides whether `role` may perform `action` under the legacy RBAC rules.
///
/// Returns `true` when `role` compares greater than or equal to the role
/// that `required_role_for_action` demands for `action`. The comparison
/// relies on the ordering defined on `Role`.
pub fn legacy_rbac_decision(role: Role, action: &str) -> bool {
    role >= required_role_for_action(action)
}
/// Maps an action name to the role the legacy RBAC check requires for it.
///
/// Resolution order:
/// 1. A fixed set of action names always needs only `Role::Read`, before
///    the catalog is consulted.
/// 2. Otherwise the action is looked up in the action catalog and its
///    category determines the role.
/// 3. Actions the catalog does not know require `Role::Admin`.
fn required_role_for_action(action: &str) -> Role {
    // Explicit read-only overrides; these take precedence over the
    // category-based mapping below.
    let read_only_override = matches!(
        action,
        "select"
            | "queue:peek"
            | "queue:presence:read"
            | "vector:read"
            | "vector:search"
            | "vector:artifact:read"
    );
    if read_only_override {
        return Role::Read;
    }

    let entry = match lookup(action) {
        Some(found) => found,
        // Not in the catalog: require Admin.
        None => return Role::Admin,
    };

    // Every category is listed explicitly (no wildcard arm) so that adding a
    // new `ActionCategory` variant forces this mapping to be revisited.
    match entry.category {
        ActionCategory::Ai | ActionCategory::Graph | ActionCategory::Ops => Role::Read,
        ActionCategory::Dml
        | ActionCategory::Schema
        | ActionCategory::Notification
        | ActionCategory::Stream
        | ActionCategory::Queue
        | ActionCategory::Vector => Role::Write,
        ActionCategory::Ddl
        | ActionCategory::Function
        | ActionCategory::Mgmt
        | ActionCategory::Policy
        | ActionCategory::Admin
        | ActionCategory::Config
        | ActionCategory::Vault
        | ActionCategory::Wildcard
        | ActionCategory::Other => Role::Admin,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Both canonical spellings parse to their respective variants.
    #[test]
    fn parse_accepts_both_modes() {
        let cases = [
            ("legacy_rbac", PolicyEnforcementMode::LegacyRbac),
            ("policy_only", PolicyEnforcementMode::PolicyOnly),
        ];
        for &(text, mode) in cases.iter() {
            assert_eq!(PolicyEnforcementMode::parse(text), Some(mode));
        }
    }

    /// Near-misses (wrong case, wrong separator, stray whitespace, empty)
    /// are all rejected.
    #[test]
    fn parse_rejects_invalid_values() {
        let rejected = [
            "",
            "rbac",
            "LEGACY_RBAC",
            "policy-only",
            "off",
            " policy_only",
        ];
        for bad in rejected.iter() {
            let parsed = PolicyEnforcementMode::parse(bad);
            assert!(parsed.is_none(), "parse should reject {bad:?}");
        }
    }

    /// Pins the two documented defaults.
    #[test]
    fn defaults_documented_for_fresh_vs_existing() {
        let fresh = PolicyEnforcementMode::default_fresh_bootstrap();
        let existing = PolicyEnforcementMode::default_existing_install();
        assert_eq!(fresh, PolicyEnforcementMode::PolicyOnly);
        assert_eq!(existing, PolicyEnforcementMode::LegacyRbac);
    }

    /// `Display` output must be accepted by `parse` and map back to the
    /// same variant.
    #[test]
    fn display_round_trip() {
        let modes = [
            PolicyEnforcementMode::LegacyRbac,
            PolicyEnforcementMode::PolicyOnly,
        ];
        for m in modes.iter() {
            let rendered = m.to_string();
            assert_eq!(PolicyEnforcementMode::parse(&rendered), Some(*m));
        }
    }

    /// `select` is permitted for every role.
    #[test]
    fn legacy_rbac_select_requires_only_read() {
        let may_select = |role| legacy_rbac_decision(role, "select");
        assert!(may_select(Role::Read));
        assert!(may_select(Role::Write));
        assert!(may_select(Role::Admin));
    }

    /// DML-style actions are denied to Read and allowed from Write upward.
    #[test]
    fn legacy_rbac_dml_write_requires_write() {
        let dml_actions = ["insert", "update", "delete", "truncate", "write"];
        for action in dml_actions.iter() {
            let allowed = |role| legacy_rbac_decision(role, action);
            assert!(!allowed(Role::Read), "Read must not satisfy {action}");
            assert!(allowed(Role::Write), "Write must satisfy {action}");
            assert!(allowed(Role::Admin), "Admin must satisfy {action}");
        }
    }

    /// Administrative categories are allowed for Admin only.
    #[test]
    fn legacy_rbac_admin_categories_require_admin() {
        let admin_actions = [
            "create",
            "drop",
            "alter",
            "grant",
            "revoke",
            "policy:put",
            "admin:bootstrap",
            "config:write",
            "vault:read",
            "*",
        ];
        for action in admin_actions.iter() {
            let allowed = |role| legacy_rbac_decision(role, action);
            assert!(!allowed(Role::Read), "Read must not satisfy {action}");
            assert!(!allowed(Role::Write), "Write must not satisfy {action}");
            assert!(allowed(Role::Admin), "Admin must satisfy {action}");
        }
    }

    /// Actions missing from the catalog are allowed for Admin only.
    #[test]
    fn legacy_rbac_unknown_action_requires_admin() {
        let unknown = "made-up:verb";
        assert!(!legacy_rbac_decision(Role::Read, unknown));
        assert!(!legacy_rbac_decision(Role::Write, unknown));
        assert!(legacy_rbac_decision(Role::Admin, unknown));
    }

    /// The hard-version constant must be dotted and purely numeric.
    #[test]
    fn hard_version_constant_is_well_formed() {
        let v = POLICY_ONLY_HARD_VERSION;
        assert!(v.contains('.'), "hard version must look like x.y[.z]");
        v.chars().for_each(|ch| {
            assert!(
                ch.is_ascii_digit() || ch == '.',
                "hard version must contain only digits and dots, got {ch:?}"
            );
        });
    }
}