Skip to main content

rivven_core/
cedar_authz.rs

1//! Cedar-based Authorization Engine for Rivven
2//!
3//! This module provides fine-grained, policy-as-code authorization using
4//! [Cedar](https://www.cedarpolicy.com/), the policy language from AWS.
5//!
6//! ## Why Cedar?
7//!
8//! - **Formal verification**: Policies can be mathematically proven correct
9//! - **Separation of concerns**: Policy changes without code changes
10//! - **Audit trail**: Every decision is explainable
11//! - **Attribute-based**: Context-aware decisions (time, IP, resource attributes)
12//!
13//! ## Rivven Entity Model
14//!
15//! ```text
16//! ┌─────────────────────────────────────────────────────────────────────┐
17//! │                     RIVVEN CEDAR SCHEMA                             │
18//! ├─────────────────────────────────────────────────────────────────────┤
19//! │                                                                     │
20//! │  namespace Rivven {                                                 │
21//! │    entity User in [Group] {                                         │
22//! │      email?: String,                                               │
23//! │      roles: Set<String>,                                           │
24//! │      service_account: Bool,                                        │
25//! │    };                                                              │
26//! │                                                                     │
27//! │    entity Group in [Group];                                        │
28//! │                                                                     │
29//! │    entity Topic {                                                  │
30//! │      owner?: User,                                                 │
31//! │      partitions: Long,                                             │
32//! │      replication_factor: Long,                                     │
33//! │      retention_ms: Long,                                           │
34//! │    };                                                              │
35//! │                                                                     │
36//! │    entity ConsumerGroup {                                          │
37//! │      members: Set<User>,                                           │
38//! │    };                                                              │
39//! │                                                                     │
40//! │    entity Cluster;                                                 │
41//! │                                                                     │
42//! │    action produce, consume, create, delete, alter, describe        │
43//! │      appliesTo { principal: [User, Group], resource: [Topic] };    │
44//! │                                                                     │
45//! │    action join, leave, commit, fetch_offsets                       │
46//! │      appliesTo { principal: [User, Group], resource: [ConsumerGroup] };│
47//! │                                                                     │
48//! │    action admin                                                     │
49//! │      appliesTo { principal: [User, Group], resource: [Cluster] };  │
50//! │  }                                                                  │
51//! └─────────────────────────────────────────────────────────────────────┘
52//! ```
53//!
54//! ## Example Policies
55//!
56//! ```cedar
57//! // Allow users to produce to topics they own
58//! permit(
59//!   principal,
60//!   action == Rivven::Action::"produce",
61//!   resource
62//! ) when {
63//!   resource.owner == principal
64//! };
65//!
66//! // Allow members of "producers" group to produce to any topic starting with "events-"
67//! permit(
68//!   principal in Rivven::Group::"producers",
69//!   action == Rivven::Action::"produce",
70//!   resource
71//! ) when {
72//!   resource.name.startsWith("events-")
73//! };
74//!
75//! // Deny access from untrusted IPs
76//! forbid(
77//!   principal,
78//!   action,
79//!   resource
80//! ) when {
81//!   context.ip_address.isLoopback() == false &&
82//!   context.ip_address.isInRange(ip("10.0.0.0/8")) == false
83//! };
84//! ```
85
86#[cfg(feature = "cedar")]
87mod cedar_impl {
88    use cedar_policy::{
89        Authorizer, Context, Decision, Entities, Entity, EntityId, EntityTypeName, EntityUid,
90        Policy, PolicySet, Request, Schema, ValidationMode,
91    };
92    use parking_lot::RwLock;
93    use serde::{Deserialize, Serialize};
94    use std::collections::{HashMap, HashSet};
95    use std::str::FromStr;
96    use thiserror::Error;
97    use tracing::{debug, info};
98
99    // ============================================================================
100    // Error Types
101    // ============================================================================
102
103    #[derive(Error, Debug)]
104    pub enum CedarError {
105        #[error("Policy parse error: {0}")]
106        PolicyParse(String),
107
108        #[error("Schema error: {0}")]
109        Schema(String),
110
111        #[error("Validation error: {0}")]
112        Validation(String),
113
114        #[error("Entity error: {0}")]
115        Entity(String),
116
117        #[error("Request error: {0}")]
118        Request(String),
119
120        #[error("Authorization denied: {principal} cannot {action} on {resource}")]
121        Denied {
122            principal: String,
123            action: String,
124            resource: String,
125        },
126
127        #[error("Internal error: {0}")]
128        Internal(String),
129    }
130
131    pub type CedarResult<T> = Result<T, CedarError>;
132
133    // ============================================================================
134    // Rivven Actions
135    // ============================================================================
136
137    /// Actions that can be performed in Rivven
138    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
139    #[serde(rename_all = "snake_case")]
140    pub enum RivvenAction {
141        // Topic actions
142        Produce,
143        Consume,
144        Create,
145        Delete,
146        Alter,
147        Describe,
148
149        // Consumer group actions
150        Join,
151        Leave,
152        Commit,
153        FetchOffsets,
154
155        // Cluster actions
156        Admin,
157        AlterConfigs,
158        DescribeConfigs,
159    }
160
161    impl RivvenAction {
162        pub fn as_str(&self) -> &'static str {
163            match self {
164                Self::Produce => "produce",
165                Self::Consume => "consume",
166                Self::Create => "create",
167                Self::Delete => "delete",
168                Self::Alter => "alter",
169                Self::Describe => "describe",
170                Self::Join => "join",
171                Self::Leave => "leave",
172                Self::Commit => "commit",
173                Self::FetchOffsets => "fetch_offsets",
174                Self::Admin => "admin",
175                Self::AlterConfigs => "alter_configs",
176                Self::DescribeConfigs => "describe_configs",
177            }
178        }
179
180        fn to_entity_uid(self) -> EntityUid {
181            EntityUid::from_type_name_and_id(
182                EntityTypeName::from_str("Rivven::Action").unwrap(),
183                EntityId::from_str(self.as_str()).unwrap(),
184            )
185        }
186    }
187
188    // ============================================================================
189    // Resource Types
190    // ============================================================================
191
192    /// Types of resources in Rivven
193    #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
194    #[serde(tag = "type", content = "name")]
195    pub enum RivvenResource {
196        Topic(String),
197        ConsumerGroup(String),
198        Schema(String),
199        Cluster,
200    }
201
202    impl RivvenResource {
203        fn type_name(&self) -> &'static str {
204            match self {
205                Self::Topic(_) => "Rivven::Topic",
206                Self::ConsumerGroup(_) => "Rivven::ConsumerGroup",
207                Self::Schema(_) => "Rivven::Schema",
208                Self::Cluster => "Rivven::Cluster",
209            }
210        }
211
212        fn id(&self) -> &str {
213            match self {
214                Self::Topic(name) => name,
215                Self::ConsumerGroup(name) => name,
216                Self::Schema(name) => name,
217                Self::Cluster => "default",
218            }
219        }
220
221        fn to_entity_uid(&self) -> EntityUid {
222            EntityUid::from_type_name_and_id(
223                EntityTypeName::from_str(self.type_name()).unwrap(),
224                EntityId::from_str(self.id()).unwrap(),
225            )
226        }
227    }
228
229    // ============================================================================
230    // Authorization Context
231    // ============================================================================
232
233    /// Context for authorization decisions
234    #[derive(Debug, Clone, Serialize, Deserialize)]
235    pub struct AuthzContext {
236        /// Client IP address
237        pub ip_address: Option<String>,
238
239        /// Request timestamp (ISO 8601)
240        pub timestamp: String,
241
242        /// TLS client certificate subject (if mTLS)
243        pub tls_subject: Option<String>,
244
245        /// Additional context attributes
246        #[serde(flatten)]
247        pub extra: HashMap<String, serde_json::Value>,
248    }
249
250    impl Default for AuthzContext {
251        fn default() -> Self {
252            Self {
253                ip_address: None,
254                timestamp: chrono::Utc::now().to_rfc3339(),
255                tls_subject: None,
256                extra: HashMap::new(),
257            }
258        }
259    }
260
261    impl AuthzContext {
262        pub fn new() -> Self {
263            Self::default()
264        }
265
266        pub fn with_ip(mut self, ip: impl Into<String>) -> Self {
267            self.ip_address = Some(ip.into());
268            self
269        }
270
271        pub fn with_tls_subject(mut self, subject: impl Into<String>) -> Self {
272            self.tls_subject = Some(subject.into());
273            self
274        }
275
276        pub fn with_attr(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
277            self.extra.insert(key.into(), value);
278            self
279        }
280
281        fn to_cedar_context(&self) -> CedarResult<Context> {
282            // Cedar does not allow null values, so we must create a JSON object
283            // with only the non-null values
284            let mut context_map = serde_json::Map::new();
285
286            // Add timestamp (always present)
287            context_map.insert("timestamp".to_string(), serde_json::json!(self.timestamp));
288
289            // Add optional fields only if they have values
290            if let Some(ip) = &self.ip_address {
291                context_map.insert("ip_address".to_string(), serde_json::json!(ip));
292            }
293
294            if let Some(tls) = &self.tls_subject {
295                context_map.insert("tls_subject".to_string(), serde_json::json!(tls));
296            }
297
298            // Add extra context, filtering out null values
299            for (key, value) in &self.extra {
300                if !value.is_null() {
301                    context_map.insert(key.clone(), value.clone());
302                }
303            }
304
305            let json = serde_json::Value::Object(context_map);
306
307            Context::from_json_value(json, None)
308                .map_err(|e| CedarError::Request(format!("Invalid context: {}", e)))
309        }
310    }
311
312    // ============================================================================
313    // Authorization Decision
314    // ============================================================================
315
316    /// Result of an authorization check
317    #[derive(Debug, Clone)]
318    pub struct AuthzDecision {
319        /// Whether access is allowed
320        pub allowed: bool,
321
322        /// Policies that permitted the action
323        pub satisfied_policies: Vec<String>,
324
325        /// Policies that denied the action
326        pub denied_policies: Vec<String>,
327
328        /// Errors during evaluation
329        pub errors: Vec<String>,
330
331        /// Diagnostics for debugging
332        pub diagnostics: Option<String>,
333    }
334
335    impl AuthzDecision {
336        pub fn allowed(satisfied: Vec<String>) -> Self {
337            Self {
338                allowed: true,
339                satisfied_policies: satisfied,
340                denied_policies: vec![],
341                errors: vec![],
342                diagnostics: None,
343            }
344        }
345
346        pub fn denied(denied: Vec<String>) -> Self {
347            Self {
348                allowed: false,
349                satisfied_policies: vec![],
350                denied_policies: denied,
351                errors: vec![],
352                diagnostics: None,
353            }
354        }
355    }
356
357    // ============================================================================
358    // Cedar Authorizer
359    // ============================================================================
360
361    /// Cedar-based authorization engine
362    pub struct CedarAuthorizer {
363        /// Cedar schema for validation
364        schema: Option<Schema>,
365
366        /// Policy set
367        policies: RwLock<PolicySet>,
368
369        /// Entity store
370        entities: RwLock<Entities>,
371
372        /// Whether to validate policies against schema
373        validate_policies: bool,
374    }
375
376    impl CedarAuthorizer {
377        /// Create a new authorizer with default schema
378        pub fn new() -> CedarResult<Self> {
379            let schema = Self::default_schema()?;
380
381            Ok(Self {
382                schema: Some(schema),
383                policies: RwLock::new(PolicySet::new()),
384                entities: RwLock::new(Entities::empty()),
385                validate_policies: true,
386            })
387        }
388
389        /// Create without schema validation (for testing)
390        pub fn new_without_schema() -> Self {
391            Self {
392                schema: None,
393                policies: RwLock::new(PolicySet::new()),
394                entities: RwLock::new(Entities::empty()),
395                validate_policies: false,
396            }
397        }
398
399        /// Default Rivven Cedar schema
400        fn default_schema() -> CedarResult<Schema> {
401            let schema_src = r#"
402{
403  "Rivven": {
404    "entityTypes": {
405      "User": {
406        "memberOfTypes": ["Group"],
407        "shape": {
408          "type": "Record",
409          "attributes": {
410            "email": { "type": "String", "required": false },
411            "roles": { "type": "Set", "element": { "type": "String" } },
412            "service_account": { "type": "Boolean" }
413          }
414        }
415      },
416      "Group": {
417        "memberOfTypes": ["Group"]
418      },
419      "Topic": {
420        "shape": {
421          "type": "Record",
422          "attributes": {
423            "owner": { "type": "Entity", "name": "Rivven::User", "required": false },
424            "partitions": { "type": "Long" },
425            "replication_factor": { "type": "Long" },
426            "retention_ms": { "type": "Long" },
427            "name": { "type": "String" }
428          }
429        }
430      },
431      "ConsumerGroup": {
432        "shape": {
433          "type": "Record",
434          "attributes": {
435            "name": { "type": "String" }
436          }
437        }
438      },
439      "Schema": {
440        "shape": {
441          "type": "Record",
442          "attributes": {
443            "name": { "type": "String" },
444            "version": { "type": "Long" }
445          }
446        }
447      },
448      "Cluster": {}
449    },
450    "actions": {
451      "produce": {
452        "appliesTo": {
453          "principalTypes": ["User", "Group"],
454          "resourceTypes": ["Topic"]
455        }
456      },
457      "consume": {
458        "appliesTo": {
459          "principalTypes": ["User", "Group"],
460          "resourceTypes": ["Topic"]
461        }
462      },
463      "create": {
464        "appliesTo": {
465          "principalTypes": ["User", "Group"],
466          "resourceTypes": ["Topic", "Schema"]
467        }
468      },
469      "delete": {
470        "appliesTo": {
471          "principalTypes": ["User", "Group"],
472          "resourceTypes": ["Topic", "ConsumerGroup", "Schema"]
473        }
474      },
475      "alter": {
476        "appliesTo": {
477          "principalTypes": ["User", "Group"],
478          "resourceTypes": ["Topic", "Schema"]
479        }
480      },
481      "describe": {
482        "appliesTo": {
483          "principalTypes": ["User", "Group"],
484          "resourceTypes": ["Topic", "ConsumerGroup", "Schema", "Cluster"]
485        }
486      },
487      "join": {
488        "appliesTo": {
489          "principalTypes": ["User", "Group"],
490          "resourceTypes": ["ConsumerGroup"]
491        }
492      },
493      "leave": {
494        "appliesTo": {
495          "principalTypes": ["User", "Group"],
496          "resourceTypes": ["ConsumerGroup"]
497        }
498      },
499      "commit": {
500        "appliesTo": {
501          "principalTypes": ["User", "Group"],
502          "resourceTypes": ["ConsumerGroup"]
503        }
504      },
505      "fetch_offsets": {
506        "appliesTo": {
507          "principalTypes": ["User", "Group"],
508          "resourceTypes": ["ConsumerGroup"]
509        }
510      },
511      "admin": {
512        "appliesTo": {
513          "principalTypes": ["User", "Group"],
514          "resourceTypes": ["Cluster"]
515        }
516      },
517      "alter_configs": {
518        "appliesTo": {
519          "principalTypes": ["User", "Group"],
520          "resourceTypes": ["Cluster"]
521        }
522      },
523      "describe_configs": {
524        "appliesTo": {
525          "principalTypes": ["User", "Group"],
526          "resourceTypes": ["Cluster"]
527        }
528      }
529    }
530  }
531}
532"#;
533
534            Schema::from_json_str(schema_src)
535                .map_err(|e| CedarError::Schema(format!("Invalid schema: {:?}", e)))
536        }
537
538        /// Add a policy from Cedar source
539        pub fn add_policy(&self, id: &str, policy_src: &str) -> CedarResult<()> {
540            let policy = Policy::parse(Some(cedar_policy::PolicyId::new(id)), policy_src)
541                .map_err(|e| CedarError::PolicyParse(format!("{:?}", e)))?;
542
543            // Validate against schema if enabled
544            if self.validate_policies {
545                if let Some(schema) = &self.schema {
546                    let mut temp_set = PolicySet::new();
547                    temp_set.add(policy.clone()).map_err(|e| {
548                        CedarError::PolicyParse(format!("Duplicate policy ID: {:?}", e))
549                    })?;
550
551                    let validator = cedar_policy::Validator::new(schema.clone());
552                    let result = validator.validate(&temp_set, ValidationMode::Strict);
553
554                    if !result.validation_passed() {
555                        let errors: Vec<String> = result
556                            .validation_errors()
557                            .map(|e| format!("{:?}", e))
558                            .collect();
559                        return Err(CedarError::Validation(errors.join("; ")));
560                    }
561                }
562            }
563
564            let mut policies = self.policies.write();
565            policies
566                .add(policy)
567                .map_err(|e| CedarError::PolicyParse(format!("Failed to add policy: {:?}", e)))?;
568
569            info!("Added Cedar policy: {}", id);
570            Ok(())
571        }
572
573        /// Add multiple policies from Cedar source
574        pub fn add_policies(&self, policies_src: &str) -> CedarResult<()> {
575            let parsed = PolicySet::from_str(policies_src)
576                .map_err(|e| CedarError::PolicyParse(format!("{:?}", e)))?;
577
578            // Validate against schema if enabled
579            if self.validate_policies {
580                if let Some(schema) = &self.schema {
581                    let validator = cedar_policy::Validator::new(schema.clone());
582                    let result = validator.validate(&parsed, ValidationMode::Strict);
583
584                    if !result.validation_passed() {
585                        let errors: Vec<String> = result
586                            .validation_errors()
587                            .map(|e| format!("{:?}", e))
588                            .collect();
589                        return Err(CedarError::Validation(errors.join("; ")));
590                    }
591                }
592            }
593
594            let mut policies = self.policies.write();
595            for policy in parsed.policies() {
596                policies.add(policy.clone()).map_err(|e| {
597                    CedarError::PolicyParse(format!("Failed to add policy: {:?}", e))
598                })?;
599            }
600
601            Ok(())
602        }
603
604        /// Remove a policy by ID
605        pub fn remove_policy(&self, id: &str) -> CedarResult<()> {
606            // Cedar PolicySet doesn't support removal directly
607            // We need to rebuild without the policy
608            let mut policies = self.policies.write();
609            let policy_id = cedar_policy::PolicyId::new(id);
610
611            // Create a new PolicySet without the specified policy
612            let new_policies = policies
613                .policies()
614                .filter(|p| p.id() != &policy_id)
615                .cloned()
616                .collect::<Vec<_>>();
617
618            let mut new_set = PolicySet::new();
619            for policy in new_policies {
620                new_set.add(policy).ok();
621            }
622
623            *policies = new_set;
624            info!("Removed Cedar policy: {}", id);
625            Ok(())
626        }
627
628        /// Add a user entity
629        pub fn add_user(
630            &self,
631            username: &str,
632            email: Option<&str>,
633            roles: &[&str],
634            groups: &[&str],
635            is_service_account: bool,
636        ) -> CedarResult<()> {
637            let uid = EntityUid::from_type_name_and_id(
638                EntityTypeName::from_str("Rivven::User").unwrap(),
639                EntityId::from_str(username).unwrap(),
640            );
641
642            // Build parent groups
643            let parents: HashSet<EntityUid> = groups
644                .iter()
645                .map(|g| {
646                    EntityUid::from_type_name_and_id(
647                        EntityTypeName::from_str("Rivven::Group").unwrap(),
648                        EntityId::from_str(g).unwrap(),
649                    )
650                })
651                .collect();
652
653            // Build attributes
654            let mut attrs = HashMap::new();
655            if let Some(e) = email {
656                attrs.insert(
657                    "email".to_string(),
658                    cedar_policy::RestrictedExpression::new_string(e.to_string()),
659                );
660            }
661
662            let roles_set: Vec<_> = roles
663                .iter()
664                .map(|r| cedar_policy::RestrictedExpression::new_string(r.to_string()))
665                .collect();
666            attrs.insert(
667                "roles".to_string(),
668                cedar_policy::RestrictedExpression::new_set(roles_set),
669            );
670
671            attrs.insert(
672                "service_account".to_string(),
673                cedar_policy::RestrictedExpression::new_bool(is_service_account),
674            );
675
676            let entity = Entity::new(uid, attrs, parents)
677                .map_err(|e| CedarError::Entity(format!("Invalid entity: {:?}", e)))?;
678
679            let mut entities = self.entities.write();
680            // Convert to new Entities
681            let mut all_entities: Vec<Entity> = entities.iter().cloned().collect();
682            all_entities.push(entity);
683            *entities = Entities::from_entities(all_entities, None)
684                .map_err(|e| CedarError::Entity(format!("Invalid entities: {:?}", e)))?;
685
686            debug!("Added user entity: {}", username);
687            Ok(())
688        }
689
690        /// Add a group entity
691        pub fn add_group(&self, name: &str, parent_groups: &[&str]) -> CedarResult<()> {
692            let uid = EntityUid::from_type_name_and_id(
693                EntityTypeName::from_str("Rivven::Group").unwrap(),
694                EntityId::from_str(name).unwrap(),
695            );
696
697            let parents: HashSet<EntityUid> = parent_groups
698                .iter()
699                .map(|g| {
700                    EntityUid::from_type_name_and_id(
701                        EntityTypeName::from_str("Rivven::Group").unwrap(),
702                        EntityId::from_str(g).unwrap(),
703                    )
704                })
705                .collect();
706
707            let entity = Entity::new_no_attrs(uid, parents);
708
709            let mut entities = self.entities.write();
710            let mut all_entities: Vec<Entity> = entities.iter().cloned().collect();
711            all_entities.push(entity);
712            *entities = Entities::from_entities(all_entities, None)
713                .map_err(|e| CedarError::Entity(format!("Invalid entities: {:?}", e)))?;
714
715            debug!("Added group entity: {}", name);
716            Ok(())
717        }
718
719        /// Add a topic entity
720        pub fn add_topic(
721            &self,
722            name: &str,
723            owner: Option<&str>,
724            partitions: i64,
725            replication_factor: i64,
726            retention_ms: i64,
727        ) -> CedarResult<()> {
728            let uid = EntityUid::from_type_name_and_id(
729                EntityTypeName::from_str("Rivven::Topic").unwrap(),
730                EntityId::from_str(name).unwrap(),
731            );
732
733            let mut attrs = HashMap::new();
734            attrs.insert(
735                "name".to_string(),
736                cedar_policy::RestrictedExpression::new_string(name.to_string()),
737            );
738
739            if let Some(o) = owner {
740                let owner_uid = EntityUid::from_type_name_and_id(
741                    EntityTypeName::from_str("Rivven::User").unwrap(),
742                    EntityId::from_str(o).unwrap(),
743                );
744                attrs.insert(
745                    "owner".to_string(),
746                    cedar_policy::RestrictedExpression::new_entity_uid(owner_uid),
747                );
748            }
749
750            attrs.insert(
751                "partitions".to_string(),
752                cedar_policy::RestrictedExpression::new_long(partitions),
753            );
754            attrs.insert(
755                "replication_factor".to_string(),
756                cedar_policy::RestrictedExpression::new_long(replication_factor),
757            );
758            attrs.insert(
759                "retention_ms".to_string(),
760                cedar_policy::RestrictedExpression::new_long(retention_ms),
761            );
762
763            let entity = Entity::new(uid, attrs, HashSet::new())
764                .map_err(|e| CedarError::Entity(format!("Invalid entity: {:?}", e)))?;
765
766            let mut entities = self.entities.write();
767            let mut all_entities: Vec<Entity> = entities.iter().cloned().collect();
768            all_entities.push(entity);
769            *entities = Entities::from_entities(all_entities, None)
770                .map_err(|e| CedarError::Entity(format!("Invalid entities: {:?}", e)))?;
771
772            debug!("Added topic entity: {}", name);
773            Ok(())
774        }
775
776        /// Add a consumer group entity
777        pub fn add_consumer_group(&self, name: &str) -> CedarResult<()> {
778            let uid = EntityUid::from_type_name_and_id(
779                EntityTypeName::from_str("Rivven::ConsumerGroup").unwrap(),
780                EntityId::from_str(name).unwrap(),
781            );
782
783            let mut attrs = HashMap::new();
784            attrs.insert(
785                "name".to_string(),
786                cedar_policy::RestrictedExpression::new_string(name.to_string()),
787            );
788
789            let entity = Entity::new(uid, attrs, HashSet::new())
790                .map_err(|e| CedarError::Entity(format!("Invalid entity: {:?}", e)))?;
791
792            let mut entities = self.entities.write();
793            let mut all_entities: Vec<Entity> = entities.iter().cloned().collect();
794            all_entities.push(entity);
795            *entities = Entities::from_entities(all_entities, None)
796                .map_err(|e| CedarError::Entity(format!("Invalid entities: {:?}", e)))?;
797
798            debug!("Added consumer group entity: {}", name);
799            Ok(())
800        }
801
802        /// Check if an action is authorized
803        pub fn is_authorized(
804            &self,
805            principal: &str,
806            action: RivvenAction,
807            resource: &RivvenResource,
808            context: &AuthzContext,
809        ) -> CedarResult<AuthzDecision> {
810            let principal_uid = EntityUid::from_type_name_and_id(
811                EntityTypeName::from_str("Rivven::User").unwrap(),
812                EntityId::from_str(principal).unwrap(),
813            );
814
815            let action_uid = action.to_entity_uid();
816            let resource_uid = resource.to_entity_uid();
817            let cedar_context = context.to_cedar_context()?;
818
819            let request = Request::new(
820                principal_uid.clone(),
821                action_uid.clone(),
822                resource_uid.clone(),
823                cedar_context,
824                None,
825            )
826            .map_err(|e| CedarError::Request(format!("Invalid request: {:?}", e)))?;
827
828            let authorizer = Authorizer::new();
829            let policies = self.policies.read();
830            let entities = self.entities.read();
831
832            let response = authorizer.is_authorized(&request, &policies, &entities);
833
834            let decision = match response.decision() {
835                Decision::Allow => {
836                    let satisfied: Vec<String> = response
837                        .diagnostics()
838                        .reason()
839                        .map(|id| id.to_string())
840                        .collect();
841                    AuthzDecision::allowed(satisfied)
842                }
843                Decision::Deny => {
844                    let denied: Vec<String> = response
845                        .diagnostics()
846                        .reason()
847                        .map(|id| id.to_string())
848                        .collect();
849
850                    let errors: Vec<String> = response
851                        .diagnostics()
852                        .errors()
853                        .map(|e| format!("{:?}", e))
854                        .collect();
855
856                    AuthzDecision {
857                        allowed: false,
858                        satisfied_policies: vec![],
859                        denied_policies: denied,
860                        errors,
861                        diagnostics: None,
862                    }
863                }
864            };
865
866            debug!(
867                "Authorization: {} {} {} -> {}",
868                principal,
869                action.as_str(),
870                resource.id(),
871                if decision.allowed { "ALLOW" } else { "DENY" }
872            );
873
874            Ok(decision)
875        }
876
877        /// Check authorization and return error if denied
878        pub fn authorize(
879            &self,
880            principal: &str,
881            action: RivvenAction,
882            resource: &RivvenResource,
883            context: &AuthzContext,
884        ) -> CedarResult<()> {
885            let decision = self.is_authorized(principal, action, resource, context)?;
886
887            if decision.allowed {
888                Ok(())
889            } else {
890                Err(CedarError::Denied {
891                    principal: principal.to_string(),
892                    action: action.as_str().to_string(),
893                    resource: format!("{:?}", resource),
894                })
895            }
896        }
897
898        /// Load default policies for common use cases
899        pub fn load_default_policies(&self) -> CedarResult<()> {
900            // Super admin has all permissions
901            self.add_policy(
902                "super-admin",
903                r#"
904permit(
905  principal in Rivven::Group::"admins",
906  action,
907  resource
908);
909"#,
910            )?;
911
912            // All users can describe topics
913            self.add_policy(
914                "describe-topics",
915                r#"
916permit(
917  principal,
918  action == Rivven::Action::"describe",
919  resource is Rivven::Topic
920);
921"#,
922            )?;
923
924            // All users can describe consumer groups
925            self.add_policy(
926                "describe-consumer-groups",
927                r#"
928permit(
929  principal,
930  action == Rivven::Action::"describe",
931  resource is Rivven::ConsumerGroup
932);
933"#,
934            )?;
935
936            info!("Loaded default Cedar policies");
937            Ok(())
938        }
939    }
940
941    impl Default for CedarAuthorizer {
942        fn default() -> Self {
943            Self::new().expect("Failed to create default authorizer")
944        }
945    }
946
947    // ============================================================================
948    // Tests
949    // ============================================================================
950
951    #[cfg(test)]
952    mod tests {
953        use super::*;
954
955        #[test]
956        fn test_create_authorizer() {
957            let authz = CedarAuthorizer::new().unwrap();
958            assert!(authz.schema.is_some());
959        }
960
961        #[test]
962        fn test_add_simple_policy() {
963            let authz = CedarAuthorizer::new_without_schema();
964
965            authz
966                .add_policy(
967                    "test-policy",
968                    r#"
969permit(
970  principal == Rivven::User::"alice",
971  action == Rivven::Action::"produce",
972  resource == Rivven::Topic::"orders"
973);
974"#,
975                )
976                .unwrap();
977        }
978
979        #[test]
980        fn test_add_user_and_authorize() {
981            let authz = CedarAuthorizer::new_without_schema();
982
983            // Add admin policy
984            authz
985                .add_policy(
986                    "admin-all",
987                    r#"
988permit(
989  principal in Rivven::Group::"admins",
990  action,
991  resource
992);
993"#,
994                )
995                .unwrap();
996
997            // Add group and user
998            authz.add_group("admins", &[]).unwrap();
999            authz
1000                .add_user(
1001                    "alice",
1002                    Some("alice@example.com"),
1003                    &["admin"],
1004                    &["admins"],
1005                    false,
1006                )
1007                .unwrap();
1008
1009            // Add topic
1010            authz
1011                .add_topic("orders", Some("alice"), 3, 2, 604800000)
1012                .unwrap();
1013
1014            // Check authorization
1015            let ctx = AuthzContext::new().with_ip("127.0.0.1");
1016            let decision = authz
1017                .is_authorized(
1018                    "alice",
1019                    RivvenAction::Produce,
1020                    &RivvenResource::Topic("orders".to_string()),
1021                    &ctx,
1022                )
1023                .unwrap();
1024
1025            assert!(decision.allowed);
1026        }
1027
1028        #[test]
1029        fn test_deny_unauthorized() {
1030            let authz = CedarAuthorizer::new_without_schema();
1031
1032            // Add a restrictive policy - only admins can produce
1033            authz
1034                .add_policy(
1035                    "only-admins-produce",
1036                    r#"
1037permit(
1038  principal in Rivven::Group::"admins",
1039  action == Rivven::Action::"produce",
1040  resource is Rivven::Topic
1041);
1042"#,
1043                )
1044                .unwrap();
1045
1046            // Add a non-admin user
1047            authz
1048                .add_user("bob", Some("bob@example.com"), &["user"], &[], false)
1049                .unwrap();
1050
1051            // Add topic
1052            authz.add_topic("orders", None, 3, 2, 604800000).unwrap();
1053
1054            // Check authorization - should be denied
1055            let ctx = AuthzContext::new();
1056            let decision = authz
1057                .is_authorized(
1058                    "bob",
1059                    RivvenAction::Produce,
1060                    &RivvenResource::Topic("orders".to_string()),
1061                    &ctx,
1062                )
1063                .unwrap();
1064
1065            assert!(!decision.allowed);
1066        }
1067
1068        #[test]
1069        fn test_context_attributes() {
1070            let ctx = AuthzContext::new()
1071                .with_ip("192.168.1.100")
1072                .with_tls_subject("CN=client,O=Rivven")
1073                .with_attr("custom_field", serde_json::json!("custom_value"));
1074
1075            assert_eq!(ctx.ip_address, Some("192.168.1.100".to_string()));
1076            assert_eq!(ctx.tls_subject, Some("CN=client,O=Rivven".to_string()));
1077            assert!(ctx.extra.contains_key("custom_field"));
1078        }
1079    }
1080}
1081
1082#[cfg(feature = "cedar")]
1083pub use cedar_impl::*;
1084
1085#[cfg(not(feature = "cedar"))]
1086mod no_cedar {
1087    //! Stub module when Cedar feature is disabled
1088
1089    use std::collections::HashMap;
1090    use thiserror::Error;
1091
1092    #[derive(Error, Debug)]
1093    pub enum CedarError {
1094        #[error("Cedar authorization not enabled. Build with 'cedar' feature.")]
1095        NotEnabled,
1096    }
1097
1098    pub type CedarResult<T> = Result<T, CedarError>;
1099
1100    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
1101    pub enum RivvenAction {
1102        Produce,
1103        Consume,
1104        Create,
1105        Delete,
1106        Alter,
1107        Describe,
1108        Join,
1109        Leave,
1110        Commit,
1111        FetchOffsets,
1112        Admin,
1113        AlterConfigs,
1114        DescribeConfigs,
1115    }
1116
1117    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
1118    pub enum RivvenResource {
1119        Topic(String),
1120        ConsumerGroup(String),
1121        Schema(String),
1122        Cluster,
1123    }
1124
1125    #[derive(Debug, Clone, Default)]
1126    pub struct AuthzContext {
1127        pub ip_address: Option<String>,
1128        pub timestamp: String,
1129        pub tls_subject: Option<String>,
1130        pub extra: HashMap<String, serde_json::Value>,
1131    }
1132
1133    impl AuthzContext {
1134        pub fn new() -> Self {
1135            Self::default()
1136        }
1137
1138        pub fn with_ip(self, _ip: impl Into<String>) -> Self {
1139            self
1140        }
1141
1142        pub fn with_tls_subject(self, _subject: impl Into<String>) -> Self {
1143            self
1144        }
1145    }
1146
1147    pub struct CedarAuthorizer;
1148
1149    impl CedarAuthorizer {
1150        pub fn new() -> CedarResult<Self> {
1151            Err(CedarError::NotEnabled)
1152        }
1153
1154        pub fn authorize(
1155            &self,
1156            _principal: &str,
1157            _action: RivvenAction,
1158            _resource: &RivvenResource,
1159            _context: &AuthzContext,
1160        ) -> CedarResult<()> {
1161            Err(CedarError::NotEnabled)
1162        }
1163    }
1164}
1165
1166#[cfg(not(feature = "cedar"))]
1167pub use no_cedar::*;