#[cfg(feature = "cedar")]
mod cedar_impl {
use cedar_policy::{
Authorizer, Context, Decision, Entities, Entity, EntityId, EntityTypeName, EntityUid,
Policy, PolicySet, Request, Schema, ValidationMode,
};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use thiserror::Error;
use tracing::{debug, info};
/// Errors produced by the Cedar-backed authorization layer.
#[derive(Error, Debug)]
pub enum CedarError {
/// A policy source could not be parsed, or a parsed policy could not be added to the set.
#[error("Policy parse error: {0}")]
PolicyParse(String),
/// The Cedar schema could not be loaded.
#[error("Schema error: {0}")]
Schema(String),
/// A policy failed validation against the schema; the payload joins all messages with "; ".
#[error("Validation error: {0}")]
Validation(String),
/// An entity type name, entity id, or entity could not be built or stored.
#[error("Entity error: {0}")]
Entity(String),
/// The request context or the authorization request itself was rejected by Cedar.
#[error("Request error: {0}")]
Request(String),
/// The authorization check completed and the answer was "deny".
#[error("Authorization denied: {principal} cannot {action} on {resource}")]
Denied {
/// Principal identifier as passed by the caller.
principal: String,
/// Action name (see `RivvenAction::as_str`).
action: String,
/// Debug rendering of the targeted resource.
resource: String,
},
/// Catch-all for unexpected internal failures.
#[error("Internal error: {0}")]
Internal(String),
}
/// Convenience alias: a `Result` whose error type is [`CedarError`].
pub type CedarResult<T> = Result<T, CedarError>;
/// Builds a Cedar [`EntityUid`] from a type name (e.g. `"Rivven::User"`) and an id.
///
/// # Errors
/// Returns [`CedarError::Entity`] when either component fails to parse.
fn make_entity_uid(type_name: &str, id: &str) -> CedarResult<EntityUid> {
    let parsed_type = match EntityTypeName::from_str(type_name) {
        Ok(parsed) => parsed,
        Err(err) => {
            return Err(CedarError::Entity(format!(
                "invalid entity type '{}': {}",
                type_name, err
            )));
        }
    };
    let parsed_id = match EntityId::from_str(id) {
        Ok(parsed) => parsed,
        Err(err) => {
            return Err(CedarError::Entity(format!(
                "invalid entity id '{}': {}",
                id, err
            )));
        }
    };
    Ok(EntityUid::from_type_name_and_id(parsed_type, parsed_id))
}
/// Actions a principal can request. Serialized with serde in `snake_case`
/// (e.g. `FetchOffsets` <-> `"fetch_offsets"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RivvenAction {
Produce,
Consume,
Create,
Delete,
Alter,
Describe,
Join,
Leave,
Commit,
FetchOffsets,
Admin,
AlterConfigs,
DescribeConfigs,
}
impl RivvenAction {
pub fn as_str(&self) -> &'static str {
match self {
Self::Produce => "produce",
Self::Consume => "consume",
Self::Create => "create",
Self::Delete => "delete",
Self::Alter => "alter",
Self::Describe => "describe",
Self::Join => "join",
Self::Leave => "leave",
Self::Commit => "commit",
Self::FetchOffsets => "fetch_offsets",
Self::Admin => "admin",
Self::AlterConfigs => "alter_configs",
Self::DescribeConfigs => "describe_configs",
}
}
fn to_entity_uid(self) -> CedarResult<EntityUid> {
make_entity_uid("Rivven::Action", self.as_str())
}
}
/// A resource that an action targets.
///
/// Serialized with serde as an adjacently tagged enum: the variant goes in
/// `"type"` and its payload (the resource name) in `"name"`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(tag = "type", content = "name")]
pub enum RivvenResource {
/// A topic, identified by name.
Topic(String),
/// A consumer group, identified by name.
ConsumerGroup(String),
/// A schema, identified by name.
Schema(String),
/// The cluster itself; it carries no name and maps to the fixed id `"default"`.
Cluster,
}
impl RivvenResource {
fn type_name(&self) -> &'static str {
match self {
Self::Topic(_) => "Rivven::Topic",
Self::ConsumerGroup(_) => "Rivven::ConsumerGroup",
Self::Schema(_) => "Rivven::Schema",
Self::Cluster => "Rivven::Cluster",
}
}
fn id(&self) -> &str {
match self {
Self::Topic(name) => name,
Self::ConsumerGroup(name) => name,
Self::Schema(name) => name,
Self::Cluster => "default",
}
}
fn to_entity_uid(&self) -> CedarResult<EntityUid> {
make_entity_uid(self.type_name(), self.id())
}
}
/// Per-request context that is handed to Cedar alongside principal/action/resource.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthzContext {
/// Client IP address, if known.
pub ip_address: Option<String>,
/// Request timestamp as a string; `Default` fills it with the current UTC time in RFC 3339.
pub timestamp: String,
/// TLS certificate subject of the client, if known.
pub tls_subject: Option<String>,
/// Additional free-form attributes; flattened into the top-level object by serde.
#[serde(flatten)]
pub extra: HashMap<String, serde_json::Value>,
}
impl Default for AuthzContext {
fn default() -> Self {
Self {
ip_address: None,
timestamp: chrono::Utc::now().to_rfc3339(),
tls_subject: None,
extra: HashMap::new(),
}
}
}
impl AuthzContext {
    /// Creates a context with the current timestamp; equivalent to `Default::default()`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the client IP address.
    pub fn with_ip(mut self, ip: impl Into<String>) -> Self {
        self.ip_address = Some(ip.into());
        self
    }

    /// Sets the TLS certificate subject of the client.
    pub fn with_tls_subject(mut self, subject: impl Into<String>) -> Self {
        self.tls_subject = Some(subject.into());
        self
    }

    /// Adds (or replaces) a free-form attribute.
    pub fn with_attr(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
        self.extra.insert(key.into(), value);
        self
    }

    /// Converts this context into a Cedar [`Context`].
    ///
    /// `null` values in `extra` are skipped. The dedicated fields (`timestamp`,
    /// `ip_address`, `tls_subject`) take precedence over same-named keys in `extra`,
    /// so a free-form attribute cannot shadow them.
    ///
    /// # Errors
    /// Returns [`CedarError::Request`] if Cedar rejects the resulting JSON object.
    fn to_cedar_context(&self) -> CedarResult<Context> {
        let mut context_map = serde_json::Map::new();

        // Free-form attributes go in first so the dedicated fields below overwrite
        // any colliding key instead of being overwritten by it.
        for (key, value) in &self.extra {
            if !value.is_null() {
                context_map.insert(key.clone(), value.clone());
            }
        }

        context_map.insert("timestamp".to_string(), serde_json::json!(self.timestamp));
        if let Some(ip) = &self.ip_address {
            context_map.insert("ip_address".to_string(), serde_json::json!(ip));
        }
        if let Some(tls) = &self.tls_subject {
            context_map.insert("tls_subject".to_string(), serde_json::json!(tls));
        }

        let json = serde_json::Value::Object(context_map);
        Context::from_json_value(json, None)
            .map_err(|e| CedarError::Request(format!("Invalid context: {}", e)))
    }
}
/// Outcome of an authorization check, with the policy ids Cedar reported.
#[derive(Debug, Clone)]
pub struct AuthzDecision {
/// `true` when the request is permitted.
pub allowed: bool,
/// Ids of the policies reported as the reason for an allow decision.
pub satisfied_policies: Vec<String>,
/// Ids of the policies reported as the reason for a deny decision.
pub denied_policies: Vec<String>,
/// Debug-formatted evaluation errors taken from Cedar's diagnostics.
pub errors: Vec<String>,
/// Optional free-form diagnostics text; the constructors in this file set it to `None`.
pub diagnostics: Option<String>,
}
impl AuthzDecision {
pub fn allowed(satisfied: Vec<String>) -> Self {
Self {
allowed: true,
satisfied_policies: satisfied,
denied_policies: vec![],
errors: vec![],
diagnostics: None,
}
}
pub fn denied(denied: Vec<String>) -> Self {
Self {
allowed: false,
satisfied_policies: vec![],
denied_policies: denied,
errors: vec![],
diagnostics: None,
}
}
}
/// Authorizer that evaluates requests against a Cedar policy set and entity store.
pub struct CedarAuthorizer {
// Schema used to validate policies before they are added; `None` when built without one.
schema: Option<Schema>,
// Current policy set, behind a read/write lock.
policies: RwLock<PolicySet>,
// Current entity store (users, groups, topics, ...), behind a read/write lock.
entities: RwLock<Entities>,
// When true (and a schema is present), new policies are validated in strict mode.
validate_policies: bool,
}
impl CedarAuthorizer {
/// Creates an authorizer with the built-in schema and policy validation enabled.
///
/// # Errors
/// Propagates the error from `default_schema` if the built-in schema cannot be parsed.
pub fn new() -> CedarResult<Self> {
    Self::default_schema().map(|builtin_schema| Self {
        schema: Some(builtin_schema),
        policies: RwLock::new(PolicySet::new()),
        entities: RwLock::new(Entities::empty()),
        validate_policies: true,
    })
}
/// Creates an authorizer with no schema; policies are accepted without validation.
pub fn new_without_schema() -> Self {
    let empty_policies = RwLock::new(PolicySet::new());
    let empty_entities = RwLock::new(Entities::empty());
    Self {
        schema: None,
        policies: empty_policies,
        entities: empty_entities,
        validate_policies: false,
    }
}
/// Parses the built-in JSON schema for the `Rivven` namespace.
///
/// The schema declares the entity types `User`, `Group`, `Topic`, `ConsumerGroup`,
/// `Schema` and `Cluster`, plus one action for every name returned by
/// `RivvenAction::as_str`. Each action is limited to `User`/`Group` principals
/// and to the resource types listed under its `appliesTo`.
///
/// # Errors
/// Returns [`CedarError::Schema`] if `Schema::from_json_str` rejects the document.
fn default_schema() -> CedarResult<Schema> {
// The schema is embedded as a raw string so it can be read as plain JSON.
let schema_src = r#"
{
"Rivven": {
"entityTypes": {
"User": {
"memberOfTypes": ["Group"],
"shape": {
"type": "Record",
"attributes": {
"email": { "type": "String", "required": false },
"roles": { "type": "Set", "element": { "type": "String" } },
"service_account": { "type": "Boolean" }
}
}
},
"Group": {
"memberOfTypes": ["Group"]
},
"Topic": {
"shape": {
"type": "Record",
"attributes": {
"owner": { "type": "Entity", "name": "Rivven::User", "required": false },
"partitions": { "type": "Long" },
"replication_factor": { "type": "Long" },
"retention_ms": { "type": "Long" },
"name": { "type": "String" }
}
}
},
"ConsumerGroup": {
"shape": {
"type": "Record",
"attributes": {
"name": { "type": "String" }
}
}
},
"Schema": {
"shape": {
"type": "Record",
"attributes": {
"name": { "type": "String" },
"version": { "type": "Long" }
}
}
},
"Cluster": {}
},
"actions": {
"produce": {
"appliesTo": {
"principalTypes": ["User", "Group"],
"resourceTypes": ["Topic"]
}
},
"consume": {
"appliesTo": {
"principalTypes": ["User", "Group"],
"resourceTypes": ["Topic"]
}
},
"create": {
"appliesTo": {
"principalTypes": ["User", "Group"],
"resourceTypes": ["Topic", "Schema"]
}
},
"delete": {
"appliesTo": {
"principalTypes": ["User", "Group"],
"resourceTypes": ["Topic", "ConsumerGroup", "Schema"]
}
},
"alter": {
"appliesTo": {
"principalTypes": ["User", "Group"],
"resourceTypes": ["Topic", "Schema"]
}
},
"describe": {
"appliesTo": {
"principalTypes": ["User", "Group"],
"resourceTypes": ["Topic", "ConsumerGroup", "Schema", "Cluster"]
}
},
"join": {
"appliesTo": {
"principalTypes": ["User", "Group"],
"resourceTypes": ["ConsumerGroup"]
}
},
"leave": {
"appliesTo": {
"principalTypes": ["User", "Group"],
"resourceTypes": ["ConsumerGroup"]
}
},
"commit": {
"appliesTo": {
"principalTypes": ["User", "Group"],
"resourceTypes": ["ConsumerGroup"]
}
},
"fetch_offsets": {
"appliesTo": {
"principalTypes": ["User", "Group"],
"resourceTypes": ["ConsumerGroup"]
}
},
"admin": {
"appliesTo": {
"principalTypes": ["User", "Group"],
"resourceTypes": ["Cluster"]
}
},
"alter_configs": {
"appliesTo": {
"principalTypes": ["User", "Group"],
"resourceTypes": ["Cluster"]
}
},
"describe_configs": {
"appliesTo": {
"principalTypes": ["User", "Group"],
"resourceTypes": ["Cluster"]
}
}
}
}
}
"#;
// Parse failures are reported with the debug rendering of the underlying error.
Schema::from_json_str(schema_src)
.map_err(|e| CedarError::Schema(format!("Invalid schema: {:?}", e)))
}
/// Parses a single policy, validates it if validation is enabled, and adds it under `id`.
///
/// # Errors
/// * [`CedarError::PolicyParse`] if the source does not parse or the policy cannot be
///   added to the set.
/// * [`CedarError::Validation`] if strict validation against the schema fails.
pub fn add_policy(&self, id: &str, policy_src: &str) -> CedarResult<()> {
    let policy_id = cedar_policy::PolicyId::new(id);
    let policy = match Policy::parse(Some(policy_id), policy_src) {
        Ok(parsed) => parsed,
        Err(err) => return Err(CedarError::PolicyParse(format!("{:?}", err))),
    };

    // Validation only runs when it is enabled AND a schema is available.
    let schema_to_check = if self.validate_policies {
        self.schema.as_ref()
    } else {
        None
    };
    if let Some(schema) = schema_to_check {
        // Validate the policy on its own, in a throw-away set.
        let mut candidate = PolicySet::new();
        candidate.add(policy.clone()).map_err(|err| {
            CedarError::PolicyParse(format!("Duplicate policy ID: {:?}", err))
        })?;
        let outcome = cedar_policy::Validator::new(schema.clone())
            .validate(&candidate, ValidationMode::Strict);
        if !outcome.validation_passed() {
            let messages = outcome
                .validation_errors()
                .map(|err| format!("{:?}", err))
                .collect::<Vec<String>>();
            return Err(CedarError::Validation(messages.join("; ")));
        }
    }

    self.policies
        .write()
        .add(policy)
        .map_err(|err| CedarError::PolicyParse(format!("Failed to add policy: {:?}", err)))?;
    info!("Added Cedar policy: {}", id);
    Ok(())
}
pub fn add_policies(&self, policies_src: &str) -> CedarResult<()> {
let parsed = PolicySet::from_str(policies_src)
.map_err(|e| CedarError::PolicyParse(format!("{:?}", e)))?;
if self.validate_policies {
if let Some(schema) = &self.schema {
let validator = cedar_policy::Validator::new(schema.clone());
let result = validator.validate(&parsed, ValidationMode::Strict);
if !result.validation_passed() {
let errors: Vec<String> = result
.validation_errors()
.map(|e| format!("{:?}", e))
.collect();
return Err(CedarError::Validation(errors.join("; ")));
}
}
}
let mut policies = self.policies.write();
for policy in parsed.policies() {
policies.add(policy.clone()).map_err(|e| {
CedarError::PolicyParse(format!("Failed to add policy: {:?}", e))
})?;
}
Ok(())
}
pub fn remove_policy(&self, id: &str) -> CedarResult<()> {
let mut policies = self.policies.write();
let policy_id = cedar_policy::PolicyId::new(id);
let new_policies = policies
.policies()
.filter(|p| p.id() != &policy_id)
.cloned()
.collect::<Vec<_>>();
let mut new_set = PolicySet::new();
for policy in new_policies {
new_set.add(policy).ok();
}
*policies = new_set;
info!("Removed Cedar policy: {}", id);
Ok(())
}
/// Inserts `entity` into the entity store, replacing any entity with the same UID.
///
/// `Entities::upsert_entities` consumes the store, so the update runs on a clone
/// and the result is swapped in only on success. If the upsert fails, the existing
/// entities are left untouched instead of being replaced by an empty store.
///
/// # Errors
/// Returns [`CedarError::Entity`] if Cedar rejects the update.
fn upsert_entity(&self, entity: Entity) -> CedarResult<()> {
    let mut guard = self.entities.write();
    let updated = (*guard)
        .clone()
        .upsert_entities([entity], None)
        .map_err(|e| CedarError::Entity(format!("Failed to upsert entity: {:?}", e)))?;
    *guard = updated;
    Ok(())
}
pub fn add_user(
&self,
username: &str,
email: Option<&str>,
roles: &[&str],
groups: &[&str],
is_service_account: bool,
) -> CedarResult<()> {
let uid = make_entity_uid("Rivven::User", username)?;
let parents: HashSet<EntityUid> = groups
.iter()
.map(|g| make_entity_uid("Rivven::Group", g))
.collect::<CedarResult<_>>()?;
let mut attrs = HashMap::new();
if let Some(e) = email {
attrs.insert(
"email".to_string(),
cedar_policy::RestrictedExpression::new_string(e.to_string()),
);
}
let roles_set: Vec<_> = roles
.iter()
.map(|r| cedar_policy::RestrictedExpression::new_string(r.to_string()))
.collect();
attrs.insert(
"roles".to_string(),
cedar_policy::RestrictedExpression::new_set(roles_set),
);
attrs.insert(
"service_account".to_string(),
cedar_policy::RestrictedExpression::new_bool(is_service_account),
);
let entity = Entity::new(uid, attrs, parents)
.map_err(|e| CedarError::Entity(format!("Invalid entity: {:?}", e)))?;
self.upsert_entity(entity)?;
debug!("Added user entity: {}", username);
Ok(())
}
/// Registers (or replaces) a `Rivven::Group` entity with the given parent groups.
///
/// # Errors
/// Returns [`CedarError::Entity`] if any UID is invalid or the store rejects the entity.
pub fn add_group(&self, name: &str, parent_groups: &[&str]) -> CedarResult<()> {
    let group_uid = make_entity_uid("Rivven::Group", name)?;

    let mut ancestors: HashSet<EntityUid> = HashSet::new();
    for parent in parent_groups {
        ancestors.insert(make_entity_uid("Rivven::Group", parent)?);
    }

    self.upsert_entity(Entity::new_no_attrs(group_uid, ancestors))?;
    debug!("Added group entity: {}", name);
    Ok(())
}
pub fn add_topic(
&self,
name: &str,
owner: Option<&str>,
partitions: i64,
replication_factor: i64,
retention_ms: i64,
) -> CedarResult<()> {
let uid = make_entity_uid("Rivven::Topic", name)?;
let mut attrs = HashMap::new();
attrs.insert(
"name".to_string(),
cedar_policy::RestrictedExpression::new_string(name.to_string()),
);
if let Some(o) = owner {
let owner_uid = make_entity_uid("Rivven::User", o)?;
attrs.insert(
"owner".to_string(),
cedar_policy::RestrictedExpression::new_entity_uid(owner_uid),
);
}
attrs.insert(
"partitions".to_string(),
cedar_policy::RestrictedExpression::new_long(partitions),
);
attrs.insert(
"replication_factor".to_string(),
cedar_policy::RestrictedExpression::new_long(replication_factor),
);
attrs.insert(
"retention_ms".to_string(),
cedar_policy::RestrictedExpression::new_long(retention_ms),
);
let entity = Entity::new(uid, attrs, HashSet::new())
.map_err(|e| CedarError::Entity(format!("Invalid entity: {:?}", e)))?;
self.upsert_entity(entity)?;
debug!("Added topic entity: {}", name);
Ok(())
}
pub fn add_consumer_group(&self, name: &str) -> CedarResult<()> {
let uid = make_entity_uid("Rivven::ConsumerGroup", name)?;
let mut attrs = HashMap::new();
attrs.insert(
"name".to_string(),
cedar_policy::RestrictedExpression::new_string(name.to_string()),
);
let entity = Entity::new(uid, attrs, HashSet::new())
.map_err(|e| CedarError::Entity(format!("Invalid entity: {:?}", e)))?;
self.upsert_entity(entity)?;
debug!("Added consumer group entity: {}", name);
Ok(())
}
pub fn add_schema(&self, name: &str, version: i64) -> CedarResult<()> {
let uid = make_entity_uid("Rivven::Schema", name)?;
let mut attrs = HashMap::new();
attrs.insert(
"name".to_string(),
cedar_policy::RestrictedExpression::new_string(name.to_string()),
);
attrs.insert(
"version".to_string(),
cedar_policy::RestrictedExpression::new_long(version),
);
let entity = Entity::new(uid, attrs, HashSet::new())
.map_err(|e| CedarError::Entity(format!("Invalid entity: {:?}", e)))?;
self.upsert_entity(entity)?;
debug!("Added schema entity: {} (version {})", name, version);
Ok(())
}
/// Evaluates whether `principal` may perform `action` on `resource`.
///
/// The principal is always treated as a `Rivven::User`. The returned
/// [`AuthzDecision`] carries the ids of the policies Cedar reports as the reason
/// for the decision, and any policy evaluation errors. Errors are reported for
/// allow decisions too, because a policy that fails to evaluate is skipped and
/// callers may want to know that.
///
/// # Errors
/// * [`CedarError::Entity`] if the principal, action, or resource UID is invalid.
/// * [`CedarError::Request`] if the context or the request is rejected by Cedar.
///
/// A "deny" outcome is NOT an error here; see `authorize` for that behaviour.
pub fn is_authorized(
    &self,
    principal: &str,
    action: RivvenAction,
    resource: &RivvenResource,
    context: &AuthzContext,
) -> CedarResult<AuthzDecision> {
    let principal_uid = make_entity_uid("Rivven::User", principal)?;
    let action_uid = action.to_entity_uid()?;
    let resource_uid = resource.to_entity_uid()?;
    let cedar_context = context.to_cedar_context()?;

    // The UIDs are not needed afterwards, so they are moved rather than cloned.
    let request = Request::new(principal_uid, action_uid, resource_uid, cedar_context, None)
        .map_err(|e| CedarError::Request(format!("Invalid request: {:?}", e)))?;

    // Hold the read locks only for the evaluation itself.
    let response = {
        let policies = self.policies.read();
        let entities = self.entities.read();
        Authorizer::new().is_authorized(&request, &policies, &entities)
    };

    let diagnostics = response.diagnostics();
    let determining: Vec<String> = diagnostics.reason().map(|id| id.to_string()).collect();
    let errors: Vec<String> = diagnostics.errors().map(|e| format!("{:?}", e)).collect();

    let decision = match response.decision() {
        Decision::Allow => AuthzDecision {
            allowed: true,
            satisfied_policies: determining,
            denied_policies: vec![],
            errors,
            diagnostics: None,
        },
        Decision::Deny => AuthzDecision {
            allowed: false,
            satisfied_policies: vec![],
            denied_policies: determining,
            errors,
            diagnostics: None,
        },
    };

    debug!(
        "Authorization: {} {} {} -> {}",
        principal,
        action.as_str(),
        resource.id(),
        if decision.allowed { "ALLOW" } else { "DENY" }
    );
    Ok(decision)
}
/// Like `is_authorized`, but turns a "deny" outcome into an error.
///
/// # Errors
/// * [`CedarError::Denied`] when the request is not permitted.
/// * Any error returned by `is_authorized`.
pub fn authorize(
    &self,
    principal: &str,
    action: RivvenAction,
    resource: &RivvenResource,
    context: &AuthzContext,
) -> CedarResult<()> {
    let outcome = self.is_authorized(principal, action, resource, context)?;
    if !outcome.allowed {
        return Err(CedarError::Denied {
            principal: principal.to_string(),
            action: action.as_str().to_string(),
            resource: format!("{:?}", resource),
        });
    }
    Ok(())
}
/// Installs the built-in policies, in order: `super-admin`, `describe-topics`,
/// `describe-consumer-groups`.
///
/// # Errors
/// Stops at, and returns, the first error reported by `add_policy`; policies added
/// before the failure stay in place.
pub fn load_default_policies(&self) -> CedarResult<()> {
    // (policy id, policy source) pairs, added in this order.
    const DEFAULT_POLICIES: [(&str, &str); 3] = [
        (
            "super-admin",
            r#"
permit(
principal in Rivven::Group::"admins",
action,
resource
);
"#,
        ),
        (
            "describe-topics",
            r#"
permit(
principal,
action == Rivven::Action::"describe",
resource is Rivven::Topic
);
"#,
        ),
        (
            "describe-consumer-groups",
            r#"
permit(
principal,
action == Rivven::Action::"describe",
resource is Rivven::ConsumerGroup
);
"#,
        ),
    ];

    for &(policy_id, policy_src) in DEFAULT_POLICIES.iter() {
        self.add_policy(policy_id, policy_src)?;
    }
    info!("Loaded default Cedar policies");
    Ok(())
}
}
impl Default for CedarAuthorizer {
    /// Builds the schema-backed authorizer; if that fails, logs the error and falls
    /// back to a schema-less authorizer that starts with no policies.
    fn default() -> Self {
        Self::new().unwrap_or_else(|e| {
            tracing::error!(
                "Failed to create default Cedar authorizer: {}. Using deny-all fallback.",
                e
            );
            Self::new_without_schema()
        })
    }
}
// Unit tests for the Cedar-backed authorizer.
#[cfg(test)]
mod tests {
use super::*;
// The default constructor must load the built-in schema.
#[test]
fn test_create_authorizer() {
let authz = CedarAuthorizer::new().unwrap();
assert!(authz.schema.is_some());
}
// A well-formed policy is accepted when no schema validation is performed.
#[test]
fn test_add_simple_policy() {
let authz = CedarAuthorizer::new_without_schema();
authz
.add_policy(
"test-policy",
r#"
permit(
principal == Rivven::User::"alice",
action == Rivven::Action::"produce",
resource == Rivven::Topic::"orders"
);
"#,
)
.unwrap();
}
// A user who is a member of the "admins" group is allowed by the admin-all policy.
#[test]
fn test_add_user_and_authorize() {
let authz = CedarAuthorizer::new_without_schema();
authz
.add_policy(
"admin-all",
r#"
permit(
principal in Rivven::Group::"admins",
action,
resource
);
"#,
)
.unwrap();
authz.add_group("admins", &[]).unwrap();
authz
.add_user(
"alice",
Some("alice@example.com"),
&["admin"],
&["admins"],
false,
)
.unwrap();
authz
.add_topic("orders", Some("alice"), 3, 2, 604800000)
.unwrap();
let ctx = AuthzContext::new().with_ip("127.0.0.1");
let decision = authz
.is_authorized(
"alice",
RivvenAction::Produce,
&RivvenResource::Topic("orders".to_string()),
&ctx,
)
.unwrap();
assert!(decision.allowed);
}
// A user outside the "admins" group is denied when only admins may produce.
#[test]
fn test_deny_unauthorized() {
let authz = CedarAuthorizer::new_without_schema();
authz
.add_policy(
"only-admins-produce",
r#"
permit(
principal in Rivven::Group::"admins",
action == Rivven::Action::"produce",
resource is Rivven::Topic
);
"#,
)
.unwrap();
authz
.add_user("bob", Some("bob@example.com"), &["user"], &[], false)
.unwrap();
authz.add_topic("orders", None, 3, 2, 604800000).unwrap();
let ctx = AuthzContext::new();
let decision = authz
.is_authorized(
"bob",
RivvenAction::Produce,
&RivvenResource::Topic("orders".to_string()),
&ctx,
)
.unwrap();
assert!(!decision.allowed);
}
// The builder methods store the values they are given.
#[test]
fn test_context_attributes() {
let ctx = AuthzContext::new()
.with_ip("192.168.1.100")
.with_tls_subject("CN=client,O=Rivven")
.with_attr("custom_field", serde_json::json!("custom_value"));
assert_eq!(ctx.ip_address, Some("192.168.1.100".to_string()));
assert_eq!(ctx.tls_subject, Some("CN=client,O=Rivven".to_string()));
assert!(ctx.extra.contains_key("custom_field"));
}
}
}
#[cfg(feature = "cedar")]
pub use cedar_impl::*;
#[cfg(not(feature = "cedar"))]
mod no_cedar {
    //! Stand-in API used when the crate is built without the `cedar` feature.
    //!
    //! The types mirror the public names of the Cedar-backed implementation so
    //! that callers compile either way; every authorization entry point returns
    //! [`CedarError::NotEnabled`].

    use std::collections::HashMap;
    use thiserror::Error;

    /// Error type of the stub: Cedar support is not compiled in.
    #[derive(Error, Debug)]
    pub enum CedarError {
        /// Returned by every fallible operation of the stub.
        #[error("Cedar authorization not enabled. Build with 'cedar' feature.")]
        NotEnabled,
    }

    /// Convenience alias: a `Result` whose error type is [`CedarError`].
    pub type CedarResult<T> = Result<T, CedarError>;

    /// Actions a principal can request (same variants as the Cedar-backed enum).
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub enum RivvenAction {
        Produce,
        Consume,
        Create,
        Delete,
        Alter,
        Describe,
        Join,
        Leave,
        Commit,
        FetchOffsets,
        Admin,
        AlterConfigs,
        DescribeConfigs,
    }

    /// A resource that an action targets (same variants as the Cedar-backed enum).
    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
    pub enum RivvenResource {
        Topic(String),
        ConsumerGroup(String),
        Schema(String),
        Cluster,
    }

    /// Per-request context. All fields start empty (derived `Default`).
    #[derive(Debug, Clone, Default)]
    pub struct AuthzContext {
        /// Client IP address, if known.
        pub ip_address: Option<String>,
        /// Request timestamp as a string; empty by default in this stub.
        pub timestamp: String,
        /// TLS certificate subject of the client, if known.
        pub tls_subject: Option<String>,
        /// Additional free-form attributes.
        pub extra: HashMap<String, serde_json::Value>,
    }

    impl AuthzContext {
        /// Creates an empty context; equivalent to `Default::default()`.
        pub fn new() -> Self {
            Self::default()
        }

        /// Sets the client IP address.
        ///
        /// The value is stored (not discarded) so the builder behaves the same
        /// whether or not the `cedar` feature is enabled.
        pub fn with_ip(mut self, ip: impl Into<String>) -> Self {
            self.ip_address = Some(ip.into());
            self
        }

        /// Sets the TLS certificate subject of the client.
        pub fn with_tls_subject(mut self, subject: impl Into<String>) -> Self {
            self.tls_subject = Some(subject.into());
            self
        }

        /// Adds (or replaces) a free-form attribute; mirrors the Cedar-backed builder.
        pub fn with_attr(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
            self.extra.insert(key.into(), value);
            self
        }
    }

    /// Stub authorizer; it cannot be constructed through [`CedarAuthorizer::new`].
    pub struct CedarAuthorizer;

    impl CedarAuthorizer {
        /// Always fails with [`CedarError::NotEnabled`].
        pub fn new() -> CedarResult<Self> {
            Err(CedarError::NotEnabled)
        }

        /// Always fails with [`CedarError::NotEnabled`]; no request is ever authorized.
        pub fn authorize(
            &self,
            _principal: &str,
            _action: RivvenAction,
            _resource: &RivvenResource,
            _context: &AuthzContext,
        ) -> CedarResult<()> {
            Err(CedarError::NotEnabled)
        }
    }
}
#[cfg(not(feature = "cedar"))]
pub use no_cedar::*;