Skip to main content

rivven_core/
cedar_authz.rs

1//! Cedar-based Authorization Engine for Rivven
2//!
3//! This module provides fine-grained, policy-as-code authorization using
4//! [Cedar](https://www.cedarpolicy.com/), the policy language from AWS.
5//!
6//! ## Why Cedar?
7//!
8//! - **Formal verification**: Policies can be mathematically proven correct
9//! - **Separation of concerns**: Policy changes without code changes
10//! - **Audit trail**: Every decision is explainable
11//! - **Attribute-based**: Context-aware decisions (time, IP, resource attributes)
12//!
13//! ## Rivven Entity Model
14//!
15//! ```text
16//! ┌─────────────────────────────────────────────────────────────────────┐
17//! │                     RIVVEN CEDAR SCHEMA                             │
18//! ├─────────────────────────────────────────────────────────────────────┤
19//! │                                                                     │
20//! │  namespace Rivven {                                                 │
21//! │    entity User in [Group] {                                         │
22//! │      email?: String,                                               │
23//! │      roles: Set<String>,                                           │
24//! │      service_account: Bool,                                        │
25//! │    };                                                              │
26//! │                                                                     │
27//! │    entity Group in [Group];                                        │
28//! │                                                                     │
//! │    entity Topic {                                                  │
//! │      name: String,                                                 │
//! │      owner?: User,                                                 │
//! │      partitions: Long,                                             │
//! │      replication_factor: Long,                                     │
//! │      retention_ms: Long,                                           │
//! │    };                                                              │
//! │                                                                     │
//! │    entity ConsumerGroup {                                          │
//! │      name: String,                                                 │
//! │    };                                                              │
39//! │                                                                     │
40//! │    entity Cluster;                                                 │
41//! │                                                                     │
42//! │    action produce, consume, create, delete, alter, describe        │
43//! │      appliesTo { principal: [User, Group], resource: [Topic] };    │
44//! │                                                                     │
45//! │    action join, leave, commit, fetch_offsets                       │
46//! │      appliesTo { principal: [User, Group], resource: [ConsumerGroup] };│
47//! │                                                                     │
48//! │    action admin                                                     │
49//! │      appliesTo { principal: [User, Group], resource: [Cluster] };  │
50//! │  }                                                                  │
51//! └─────────────────────────────────────────────────────────────────────┘
52//! ```
53//!
54//! ## Example Policies
55//!
56//! ```cedar
57//! // Allow users to produce to topics they own
58//! permit(
59//!   principal,
60//!   action == Rivven::Action::"produce",
61//!   resource
62//! ) when {
63//!   resource.owner == principal
64//! };
65//!
66//! // Allow members of "producers" group to produce to any topic starting with "events-"
67//! permit(
68//!   principal in Rivven::Group::"producers",
69//!   action == Rivven::Action::"produce",
70//!   resource
71//! ) when {
72//!   resource.name.startsWith("events-")
73//! };
74//!
75//! // Deny access from untrusted IPs
76//! forbid(
77//!   principal,
78//!   action,
79//!   resource
80//! ) when {
81//!   context.ip_address.isLoopback() == false &&
82//!   context.ip_address.isInRange(ip("10.0.0.0/8")) == false
83//! };
84//! ```
85
86#[cfg(feature = "cedar")]
87mod cedar_impl {
88    use cedar_policy::{
89        Authorizer, Context, Decision, Entities, Entity, EntityId, EntityTypeName, EntityUid,
90        Policy, PolicySet, Request, Schema, ValidationMode,
91    };
92    use parking_lot::RwLock;
93    use serde::{Deserialize, Serialize};
94    use std::collections::{HashMap, HashSet};
95    use std::str::FromStr;
96    use thiserror::Error;
97    use tracing::{debug, info};
98
99    // ============================================================================
100    // Error Types
101    // ============================================================================
102
    /// Errors reported by the Cedar authorization layer.
    ///
    /// The display text of each variant comes from its `#[error]` attribute. Variants that
    /// wrap a `String` carry an already-formatted description of the underlying failure.
    #[derive(Error, Debug)]
    pub enum CedarError {
        /// Policy source could not be parsed, or a parsed policy could not be added to a set.
        #[error("Policy parse error: {0}")]
        PolicyParse(String),

        /// The Cedar schema could not be loaded.
        #[error("Schema error: {0}")]
        Schema(String),

        /// A policy failed validation against the schema.
        #[error("Validation error: {0}")]
        Validation(String),

        /// An entity, or the entity store as a whole, could not be built.
        #[error("Entity error: {0}")]
        Entity(String),

        /// The authorization request or its context could not be built.
        #[error("Request error: {0}")]
        Request(String),

        /// An authorization check was refused.
        // NOTE(review): not constructed in the part of the file visible here — confirm where
        // callers raise it.
        #[error("Authorization denied: {principal} cannot {action} on {resource}")]
        Denied {
            principal: String,
            action: String,
            resource: String,
        },

        /// Catch-all for unexpected failures.
        // NOTE(review): not constructed in the part of the file visible here.
        #[error("Internal error: {0}")]
        Internal(String),
    }

    /// `Result` alias whose error type is [`CedarError`].
    pub type CedarResult<T> = Result<T, CedarError>;
132
133    // ============================================================================
134    // Rivven Actions
135    // ============================================================================
136
137    /// Actions that can be performed in Rivven
138    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
139    #[serde(rename_all = "snake_case")]
140    pub enum RivvenAction {
141        // Topic actions
142        Produce,
143        Consume,
144        Create,
145        Delete,
146        Alter,
147        Describe,
148
149        // Consumer group actions
150        Join,
151        Leave,
152        Commit,
153        FetchOffsets,
154
155        // Cluster actions
156        Admin,
157        AlterConfigs,
158        DescribeConfigs,
159    }
160
161    impl RivvenAction {
162        pub fn as_str(&self) -> &'static str {
163            match self {
164                Self::Produce => "produce",
165                Self::Consume => "consume",
166                Self::Create => "create",
167                Self::Delete => "delete",
168                Self::Alter => "alter",
169                Self::Describe => "describe",
170                Self::Join => "join",
171                Self::Leave => "leave",
172                Self::Commit => "commit",
173                Self::FetchOffsets => "fetch_offsets",
174                Self::Admin => "admin",
175                Self::AlterConfigs => "alter_configs",
176                Self::DescribeConfigs => "describe_configs",
177            }
178        }
179
180        fn to_entity_uid(self) -> EntityUid {
181            EntityUid::from_type_name_and_id(
182                EntityTypeName::from_str("Rivven::Action").unwrap(),
183                EntityId::from_str(self.as_str()).unwrap(),
184            )
185        }
186    }
187
188    // ============================================================================
189    // Resource Types
190    // ============================================================================
191
192    /// Types of resources in Rivven
193    #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
194    #[serde(tag = "type", content = "name")]
195    pub enum RivvenResource {
196        Topic(String),
197        ConsumerGroup(String),
198        Schema(String),
199        Cluster,
200    }
201
202    impl RivvenResource {
203        fn type_name(&self) -> &'static str {
204            match self {
205                Self::Topic(_) => "Rivven::Topic",
206                Self::ConsumerGroup(_) => "Rivven::ConsumerGroup",
207                Self::Schema(_) => "Rivven::Schema",
208                Self::Cluster => "Rivven::Cluster",
209            }
210        }
211
212        fn id(&self) -> &str {
213            match self {
214                Self::Topic(name) => name,
215                Self::ConsumerGroup(name) => name,
216                Self::Schema(name) => name,
217                Self::Cluster => "default",
218            }
219        }
220
221        fn to_entity_uid(&self) -> EntityUid {
222            EntityUid::from_type_name_and_id(
223                EntityTypeName::from_str(self.type_name()).unwrap(),
224                EntityId::from_str(self.id()).unwrap(),
225            )
226        }
227    }
228
229    // ============================================================================
230    // Authorization Context
231    // ============================================================================
232
233    /// Context for authorization decisions
234    #[derive(Debug, Clone, Serialize, Deserialize)]
235    pub struct AuthzContext {
236        /// Client IP address
237        pub ip_address: Option<String>,
238
239        /// Request timestamp (ISO 8601)
240        pub timestamp: String,
241
242        /// TLS client certificate subject (if mTLS)
243        pub tls_subject: Option<String>,
244
245        /// Additional context attributes
246        #[serde(flatten)]
247        pub extra: HashMap<String, serde_json::Value>,
248    }
249
250    impl Default for AuthzContext {
251        fn default() -> Self {
252            Self {
253                ip_address: None,
254                timestamp: chrono::Utc::now().to_rfc3339(),
255                tls_subject: None,
256                extra: HashMap::new(),
257            }
258        }
259    }
260
261    impl AuthzContext {
262        pub fn new() -> Self {
263            Self::default()
264        }
265
266        pub fn with_ip(mut self, ip: impl Into<String>) -> Self {
267            self.ip_address = Some(ip.into());
268            self
269        }
270
271        pub fn with_tls_subject(mut self, subject: impl Into<String>) -> Self {
272            self.tls_subject = Some(subject.into());
273            self
274        }
275
276        pub fn with_attr(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
277            self.extra.insert(key.into(), value);
278            self
279        }
280
281        fn to_cedar_context(&self) -> CedarResult<Context> {
282            // Cedar does not allow null values, so we must create a JSON object
283            // with only the non-null values
284            let mut context_map = serde_json::Map::new();
285
286            // Add timestamp (always present)
287            context_map.insert("timestamp".to_string(), serde_json::json!(self.timestamp));
288
289            // Add optional fields only if they have values
290            if let Some(ip) = &self.ip_address {
291                context_map.insert("ip_address".to_string(), serde_json::json!(ip));
292            }
293
294            if let Some(tls) = &self.tls_subject {
295                context_map.insert("tls_subject".to_string(), serde_json::json!(tls));
296            }
297
298            // Add extra context, filtering out null values
299            for (key, value) in &self.extra {
300                if !value.is_null() {
301                    context_map.insert(key.clone(), value.clone());
302                }
303            }
304
305            let json = serde_json::Value::Object(context_map);
306
307            Context::from_json_value(json, None)
308                .map_err(|e| CedarError::Request(format!("Invalid context: {}", e)))
309        }
310    }
311
312    // ============================================================================
313    // Authorization Decision
314    // ============================================================================
315
316    /// Result of an authorization check
317    #[derive(Debug, Clone)]
318    pub struct AuthzDecision {
319        /// Whether access is allowed
320        pub allowed: bool,
321
322        /// Policies that permitted the action
323        pub satisfied_policies: Vec<String>,
324
325        /// Policies that denied the action
326        pub denied_policies: Vec<String>,
327
328        /// Errors during evaluation
329        pub errors: Vec<String>,
330
331        /// Diagnostics for debugging
332        pub diagnostics: Option<String>,
333    }
334
335    impl AuthzDecision {
336        pub fn allowed(satisfied: Vec<String>) -> Self {
337            Self {
338                allowed: true,
339                satisfied_policies: satisfied,
340                denied_policies: vec![],
341                errors: vec![],
342                diagnostics: None,
343            }
344        }
345
346        pub fn denied(denied: Vec<String>) -> Self {
347            Self {
348                allowed: false,
349                satisfied_policies: vec![],
350                denied_policies: denied,
351                errors: vec![],
352                diagnostics: None,
353            }
354        }
355    }
356
357    // ============================================================================
358    // Cedar Authorizer
359    // ============================================================================
360
    /// Cedar-based authorization engine.
    ///
    /// Holds the policy set and the entity store, each behind its own `RwLock`, plus an
    /// optional schema that newly added policies are validated against.
    pub struct CedarAuthorizer {
        /// Cedar schema for validation; `None` when built with `new_without_schema`
        schema: Option<Schema>,

        /// Policy set consulted on every authorization check
        policies: RwLock<PolicySet>,

        /// Entity store (users, groups, topics, ...) consulted on every authorization check
        entities: RwLock<Entities>,

        /// Whether to validate policies against the schema when they are added
        validate_policies: bool,
    }
375
376    impl CedarAuthorizer {
377        /// Create a new authorizer with default schema
378        pub fn new() -> CedarResult<Self> {
379            let schema = Self::default_schema()?;
380
381            Ok(Self {
382                schema: Some(schema),
383                policies: RwLock::new(PolicySet::new()),
384                entities: RwLock::new(Entities::empty()),
385                validate_policies: true,
386            })
387        }
388
389        /// Create without schema validation (for testing)
390        pub fn new_without_schema() -> Self {
391            Self {
392                schema: None,
393                policies: RwLock::new(PolicySet::new()),
394                entities: RwLock::new(Entities::empty()),
395                validate_policies: false,
396            }
397        }
398
399        /// Default Rivven Cedar schema
400        fn default_schema() -> CedarResult<Schema> {
401            let schema_src = r#"
402{
403  "Rivven": {
404    "entityTypes": {
405      "User": {
406        "memberOfTypes": ["Group"],
407        "shape": {
408          "type": "Record",
409          "attributes": {
410            "email": { "type": "String", "required": false },
411            "roles": { "type": "Set", "element": { "type": "String" } },
412            "service_account": { "type": "Boolean" }
413          }
414        }
415      },
416      "Group": {
417        "memberOfTypes": ["Group"]
418      },
419      "Topic": {
420        "shape": {
421          "type": "Record",
422          "attributes": {
423            "owner": { "type": "Entity", "name": "Rivven::User", "required": false },
424            "partitions": { "type": "Long" },
425            "replication_factor": { "type": "Long" },
426            "retention_ms": { "type": "Long" },
427            "name": { "type": "String" }
428          }
429        }
430      },
431      "ConsumerGroup": {
432        "shape": {
433          "type": "Record",
434          "attributes": {
435            "name": { "type": "String" }
436          }
437        }
438      },
439      "Schema": {
440        "shape": {
441          "type": "Record",
442          "attributes": {
443            "name": { "type": "String" },
444            "version": { "type": "Long" }
445          }
446        }
447      },
448      "Cluster": {}
449    },
450    "actions": {
451      "produce": {
452        "appliesTo": {
453          "principalTypes": ["User", "Group"],
454          "resourceTypes": ["Topic"]
455        }
456      },
457      "consume": {
458        "appliesTo": {
459          "principalTypes": ["User", "Group"],
460          "resourceTypes": ["Topic"]
461        }
462      },
463      "create": {
464        "appliesTo": {
465          "principalTypes": ["User", "Group"],
466          "resourceTypes": ["Topic", "Schema"]
467        }
468      },
469      "delete": {
470        "appliesTo": {
471          "principalTypes": ["User", "Group"],
472          "resourceTypes": ["Topic", "ConsumerGroup", "Schema"]
473        }
474      },
475      "alter": {
476        "appliesTo": {
477          "principalTypes": ["User", "Group"],
478          "resourceTypes": ["Topic", "Schema"]
479        }
480      },
481      "describe": {
482        "appliesTo": {
483          "principalTypes": ["User", "Group"],
484          "resourceTypes": ["Topic", "ConsumerGroup", "Schema", "Cluster"]
485        }
486      },
487      "join": {
488        "appliesTo": {
489          "principalTypes": ["User", "Group"],
490          "resourceTypes": ["ConsumerGroup"]
491        }
492      },
493      "leave": {
494        "appliesTo": {
495          "principalTypes": ["User", "Group"],
496          "resourceTypes": ["ConsumerGroup"]
497        }
498      },
499      "commit": {
500        "appliesTo": {
501          "principalTypes": ["User", "Group"],
502          "resourceTypes": ["ConsumerGroup"]
503        }
504      },
505      "fetch_offsets": {
506        "appliesTo": {
507          "principalTypes": ["User", "Group"],
508          "resourceTypes": ["ConsumerGroup"]
509        }
510      },
511      "admin": {
512        "appliesTo": {
513          "principalTypes": ["User", "Group"],
514          "resourceTypes": ["Cluster"]
515        }
516      },
517      "alter_configs": {
518        "appliesTo": {
519          "principalTypes": ["User", "Group"],
520          "resourceTypes": ["Cluster"]
521        }
522      },
523      "describe_configs": {
524        "appliesTo": {
525          "principalTypes": ["User", "Group"],
526          "resourceTypes": ["Cluster"]
527        }
528      }
529    }
530  }
531}
532"#;
533
534            Schema::from_json_str(schema_src)
535                .map_err(|e| CedarError::Schema(format!("Invalid schema: {:?}", e)))
536        }
537
538        /// Add a policy from Cedar source
539        pub fn add_policy(&self, id: &str, policy_src: &str) -> CedarResult<()> {
540            let policy = Policy::parse(Some(cedar_policy::PolicyId::new(id)), policy_src)
541                .map_err(|e| CedarError::PolicyParse(format!("{:?}", e)))?;
542
543            // Validate against schema if enabled
544            if self.validate_policies {
545                if let Some(schema) = &self.schema {
546                    let mut temp_set = PolicySet::new();
547                    temp_set.add(policy.clone()).map_err(|e| {
548                        CedarError::PolicyParse(format!("Duplicate policy ID: {:?}", e))
549                    })?;
550
551                    let validator = cedar_policy::Validator::new(schema.clone());
552                    let result = validator.validate(&temp_set, ValidationMode::Strict);
553
554                    if !result.validation_passed() {
555                        let errors: Vec<String> = result
556                            .validation_errors()
557                            .map(|e| format!("{:?}", e))
558                            .collect();
559                        return Err(CedarError::Validation(errors.join("; ")));
560                    }
561                }
562            }
563
564            let mut policies = self.policies.write();
565            policies
566                .add(policy)
567                .map_err(|e| CedarError::PolicyParse(format!("Failed to add policy: {:?}", e)))?;
568
569            info!("Added Cedar policy: {}", id);
570            Ok(())
571        }
572
573        /// Add multiple policies from Cedar source
574        pub fn add_policies(&self, policies_src: &str) -> CedarResult<()> {
575            let parsed = PolicySet::from_str(policies_src)
576                .map_err(|e| CedarError::PolicyParse(format!("{:?}", e)))?;
577
578            // Validate against schema if enabled
579            if self.validate_policies {
580                if let Some(schema) = &self.schema {
581                    let validator = cedar_policy::Validator::new(schema.clone());
582                    let result = validator.validate(&parsed, ValidationMode::Strict);
583
584                    if !result.validation_passed() {
585                        let errors: Vec<String> = result
586                            .validation_errors()
587                            .map(|e| format!("{:?}", e))
588                            .collect();
589                        return Err(CedarError::Validation(errors.join("; ")));
590                    }
591                }
592            }
593
594            let mut policies = self.policies.write();
595            for policy in parsed.policies() {
596                policies.add(policy.clone()).map_err(|e| {
597                    CedarError::PolicyParse(format!("Failed to add policy: {:?}", e))
598                })?;
599            }
600
601            Ok(())
602        }
603
604        /// Remove a policy by ID
605        pub fn remove_policy(&self, id: &str) -> CedarResult<()> {
606            // Cedar PolicySet doesn't support removal directly
607            // We need to rebuild without the policy
608            let mut policies = self.policies.write();
609            let policy_id = cedar_policy::PolicyId::new(id);
610
611            // Create a new PolicySet without the specified policy
612            let new_policies = policies
613                .policies()
614                .filter(|p| p.id() != &policy_id)
615                .cloned()
616                .collect::<Vec<_>>();
617
618            let mut new_set = PolicySet::new();
619            for policy in new_policies {
620                new_set.add(policy).ok();
621            }
622
623            *policies = new_set;
624            info!("Removed Cedar policy: {}", id);
625            Ok(())
626        }
627
628        /// Add a user entity
629        pub fn add_user(
630            &self,
631            username: &str,
632            email: Option<&str>,
633            roles: &[&str],
634            groups: &[&str],
635            is_service_account: bool,
636        ) -> CedarResult<()> {
637            let uid = EntityUid::from_type_name_and_id(
638                EntityTypeName::from_str("Rivven::User").unwrap(),
639                EntityId::from_str(username).unwrap(),
640            );
641
642            // Build parent groups
643            let parents: HashSet<EntityUid> = groups
644                .iter()
645                .map(|g| {
646                    EntityUid::from_type_name_and_id(
647                        EntityTypeName::from_str("Rivven::Group").unwrap(),
648                        EntityId::from_str(g).unwrap(),
649                    )
650                })
651                .collect();
652
653            // Build attributes
654            let mut attrs = HashMap::new();
655            if let Some(e) = email {
656                attrs.insert(
657                    "email".to_string(),
658                    cedar_policy::RestrictedExpression::new_string(e.to_string()),
659                );
660            }
661
662            let roles_set: Vec<_> = roles
663                .iter()
664                .map(|r| cedar_policy::RestrictedExpression::new_string(r.to_string()))
665                .collect();
666            attrs.insert(
667                "roles".to_string(),
668                cedar_policy::RestrictedExpression::new_set(roles_set),
669            );
670
671            attrs.insert(
672                "service_account".to_string(),
673                cedar_policy::RestrictedExpression::new_bool(is_service_account),
674            );
675
676            let entity = Entity::new(uid, attrs, parents)
677                .map_err(|e| CedarError::Entity(format!("Invalid entity: {:?}", e)))?;
678
679            let mut entities = self.entities.write();
680            // Convert to new Entities
681            let mut all_entities: Vec<Entity> = entities.iter().cloned().collect();
682            all_entities.push(entity);
683            *entities = Entities::from_entities(all_entities, None)
684                .map_err(|e| CedarError::Entity(format!("Invalid entities: {:?}", e)))?;
685
686            debug!("Added user entity: {}", username);
687            Ok(())
688        }
689
690        /// Add a group entity
691        pub fn add_group(&self, name: &str, parent_groups: &[&str]) -> CedarResult<()> {
692            let uid = EntityUid::from_type_name_and_id(
693                EntityTypeName::from_str("Rivven::Group").unwrap(),
694                EntityId::from_str(name).unwrap(),
695            );
696
697            let parents: HashSet<EntityUid> = parent_groups
698                .iter()
699                .map(|g| {
700                    EntityUid::from_type_name_and_id(
701                        EntityTypeName::from_str("Rivven::Group").unwrap(),
702                        EntityId::from_str(g).unwrap(),
703                    )
704                })
705                .collect();
706
707            let entity = Entity::new_no_attrs(uid, parents);
708
709            let mut entities = self.entities.write();
710            let mut all_entities: Vec<Entity> = entities.iter().cloned().collect();
711            all_entities.push(entity);
712            *entities = Entities::from_entities(all_entities, None)
713                .map_err(|e| CedarError::Entity(format!("Invalid entities: {:?}", e)))?;
714
715            debug!("Added group entity: {}", name);
716            Ok(())
717        }
718
719        /// Add a topic entity
720        pub fn add_topic(
721            &self,
722            name: &str,
723            owner: Option<&str>,
724            partitions: i64,
725            replication_factor: i64,
726            retention_ms: i64,
727        ) -> CedarResult<()> {
728            let uid = EntityUid::from_type_name_and_id(
729                EntityTypeName::from_str("Rivven::Topic").unwrap(),
730                EntityId::from_str(name).unwrap(),
731            );
732
733            let mut attrs = HashMap::new();
734            attrs.insert(
735                "name".to_string(),
736                cedar_policy::RestrictedExpression::new_string(name.to_string()),
737            );
738
739            if let Some(o) = owner {
740                let owner_uid = EntityUid::from_type_name_and_id(
741                    EntityTypeName::from_str("Rivven::User").unwrap(),
742                    EntityId::from_str(o).unwrap(),
743                );
744                attrs.insert(
745                    "owner".to_string(),
746                    cedar_policy::RestrictedExpression::new_entity_uid(owner_uid),
747                );
748            }
749
750            attrs.insert(
751                "partitions".to_string(),
752                cedar_policy::RestrictedExpression::new_long(partitions),
753            );
754            attrs.insert(
755                "replication_factor".to_string(),
756                cedar_policy::RestrictedExpression::new_long(replication_factor),
757            );
758            attrs.insert(
759                "retention_ms".to_string(),
760                cedar_policy::RestrictedExpression::new_long(retention_ms),
761            );
762
763            let entity = Entity::new(uid, attrs, HashSet::new())
764                .map_err(|e| CedarError::Entity(format!("Invalid entity: {:?}", e)))?;
765
766            let mut entities = self.entities.write();
767            let mut all_entities: Vec<Entity> = entities.iter().cloned().collect();
768            all_entities.push(entity);
769            *entities = Entities::from_entities(all_entities, None)
770                .map_err(|e| CedarError::Entity(format!("Invalid entities: {:?}", e)))?;
771
772            debug!("Added topic entity: {}", name);
773            Ok(())
774        }
775
776        /// Add a consumer group entity
777        pub fn add_consumer_group(&self, name: &str) -> CedarResult<()> {
778            let uid = EntityUid::from_type_name_and_id(
779                EntityTypeName::from_str("Rivven::ConsumerGroup").unwrap(),
780                EntityId::from_str(name).unwrap(),
781            );
782
783            let mut attrs = HashMap::new();
784            attrs.insert(
785                "name".to_string(),
786                cedar_policy::RestrictedExpression::new_string(name.to_string()),
787            );
788
789            let entity = Entity::new(uid, attrs, HashSet::new())
790                .map_err(|e| CedarError::Entity(format!("Invalid entity: {:?}", e)))?;
791
792            let mut entities = self.entities.write();
793            let mut all_entities: Vec<Entity> = entities.iter().cloned().collect();
794            all_entities.push(entity);
795            *entities = Entities::from_entities(all_entities, None)
796                .map_err(|e| CedarError::Entity(format!("Invalid entities: {:?}", e)))?;
797
798            debug!("Added consumer group entity: {}", name);
799            Ok(())
800        }
801
802        /// Add a schema entity
803        pub fn add_schema(&self, name: &str, version: i64) -> CedarResult<()> {
804            let uid = EntityUid::from_type_name_and_id(
805                EntityTypeName::from_str("Rivven::Schema").unwrap(),
806                EntityId::from_str(name).unwrap(),
807            );
808
809            let mut attrs = HashMap::new();
810            attrs.insert(
811                "name".to_string(),
812                cedar_policy::RestrictedExpression::new_string(name.to_string()),
813            );
814            attrs.insert(
815                "version".to_string(),
816                cedar_policy::RestrictedExpression::new_long(version),
817            );
818
819            let entity = Entity::new(uid, attrs, HashSet::new())
820                .map_err(|e| CedarError::Entity(format!("Invalid entity: {:?}", e)))?;
821
822            let mut entities = self.entities.write();
823            let mut all_entities: Vec<Entity> = entities.iter().cloned().collect();
824            all_entities.push(entity);
825            *entities = Entities::from_entities(all_entities, None)
826                .map_err(|e| CedarError::Entity(format!("Invalid entities: {:?}", e)))?;
827
828            debug!("Added schema entity: {} (version {})", name, version);
829            Ok(())
830        }
831
832        /// Check if an action is authorized
833        pub fn is_authorized(
834            &self,
835            principal: &str,
836            action: RivvenAction,
837            resource: &RivvenResource,
838            context: &AuthzContext,
839        ) -> CedarResult<AuthzDecision> {
840            let principal_uid = EntityUid::from_type_name_and_id(
841                EntityTypeName::from_str("Rivven::User").unwrap(),
842                EntityId::from_str(principal).unwrap(),
843            );
844
845            let action_uid = action.to_entity_uid();
846            let resource_uid = resource.to_entity_uid();
847            let cedar_context = context.to_cedar_context()?;
848
849            let request = Request::new(
850                principal_uid.clone(),
851                action_uid.clone(),
852                resource_uid.clone(),
853                cedar_context,
854                None,
855            )
856            .map_err(|e| CedarError::Request(format!("Invalid request: {:?}", e)))?;
857
858            let authorizer = Authorizer::new();
859            let policies = self.policies.read();
860            let entities = self.entities.read();
861
862            let response = authorizer.is_authorized(&request, &policies, &entities);
863
864            let decision = match response.decision() {
865                Decision::Allow => {
866                    let satisfied: Vec<String> = response
867                        .diagnostics()
868                        .reason()
869                        .map(|id| id.to_string())
870                        .collect();
871                    AuthzDecision::allowed(satisfied)
872                }
873                Decision::Deny => {
874                    let denied: Vec<String> = response
875                        .diagnostics()
876                        .reason()
877                        .map(|id| id.to_string())
878                        .collect();
879
880                    let errors: Vec<String> = response
881                        .diagnostics()
882                        .errors()
883                        .map(|e| format!("{:?}", e))
884                        .collect();
885
886                    AuthzDecision {
887                        allowed: false,
888                        satisfied_policies: vec![],
889                        denied_policies: denied,
890                        errors,
891                        diagnostics: None,
892                    }
893                }
894            };
895
896            debug!(
897                "Authorization: {} {} {} -> {}",
898                principal,
899                action.as_str(),
900                resource.id(),
901                if decision.allowed { "ALLOW" } else { "DENY" }
902            );
903
904            Ok(decision)
905        }
906
907        /// Check authorization and return error if denied
908        pub fn authorize(
909            &self,
910            principal: &str,
911            action: RivvenAction,
912            resource: &RivvenResource,
913            context: &AuthzContext,
914        ) -> CedarResult<()> {
915            let decision = self.is_authorized(principal, action, resource, context)?;
916
917            if decision.allowed {
918                Ok(())
919            } else {
920                Err(CedarError::Denied {
921                    principal: principal.to_string(),
922                    action: action.as_str().to_string(),
923                    resource: format!("{:?}", resource),
924                })
925            }
926        }
927
928        /// Load default policies for common use cases
929        pub fn load_default_policies(&self) -> CedarResult<()> {
930            // Super admin has all permissions
931            self.add_policy(
932                "super-admin",
933                r#"
934permit(
935  principal in Rivven::Group::"admins",
936  action,
937  resource
938);
939"#,
940            )?;
941
942            // All users can describe topics
943            self.add_policy(
944                "describe-topics",
945                r#"
946permit(
947  principal,
948  action == Rivven::Action::"describe",
949  resource is Rivven::Topic
950);
951"#,
952            )?;
953
954            // All users can describe consumer groups
955            self.add_policy(
956                "describe-consumer-groups",
957                r#"
958permit(
959  principal,
960  action == Rivven::Action::"describe",
961  resource is Rivven::ConsumerGroup
962);
963"#,
964            )?;
965
966            info!("Loaded default Cedar policies");
967            Ok(())
968        }
969    }
970
971    impl Default for CedarAuthorizer {
972        fn default() -> Self {
973            Self::new().expect("Failed to create default authorizer")
974        }
975    }
976
977    // ============================================================================
978    // Tests
979    // ============================================================================
980
#[cfg(test)]
mod tests {
    use super::*;

    /// An authorizer created through `new` exposes a schema.
    #[test]
    fn test_create_authorizer() {
        let authorizer = CedarAuthorizer::new().unwrap();
        assert!(authorizer.schema.is_some());
    }

    /// A well-formed policy is accepted by a schema-less authorizer.
    #[test]
    fn test_add_simple_policy() {
        let authorizer = CedarAuthorizer::new_without_schema();

        let policy_text = r#"
permit(
  principal == Rivven::User::"alice",
  action == Rivven::Action::"produce",
  resource == Rivven::Topic::"orders"
);
"#;

        authorizer.add_policy("test-policy", policy_text).unwrap();
    }

    /// A member of the `admins` group is allowed to produce to a topic.
    #[test]
    fn test_add_user_and_authorize() {
        let authorizer = CedarAuthorizer::new_without_schema();

        // Add admin policy
        let admin_policy = r#"
permit(
  principal in Rivven::Group::"admins",
  action,
  resource
);
"#;
        authorizer.add_policy("admin-all", admin_policy).unwrap();

        // Add group and user
        authorizer.add_group("admins", &[]).unwrap();
        authorizer
            .add_user(
                "alice",
                Some("alice@example.com"),
                &["admin"],
                &["admins"],
                false,
            )
            .unwrap();

        // Add topic
        authorizer
            .add_topic("orders", Some("alice"), 3, 2, 604800000)
            .unwrap();

        // Check authorization
        let request_ctx = AuthzContext::new().with_ip("127.0.0.1");
        let topic = RivvenResource::Topic(String::from("orders"));
        let outcome = authorizer
            .is_authorized("alice", RivvenAction::Produce, &topic, &request_ctx)
            .unwrap();

        assert!(outcome.allowed);
    }

    /// A user outside the `admins` group is denied when only admins may produce.
    #[test]
    fn test_deny_unauthorized() {
        let authorizer = CedarAuthorizer::new_without_schema();

        // Add a restrictive policy - only admins can produce
        let restrictive_policy = r#"
permit(
  principal in Rivven::Group::"admins",
  action == Rivven::Action::"produce",
  resource is Rivven::Topic
);
"#;
        authorizer
            .add_policy("only-admins-produce", restrictive_policy)
            .unwrap();

        // Add a non-admin user
        authorizer
            .add_user("bob", Some("bob@example.com"), &["user"], &[], false)
            .unwrap();

        // Add topic
        authorizer.add_topic("orders", None, 3, 2, 604800000).unwrap();

        // Check authorization - should be denied
        let request_ctx = AuthzContext::new();
        let topic = RivvenResource::Topic(String::from("orders"));
        let outcome = authorizer
            .is_authorized("bob", RivvenAction::Produce, &topic, &request_ctx)
            .unwrap();

        assert!(!outcome.allowed);
    }

    /// The context builder records the IP, TLS subject and extra attributes.
    #[test]
    fn test_context_attributes() {
        let request_ctx = AuthzContext::new()
            .with_ip("192.168.1.100")
            .with_tls_subject("CN=client,O=Rivven")
            .with_attr("custom_field", serde_json::json!("custom_value"));

        assert_eq!(request_ctx.ip_address.as_deref(), Some("192.168.1.100"));
        assert_eq!(
            request_ctx.tls_subject.as_deref(),
            Some("CN=client,O=Rivven")
        );
        assert!(request_ctx.extra.contains_key("custom_field"));
    }
}
1110}
1111
1112#[cfg(feature = "cedar")]
1113pub use cedar_impl::*;
1114
1115#[cfg(not(feature = "cedar"))]
1116mod no_cedar {
1117    //! Stub module when Cedar feature is disabled
1118
1119    use std::collections::HashMap;
1120    use thiserror::Error;
1121
1122    #[derive(Error, Debug)]
1123    pub enum CedarError {
1124        #[error("Cedar authorization not enabled. Build with 'cedar' feature.")]
1125        NotEnabled,
1126    }
1127
1128    pub type CedarResult<T> = Result<T, CedarError>;
1129
1130    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
1131    pub enum RivvenAction {
1132        Produce,
1133        Consume,
1134        Create,
1135        Delete,
1136        Alter,
1137        Describe,
1138        Join,
1139        Leave,
1140        Commit,
1141        FetchOffsets,
1142        Admin,
1143        AlterConfigs,
1144        DescribeConfigs,
1145    }
1146
1147    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
1148    pub enum RivvenResource {
1149        Topic(String),
1150        ConsumerGroup(String),
1151        Schema(String),
1152        Cluster,
1153    }
1154
1155    #[derive(Debug, Clone, Default)]
1156    pub struct AuthzContext {
1157        pub ip_address: Option<String>,
1158        pub timestamp: String,
1159        pub tls_subject: Option<String>,
1160        pub extra: HashMap<String, serde_json::Value>,
1161    }
1162
1163    impl AuthzContext {
1164        pub fn new() -> Self {
1165            Self::default()
1166        }
1167
1168        pub fn with_ip(self, _ip: impl Into<String>) -> Self {
1169            self
1170        }
1171
1172        pub fn with_tls_subject(self, _subject: impl Into<String>) -> Self {
1173            self
1174        }
1175    }
1176
1177    pub struct CedarAuthorizer;
1178
1179    impl CedarAuthorizer {
1180        pub fn new() -> CedarResult<Self> {
1181            Err(CedarError::NotEnabled)
1182        }
1183
1184        pub fn authorize(
1185            &self,
1186            _principal: &str,
1187            _action: RivvenAction,
1188            _resource: &RivvenResource,
1189            _context: &AuthzContext,
1190        ) -> CedarResult<()> {
1191            Err(CedarError::NotEnabled)
1192        }
1193    }
1194}
1195
1196#[cfg(not(feature = "cedar"))]
1197pub use no_cedar::*;