1#[cfg(feature = "cedar")]
87mod cedar_impl {
88 use cedar_policy::{
89 Authorizer, Context, Decision, Entities, Entity, EntityId, EntityTypeName, EntityUid,
90 Policy, PolicySet, Request, Schema, ValidationMode,
91 };
92 use parking_lot::RwLock;
93 use serde::{Deserialize, Serialize};
94 use std::collections::{HashMap, HashSet};
95 use std::str::FromStr;
96 use thiserror::Error;
97 use tracing::{debug, info};
98
/// Errors reported by the Cedar authorization layer.
#[derive(Error, Debug)]
pub enum CedarError {
    /// A policy could not be parsed or could not be added to a policy set.
    #[error("Policy parse error: {0}")]
    PolicyParse(String),

    /// The Cedar schema could not be constructed.
    #[error("Schema error: {0}")]
    Schema(String),

    /// A policy failed validation against the schema.
    #[error("Validation error: {0}")]
    Validation(String),

    /// An entity (or the resulting entity store) was rejected by Cedar.
    #[error("Entity error: {0}")]
    Entity(String),

    /// The authorization request or its context could not be built.
    #[error("Request error: {0}")]
    Request(String),

    /// The request was evaluated and not allowed.
    #[error("Authorization denied: {principal} cannot {action} on {resource}")]
    Denied {
        /// Principal identifier as passed by the caller.
        principal: String,
        /// Action name that was attempted.
        action: String,
        /// Textual rendering of the targeted resource.
        resource: String,
    },

    /// Catch-all for internal failures.
    #[error("Internal error: {0}")]
    Internal(String),
}
130
/// Convenience alias for results whose error type is [`CedarError`].
pub type CedarResult<T> = Result<T, CedarError>;
132
/// Actions that can be authorized.
///
/// Variants (de)serialize in `snake_case`, matching the action identifiers
/// used in the default schema (e.g. `FetchOffsets` <-> `fetch_offsets`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RivvenAction {
    // In the default schema these apply to topics and/or other named
    // resources (see `appliesTo` for each action there).
    Produce,
    Consume,
    Create,
    Delete,
    Alter,
    Describe,

    // In the default schema these apply to consumer groups only.
    Join,
    Leave,
    Commit,
    FetchOffsets,

    // In the default schema these apply to the cluster only.
    Admin,
    AlterConfigs,
    DescribeConfigs,
}
160
161 impl RivvenAction {
162 pub fn as_str(&self) -> &'static str {
163 match self {
164 Self::Produce => "produce",
165 Self::Consume => "consume",
166 Self::Create => "create",
167 Self::Delete => "delete",
168 Self::Alter => "alter",
169 Self::Describe => "describe",
170 Self::Join => "join",
171 Self::Leave => "leave",
172 Self::Commit => "commit",
173 Self::FetchOffsets => "fetch_offsets",
174 Self::Admin => "admin",
175 Self::AlterConfigs => "alter_configs",
176 Self::DescribeConfigs => "describe_configs",
177 }
178 }
179
180 fn to_entity_uid(self) -> EntityUid {
181 EntityUid::from_type_name_and_id(
182 EntityTypeName::from_str("Rivven::Action").unwrap(),
183 EntityId::from_str(self.as_str()).unwrap(),
184 )
185 }
186 }
187
/// A resource that an action can target.
///
/// Serialized as an adjacently tagged value: the variant goes in `"type"`
/// and the resource name (when there is one) in `"name"`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(tag = "type", content = "name")]
pub enum RivvenResource {
    /// A topic, identified by name.
    Topic(String),
    /// A consumer group, identified by name.
    ConsumerGroup(String),
    /// A schema, identified by name.
    Schema(String),
    /// The cluster itself; carries no name.
    Cluster,
}
201
202 impl RivvenResource {
203 fn type_name(&self) -> &'static str {
204 match self {
205 Self::Topic(_) => "Rivven::Topic",
206 Self::ConsumerGroup(_) => "Rivven::ConsumerGroup",
207 Self::Schema(_) => "Rivven::Schema",
208 Self::Cluster => "Rivven::Cluster",
209 }
210 }
211
212 fn id(&self) -> &str {
213 match self {
214 Self::Topic(name) => name,
215 Self::ConsumerGroup(name) => name,
216 Self::Schema(name) => name,
217 Self::Cluster => "default",
218 }
219 }
220
221 fn to_entity_uid(&self) -> EntityUid {
222 EntityUid::from_type_name_and_id(
223 EntityTypeName::from_str(self.type_name()).unwrap(),
224 EntityId::from_str(self.id()).unwrap(),
225 )
226 }
227 }
228
/// Request context handed to Cedar alongside principal, action and resource.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthzContext {
    /// Client IP address, if known.
    pub ip_address: Option<String>,

    /// Request timestamp as a string; `Default` fills in the current UTC
    /// time in RFC 3339 format.
    pub timestamp: String,

    /// TLS subject of the client, if known.
    pub tls_subject: Option<String>,

    /// Additional attributes; flattened into the same JSON object as the
    /// fields above when (de)serialized.
    #[serde(flatten)]
    pub extra: HashMap<String, serde_json::Value>,
}
249
250 impl Default for AuthzContext {
251 fn default() -> Self {
252 Self {
253 ip_address: None,
254 timestamp: chrono::Utc::now().to_rfc3339(),
255 tls_subject: None,
256 extra: HashMap::new(),
257 }
258 }
259 }
260
261 impl AuthzContext {
262 pub fn new() -> Self {
263 Self::default()
264 }
265
266 pub fn with_ip(mut self, ip: impl Into<String>) -> Self {
267 self.ip_address = Some(ip.into());
268 self
269 }
270
271 pub fn with_tls_subject(mut self, subject: impl Into<String>) -> Self {
272 self.tls_subject = Some(subject.into());
273 self
274 }
275
276 pub fn with_attr(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
277 self.extra.insert(key.into(), value);
278 self
279 }
280
281 fn to_cedar_context(&self) -> CedarResult<Context> {
282 let mut context_map = serde_json::Map::new();
285
286 context_map.insert("timestamp".to_string(), serde_json::json!(self.timestamp));
288
289 if let Some(ip) = &self.ip_address {
291 context_map.insert("ip_address".to_string(), serde_json::json!(ip));
292 }
293
294 if let Some(tls) = &self.tls_subject {
295 context_map.insert("tls_subject".to_string(), serde_json::json!(tls));
296 }
297
298 for (key, value) in &self.extra {
300 if !value.is_null() {
301 context_map.insert(key.clone(), value.clone());
302 }
303 }
304
305 let json = serde_json::Value::Object(context_map);
306
307 Context::from_json_value(json, None)
308 .map_err(|e| CedarError::Request(format!("Invalid context: {}", e)))
309 }
310 }
311
/// Outcome of an authorization check.
#[derive(Debug, Clone)]
pub struct AuthzDecision {
    /// `true` when the request was allowed.
    pub allowed: bool,

    /// IDs of the policies reported as the reason for an allow decision.
    pub satisfied_policies: Vec<String>,

    /// IDs of the policies reported as the reason for a deny decision.
    pub denied_policies: Vec<String>,

    /// Evaluation errors reported while deciding.
    pub errors: Vec<String>,

    /// Optional free-form diagnostics.
    pub diagnostics: Option<String>,
}
334
335 impl AuthzDecision {
336 pub fn allowed(satisfied: Vec<String>) -> Self {
337 Self {
338 allowed: true,
339 satisfied_policies: satisfied,
340 denied_policies: vec![],
341 errors: vec![],
342 diagnostics: None,
343 }
344 }
345
346 pub fn denied(denied: Vec<String>) -> Self {
347 Self {
348 allowed: false,
349 satisfied_policies: vec![],
350 denied_policies: denied,
351 errors: vec![],
352 diagnostics: None,
353 }
354 }
355 }
356
/// Cedar-backed authorizer holding a policy set and an entity store.
pub struct CedarAuthorizer {
    /// Schema used for policy validation; `None` when built without one.
    schema: Option<Schema>,

    /// Policies consulted on every authorization request.
    policies: RwLock<PolicySet>,

    /// Entities (users, groups, topics, ...) known to the authorizer.
    entities: RwLock<Entities>,

    /// Whether newly added policies are validated against `schema`.
    validate_policies: bool,
}
375
376 impl CedarAuthorizer {
377 pub fn new() -> CedarResult<Self> {
379 let schema = Self::default_schema()?;
380
381 Ok(Self {
382 schema: Some(schema),
383 policies: RwLock::new(PolicySet::new()),
384 entities: RwLock::new(Entities::empty()),
385 validate_policies: true,
386 })
387 }
388
389 pub fn new_without_schema() -> Self {
391 Self {
392 schema: None,
393 policies: RwLock::new(PolicySet::new()),
394 entities: RwLock::new(Entities::empty()),
395 validate_policies: false,
396 }
397 }
398
399 fn default_schema() -> CedarResult<Schema> {
401 let schema_src = r#"
402{
403 "Rivven": {
404 "entityTypes": {
405 "User": {
406 "memberOfTypes": ["Group"],
407 "shape": {
408 "type": "Record",
409 "attributes": {
410 "email": { "type": "String", "required": false },
411 "roles": { "type": "Set", "element": { "type": "String" } },
412 "service_account": { "type": "Boolean" }
413 }
414 }
415 },
416 "Group": {
417 "memberOfTypes": ["Group"]
418 },
419 "Topic": {
420 "shape": {
421 "type": "Record",
422 "attributes": {
423 "owner": { "type": "Entity", "name": "Rivven::User", "required": false },
424 "partitions": { "type": "Long" },
425 "replication_factor": { "type": "Long" },
426 "retention_ms": { "type": "Long" },
427 "name": { "type": "String" }
428 }
429 }
430 },
431 "ConsumerGroup": {
432 "shape": {
433 "type": "Record",
434 "attributes": {
435 "name": { "type": "String" }
436 }
437 }
438 },
439 "Schema": {
440 "shape": {
441 "type": "Record",
442 "attributes": {
443 "name": { "type": "String" },
444 "version": { "type": "Long" }
445 }
446 }
447 },
448 "Cluster": {}
449 },
450 "actions": {
451 "produce": {
452 "appliesTo": {
453 "principalTypes": ["User", "Group"],
454 "resourceTypes": ["Topic"]
455 }
456 },
457 "consume": {
458 "appliesTo": {
459 "principalTypes": ["User", "Group"],
460 "resourceTypes": ["Topic"]
461 }
462 },
463 "create": {
464 "appliesTo": {
465 "principalTypes": ["User", "Group"],
466 "resourceTypes": ["Topic", "Schema"]
467 }
468 },
469 "delete": {
470 "appliesTo": {
471 "principalTypes": ["User", "Group"],
472 "resourceTypes": ["Topic", "ConsumerGroup", "Schema"]
473 }
474 },
475 "alter": {
476 "appliesTo": {
477 "principalTypes": ["User", "Group"],
478 "resourceTypes": ["Topic", "Schema"]
479 }
480 },
481 "describe": {
482 "appliesTo": {
483 "principalTypes": ["User", "Group"],
484 "resourceTypes": ["Topic", "ConsumerGroup", "Schema", "Cluster"]
485 }
486 },
487 "join": {
488 "appliesTo": {
489 "principalTypes": ["User", "Group"],
490 "resourceTypes": ["ConsumerGroup"]
491 }
492 },
493 "leave": {
494 "appliesTo": {
495 "principalTypes": ["User", "Group"],
496 "resourceTypes": ["ConsumerGroup"]
497 }
498 },
499 "commit": {
500 "appliesTo": {
501 "principalTypes": ["User", "Group"],
502 "resourceTypes": ["ConsumerGroup"]
503 }
504 },
505 "fetch_offsets": {
506 "appliesTo": {
507 "principalTypes": ["User", "Group"],
508 "resourceTypes": ["ConsumerGroup"]
509 }
510 },
511 "admin": {
512 "appliesTo": {
513 "principalTypes": ["User", "Group"],
514 "resourceTypes": ["Cluster"]
515 }
516 },
517 "alter_configs": {
518 "appliesTo": {
519 "principalTypes": ["User", "Group"],
520 "resourceTypes": ["Cluster"]
521 }
522 },
523 "describe_configs": {
524 "appliesTo": {
525 "principalTypes": ["User", "Group"],
526 "resourceTypes": ["Cluster"]
527 }
528 }
529 }
530 }
531}
532"#;
533
534 Schema::from_json_str(schema_src)
535 .map_err(|e| CedarError::Schema(format!("Invalid schema: {:?}", e)))
536 }
537
538 pub fn add_policy(&self, id: &str, policy_src: &str) -> CedarResult<()> {
540 let policy = Policy::parse(Some(cedar_policy::PolicyId::new(id)), policy_src)
541 .map_err(|e| CedarError::PolicyParse(format!("{:?}", e)))?;
542
543 if self.validate_policies {
545 if let Some(schema) = &self.schema {
546 let mut temp_set = PolicySet::new();
547 temp_set.add(policy.clone()).map_err(|e| {
548 CedarError::PolicyParse(format!("Duplicate policy ID: {:?}", e))
549 })?;
550
551 let validator = cedar_policy::Validator::new(schema.clone());
552 let result = validator.validate(&temp_set, ValidationMode::Strict);
553
554 if !result.validation_passed() {
555 let errors: Vec<String> = result
556 .validation_errors()
557 .map(|e| format!("{:?}", e))
558 .collect();
559 return Err(CedarError::Validation(errors.join("; ")));
560 }
561 }
562 }
563
564 let mut policies = self.policies.write();
565 policies
566 .add(policy)
567 .map_err(|e| CedarError::PolicyParse(format!("Failed to add policy: {:?}", e)))?;
568
569 info!("Added Cedar policy: {}", id);
570 Ok(())
571 }
572
573 pub fn add_policies(&self, policies_src: &str) -> CedarResult<()> {
575 let parsed = PolicySet::from_str(policies_src)
576 .map_err(|e| CedarError::PolicyParse(format!("{:?}", e)))?;
577
578 if self.validate_policies {
580 if let Some(schema) = &self.schema {
581 let validator = cedar_policy::Validator::new(schema.clone());
582 let result = validator.validate(&parsed, ValidationMode::Strict);
583
584 if !result.validation_passed() {
585 let errors: Vec<String> = result
586 .validation_errors()
587 .map(|e| format!("{:?}", e))
588 .collect();
589 return Err(CedarError::Validation(errors.join("; ")));
590 }
591 }
592 }
593
594 let mut policies = self.policies.write();
595 for policy in parsed.policies() {
596 policies.add(policy.clone()).map_err(|e| {
597 CedarError::PolicyParse(format!("Failed to add policy: {:?}", e))
598 })?;
599 }
600
601 Ok(())
602 }
603
604 pub fn remove_policy(&self, id: &str) -> CedarResult<()> {
606 let mut policies = self.policies.write();
609 let policy_id = cedar_policy::PolicyId::new(id);
610
611 let new_policies = policies
613 .policies()
614 .filter(|p| p.id() != &policy_id)
615 .cloned()
616 .collect::<Vec<_>>();
617
618 let mut new_set = PolicySet::new();
619 for policy in new_policies {
620 new_set.add(policy).ok();
621 }
622
623 *policies = new_set;
624 info!("Removed Cedar policy: {}", id);
625 Ok(())
626 }
627
628 pub fn add_user(
630 &self,
631 username: &str,
632 email: Option<&str>,
633 roles: &[&str],
634 groups: &[&str],
635 is_service_account: bool,
636 ) -> CedarResult<()> {
637 let uid = EntityUid::from_type_name_and_id(
638 EntityTypeName::from_str("Rivven::User").unwrap(),
639 EntityId::from_str(username).unwrap(),
640 );
641
642 let parents: HashSet<EntityUid> = groups
644 .iter()
645 .map(|g| {
646 EntityUid::from_type_name_and_id(
647 EntityTypeName::from_str("Rivven::Group").unwrap(),
648 EntityId::from_str(g).unwrap(),
649 )
650 })
651 .collect();
652
653 let mut attrs = HashMap::new();
655 if let Some(e) = email {
656 attrs.insert(
657 "email".to_string(),
658 cedar_policy::RestrictedExpression::new_string(e.to_string()),
659 );
660 }
661
662 let roles_set: Vec<_> = roles
663 .iter()
664 .map(|r| cedar_policy::RestrictedExpression::new_string(r.to_string()))
665 .collect();
666 attrs.insert(
667 "roles".to_string(),
668 cedar_policy::RestrictedExpression::new_set(roles_set),
669 );
670
671 attrs.insert(
672 "service_account".to_string(),
673 cedar_policy::RestrictedExpression::new_bool(is_service_account),
674 );
675
676 let entity = Entity::new(uid, attrs, parents)
677 .map_err(|e| CedarError::Entity(format!("Invalid entity: {:?}", e)))?;
678
679 let mut entities = self.entities.write();
680 let mut all_entities: Vec<Entity> = entities.iter().cloned().collect();
682 all_entities.push(entity);
683 *entities = Entities::from_entities(all_entities, None)
684 .map_err(|e| CedarError::Entity(format!("Invalid entities: {:?}", e)))?;
685
686 debug!("Added user entity: {}", username);
687 Ok(())
688 }
689
690 pub fn add_group(&self, name: &str, parent_groups: &[&str]) -> CedarResult<()> {
692 let uid = EntityUid::from_type_name_and_id(
693 EntityTypeName::from_str("Rivven::Group").unwrap(),
694 EntityId::from_str(name).unwrap(),
695 );
696
697 let parents: HashSet<EntityUid> = parent_groups
698 .iter()
699 .map(|g| {
700 EntityUid::from_type_name_and_id(
701 EntityTypeName::from_str("Rivven::Group").unwrap(),
702 EntityId::from_str(g).unwrap(),
703 )
704 })
705 .collect();
706
707 let entity = Entity::new_no_attrs(uid, parents);
708
709 let mut entities = self.entities.write();
710 let mut all_entities: Vec<Entity> = entities.iter().cloned().collect();
711 all_entities.push(entity);
712 *entities = Entities::from_entities(all_entities, None)
713 .map_err(|e| CedarError::Entity(format!("Invalid entities: {:?}", e)))?;
714
715 debug!("Added group entity: {}", name);
716 Ok(())
717 }
718
719 pub fn add_topic(
721 &self,
722 name: &str,
723 owner: Option<&str>,
724 partitions: i64,
725 replication_factor: i64,
726 retention_ms: i64,
727 ) -> CedarResult<()> {
728 let uid = EntityUid::from_type_name_and_id(
729 EntityTypeName::from_str("Rivven::Topic").unwrap(),
730 EntityId::from_str(name).unwrap(),
731 );
732
733 let mut attrs = HashMap::new();
734 attrs.insert(
735 "name".to_string(),
736 cedar_policy::RestrictedExpression::new_string(name.to_string()),
737 );
738
739 if let Some(o) = owner {
740 let owner_uid = EntityUid::from_type_name_and_id(
741 EntityTypeName::from_str("Rivven::User").unwrap(),
742 EntityId::from_str(o).unwrap(),
743 );
744 attrs.insert(
745 "owner".to_string(),
746 cedar_policy::RestrictedExpression::new_entity_uid(owner_uid),
747 );
748 }
749
750 attrs.insert(
751 "partitions".to_string(),
752 cedar_policy::RestrictedExpression::new_long(partitions),
753 );
754 attrs.insert(
755 "replication_factor".to_string(),
756 cedar_policy::RestrictedExpression::new_long(replication_factor),
757 );
758 attrs.insert(
759 "retention_ms".to_string(),
760 cedar_policy::RestrictedExpression::new_long(retention_ms),
761 );
762
763 let entity = Entity::new(uid, attrs, HashSet::new())
764 .map_err(|e| CedarError::Entity(format!("Invalid entity: {:?}", e)))?;
765
766 let mut entities = self.entities.write();
767 let mut all_entities: Vec<Entity> = entities.iter().cloned().collect();
768 all_entities.push(entity);
769 *entities = Entities::from_entities(all_entities, None)
770 .map_err(|e| CedarError::Entity(format!("Invalid entities: {:?}", e)))?;
771
772 debug!("Added topic entity: {}", name);
773 Ok(())
774 }
775
776 pub fn add_consumer_group(&self, name: &str) -> CedarResult<()> {
778 let uid = EntityUid::from_type_name_and_id(
779 EntityTypeName::from_str("Rivven::ConsumerGroup").unwrap(),
780 EntityId::from_str(name).unwrap(),
781 );
782
783 let mut attrs = HashMap::new();
784 attrs.insert(
785 "name".to_string(),
786 cedar_policy::RestrictedExpression::new_string(name.to_string()),
787 );
788
789 let entity = Entity::new(uid, attrs, HashSet::new())
790 .map_err(|e| CedarError::Entity(format!("Invalid entity: {:?}", e)))?;
791
792 let mut entities = self.entities.write();
793 let mut all_entities: Vec<Entity> = entities.iter().cloned().collect();
794 all_entities.push(entity);
795 *entities = Entities::from_entities(all_entities, None)
796 .map_err(|e| CedarError::Entity(format!("Invalid entities: {:?}", e)))?;
797
798 debug!("Added consumer group entity: {}", name);
799 Ok(())
800 }
801
802 pub fn add_schema(&self, name: &str, version: i64) -> CedarResult<()> {
804 let uid = EntityUid::from_type_name_and_id(
805 EntityTypeName::from_str("Rivven::Schema").unwrap(),
806 EntityId::from_str(name).unwrap(),
807 );
808
809 let mut attrs = HashMap::new();
810 attrs.insert(
811 "name".to_string(),
812 cedar_policy::RestrictedExpression::new_string(name.to_string()),
813 );
814 attrs.insert(
815 "version".to_string(),
816 cedar_policy::RestrictedExpression::new_long(version),
817 );
818
819 let entity = Entity::new(uid, attrs, HashSet::new())
820 .map_err(|e| CedarError::Entity(format!("Invalid entity: {:?}", e)))?;
821
822 let mut entities = self.entities.write();
823 let mut all_entities: Vec<Entity> = entities.iter().cloned().collect();
824 all_entities.push(entity);
825 *entities = Entities::from_entities(all_entities, None)
826 .map_err(|e| CedarError::Entity(format!("Invalid entities: {:?}", e)))?;
827
828 debug!("Added schema entity: {} (version {})", name, version);
829 Ok(())
830 }
831
832 pub fn is_authorized(
834 &self,
835 principal: &str,
836 action: RivvenAction,
837 resource: &RivvenResource,
838 context: &AuthzContext,
839 ) -> CedarResult<AuthzDecision> {
840 let principal_uid = EntityUid::from_type_name_and_id(
841 EntityTypeName::from_str("Rivven::User").unwrap(),
842 EntityId::from_str(principal).unwrap(),
843 );
844
845 let action_uid = action.to_entity_uid();
846 let resource_uid = resource.to_entity_uid();
847 let cedar_context = context.to_cedar_context()?;
848
849 let request = Request::new(
850 principal_uid.clone(),
851 action_uid.clone(),
852 resource_uid.clone(),
853 cedar_context,
854 None,
855 )
856 .map_err(|e| CedarError::Request(format!("Invalid request: {:?}", e)))?;
857
858 let authorizer = Authorizer::new();
859 let policies = self.policies.read();
860 let entities = self.entities.read();
861
862 let response = authorizer.is_authorized(&request, &policies, &entities);
863
864 let decision = match response.decision() {
865 Decision::Allow => {
866 let satisfied: Vec<String> = response
867 .diagnostics()
868 .reason()
869 .map(|id| id.to_string())
870 .collect();
871 AuthzDecision::allowed(satisfied)
872 }
873 Decision::Deny => {
874 let denied: Vec<String> = response
875 .diagnostics()
876 .reason()
877 .map(|id| id.to_string())
878 .collect();
879
880 let errors: Vec<String> = response
881 .diagnostics()
882 .errors()
883 .map(|e| format!("{:?}", e))
884 .collect();
885
886 AuthzDecision {
887 allowed: false,
888 satisfied_policies: vec![],
889 denied_policies: denied,
890 errors,
891 diagnostics: None,
892 }
893 }
894 };
895
896 debug!(
897 "Authorization: {} {} {} -> {}",
898 principal,
899 action.as_str(),
900 resource.id(),
901 if decision.allowed { "ALLOW" } else { "DENY" }
902 );
903
904 Ok(decision)
905 }
906
907 pub fn authorize(
909 &self,
910 principal: &str,
911 action: RivvenAction,
912 resource: &RivvenResource,
913 context: &AuthzContext,
914 ) -> CedarResult<()> {
915 let decision = self.is_authorized(principal, action, resource, context)?;
916
917 if decision.allowed {
918 Ok(())
919 } else {
920 Err(CedarError::Denied {
921 principal: principal.to_string(),
922 action: action.as_str().to_string(),
923 resource: format!("{:?}", resource),
924 })
925 }
926 }
927
928 pub fn load_default_policies(&self) -> CedarResult<()> {
930 self.add_policy(
932 "super-admin",
933 r#"
934permit(
935 principal in Rivven::Group::"admins",
936 action,
937 resource
938);
939"#,
940 )?;
941
942 self.add_policy(
944 "describe-topics",
945 r#"
946permit(
947 principal,
948 action == Rivven::Action::"describe",
949 resource is Rivven::Topic
950);
951"#,
952 )?;
953
954 self.add_policy(
956 "describe-consumer-groups",
957 r#"
958permit(
959 principal,
960 action == Rivven::Action::"describe",
961 resource is Rivven::ConsumerGroup
962);
963"#,
964 )?;
965
966 info!("Loaded default Cedar policies");
967 Ok(())
968 }
969 }
970
/// Equivalent to [`CedarAuthorizer::new`].
///
/// # Panics
///
/// Panics if `new` fails, i.e. if the built-in schema cannot be parsed.
impl Default for CedarAuthorizer {
    fn default() -> Self {
        Self::new().expect("Failed to create default authorizer")
    }
}
976
// Unit tests for the Cedar-backed authorizer.
#[cfg(test)]
mod tests {
    use super::*;

    // `new()` must install the built-in schema.
    #[test]
    fn test_create_authorizer() {
        let authz = CedarAuthorizer::new().unwrap();
        assert!(authz.schema.is_some());
    }

    // A syntactically valid policy is accepted when no schema is in use.
    #[test]
    fn test_add_simple_policy() {
        let authz = CedarAuthorizer::new_without_schema();

        authz
            .add_policy(
                "test-policy",
                r#"
permit(
    principal == Rivven::User::"alice",
    action == Rivven::Action::"produce",
    resource == Rivven::Topic::"orders"
);
"#,
            )
            .unwrap();
    }

    // A user in the `admins` group is allowed by a group-based permit policy.
    #[test]
    fn test_add_user_and_authorize() {
        let authz = CedarAuthorizer::new_without_schema();

        // Policy: members of `admins` may do anything.
        authz
            .add_policy(
                "admin-all",
                r#"
permit(
    principal in Rivven::Group::"admins",
    action,
    resource
);
"#,
            )
            .unwrap();

        // Entities: the group, a user inside it, and the target topic.
        authz.add_group("admins", &[]).unwrap();
        authz
            .add_user(
                "alice",
                Some("alice@example.com"),
                &["admin"],
                &["admins"],
                false,
            )
            .unwrap();

        authz
            .add_topic("orders", Some("alice"), 3, 2, 604800000)
            .unwrap();

        let ctx = AuthzContext::new().with_ip("127.0.0.1");
        let decision = authz
            .is_authorized(
                "alice",
                RivvenAction::Produce,
                &RivvenResource::Topic("orders".to_string()),
                &ctx,
            )
            .unwrap();

        assert!(decision.allowed);
    }

    // A user outside the `admins` group is denied by the same kind of policy.
    #[test]
    fn test_deny_unauthorized() {
        let authz = CedarAuthorizer::new_without_schema();

        // Policy: only members of `admins` may produce to topics.
        authz
            .add_policy(
                "only-admins-produce",
                r#"
permit(
    principal in Rivven::Group::"admins",
    action == Rivven::Action::"produce",
    resource is Rivven::Topic
);
"#,
            )
            .unwrap();

        // `bob` belongs to no group.
        authz
            .add_user("bob", Some("bob@example.com"), &["user"], &[], false)
            .unwrap();

        authz.add_topic("orders", None, 3, 2, 604800000).unwrap();

        let ctx = AuthzContext::new();
        let decision = authz
            .is_authorized(
                "bob",
                RivvenAction::Produce,
                &RivvenResource::Topic("orders".to_string()),
                &ctx,
            )
            .unwrap();

        assert!(!decision.allowed);
    }

    // The builder methods store their values on the context.
    #[test]
    fn test_context_attributes() {
        let ctx = AuthzContext::new()
            .with_ip("192.168.1.100")
            .with_tls_subject("CN=client,O=Rivven")
            .with_attr("custom_field", serde_json::json!("custom_value"));

        assert_eq!(ctx.ip_address, Some("192.168.1.100".to_string()));
        assert_eq!(ctx.tls_subject, Some("CN=client,O=Rivven".to_string()));
        assert!(ctx.extra.contains_key("custom_field"));
    }
}
1110}
1111
1112#[cfg(feature = "cedar")]
1113pub use cedar_impl::*;
1114
1115#[cfg(not(feature = "cedar"))]
1116mod no_cedar {
1117 use std::collections::HashMap;
1120 use thiserror::Error;
1121
1122 #[derive(Error, Debug)]
1123 pub enum CedarError {
1124 #[error("Cedar authorization not enabled. Build with 'cedar' feature.")]
1125 NotEnabled,
1126 }
1127
1128 pub type CedarResult<T> = Result<T, CedarError>;
1129
1130 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
1131 pub enum RivvenAction {
1132 Produce,
1133 Consume,
1134 Create,
1135 Delete,
1136 Alter,
1137 Describe,
1138 Join,
1139 Leave,
1140 Commit,
1141 FetchOffsets,
1142 Admin,
1143 AlterConfigs,
1144 DescribeConfigs,
1145 }
1146
1147 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
1148 pub enum RivvenResource {
1149 Topic(String),
1150 ConsumerGroup(String),
1151 Schema(String),
1152 Cluster,
1153 }
1154
1155 #[derive(Debug, Clone, Default)]
1156 pub struct AuthzContext {
1157 pub ip_address: Option<String>,
1158 pub timestamp: String,
1159 pub tls_subject: Option<String>,
1160 pub extra: HashMap<String, serde_json::Value>,
1161 }
1162
1163 impl AuthzContext {
1164 pub fn new() -> Self {
1165 Self::default()
1166 }
1167
1168 pub fn with_ip(self, _ip: impl Into<String>) -> Self {
1169 self
1170 }
1171
1172 pub fn with_tls_subject(self, _subject: impl Into<String>) -> Self {
1173 self
1174 }
1175 }
1176
1177 pub struct CedarAuthorizer;
1178
1179 impl CedarAuthorizer {
1180 pub fn new() -> CedarResult<Self> {
1181 Err(CedarError::NotEnabled)
1182 }
1183
1184 pub fn authorize(
1185 &self,
1186 _principal: &str,
1187 _action: RivvenAction,
1188 _resource: &RivvenResource,
1189 _context: &AuthzContext,
1190 ) -> CedarResult<()> {
1191 Err(CedarError::NotEnabled)
1192 }
1193 }
1194}
1195
1196#[cfg(not(feature = "cedar"))]
1197pub use no_cedar::*;