1#[cfg(feature = "cedar")]
87mod cedar_impl {
88 use cedar_policy::{
89 Authorizer, Context, Decision, Entities, Entity, EntityId, EntityTypeName, EntityUid,
90 Policy, PolicySet, Request, Schema, ValidationMode,
91 };
92 use parking_lot::RwLock;
93 use serde::{Deserialize, Serialize};
94 use std::collections::{HashMap, HashSet};
95 use std::str::FromStr;
96 use thiserror::Error;
97 use tracing::{debug, info};
98
99 #[derive(Error, Debug)]
104 pub enum CedarError {
105 #[error("Policy parse error: {0}")]
106 PolicyParse(String),
107
108 #[error("Schema error: {0}")]
109 Schema(String),
110
111 #[error("Validation error: {0}")]
112 Validation(String),
113
114 #[error("Entity error: {0}")]
115 Entity(String),
116
117 #[error("Request error: {0}")]
118 Request(String),
119
120 #[error("Authorization denied: {principal} cannot {action} on {resource}")]
121 Denied {
122 principal: String,
123 action: String,
124 resource: String,
125 },
126
127 #[error("Internal error: {0}")]
128 Internal(String),
129 }
130
131 pub type CedarResult<T> = Result<T, CedarError>;
132
133 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
139 #[serde(rename_all = "snake_case")]
140 pub enum RivvenAction {
141 Produce,
143 Consume,
144 Create,
145 Delete,
146 Alter,
147 Describe,
148
149 Join,
151 Leave,
152 Commit,
153 FetchOffsets,
154
155 Admin,
157 AlterConfigs,
158 DescribeConfigs,
159 }
160
161 impl RivvenAction {
162 pub fn as_str(&self) -> &'static str {
163 match self {
164 Self::Produce => "produce",
165 Self::Consume => "consume",
166 Self::Create => "create",
167 Self::Delete => "delete",
168 Self::Alter => "alter",
169 Self::Describe => "describe",
170 Self::Join => "join",
171 Self::Leave => "leave",
172 Self::Commit => "commit",
173 Self::FetchOffsets => "fetch_offsets",
174 Self::Admin => "admin",
175 Self::AlterConfigs => "alter_configs",
176 Self::DescribeConfigs => "describe_configs",
177 }
178 }
179
180 fn to_entity_uid(self) -> EntityUid {
181 EntityUid::from_type_name_and_id(
182 EntityTypeName::from_str("Rivven::Action").unwrap(),
183 EntityId::from_str(self.as_str()).unwrap(),
184 )
185 }
186 }
187
188 #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
194 #[serde(tag = "type", content = "name")]
195 pub enum RivvenResource {
196 Topic(String),
197 ConsumerGroup(String),
198 Schema(String),
199 Cluster,
200 }
201
202 impl RivvenResource {
203 fn type_name(&self) -> &'static str {
204 match self {
205 Self::Topic(_) => "Rivven::Topic",
206 Self::ConsumerGroup(_) => "Rivven::ConsumerGroup",
207 Self::Schema(_) => "Rivven::Schema",
208 Self::Cluster => "Rivven::Cluster",
209 }
210 }
211
212 fn id(&self) -> &str {
213 match self {
214 Self::Topic(name) => name,
215 Self::ConsumerGroup(name) => name,
216 Self::Schema(name) => name,
217 Self::Cluster => "default",
218 }
219 }
220
221 fn to_entity_uid(&self) -> EntityUid {
222 EntityUid::from_type_name_and_id(
223 EntityTypeName::from_str(self.type_name()).unwrap(),
224 EntityId::from_str(self.id()).unwrap(),
225 )
226 }
227 }
228
229 #[derive(Debug, Clone, Serialize, Deserialize)]
235 pub struct AuthzContext {
236 pub ip_address: Option<String>,
238
239 pub timestamp: String,
241
242 pub tls_subject: Option<String>,
244
245 #[serde(flatten)]
247 pub extra: HashMap<String, serde_json::Value>,
248 }
249
250 impl Default for AuthzContext {
251 fn default() -> Self {
252 Self {
253 ip_address: None,
254 timestamp: chrono::Utc::now().to_rfc3339(),
255 tls_subject: None,
256 extra: HashMap::new(),
257 }
258 }
259 }
260
261 impl AuthzContext {
262 pub fn new() -> Self {
263 Self::default()
264 }
265
266 pub fn with_ip(mut self, ip: impl Into<String>) -> Self {
267 self.ip_address = Some(ip.into());
268 self
269 }
270
271 pub fn with_tls_subject(mut self, subject: impl Into<String>) -> Self {
272 self.tls_subject = Some(subject.into());
273 self
274 }
275
276 pub fn with_attr(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
277 self.extra.insert(key.into(), value);
278 self
279 }
280
281 fn to_cedar_context(&self) -> CedarResult<Context> {
282 let mut context_map = serde_json::Map::new();
285
286 context_map.insert("timestamp".to_string(), serde_json::json!(self.timestamp));
288
289 if let Some(ip) = &self.ip_address {
291 context_map.insert("ip_address".to_string(), serde_json::json!(ip));
292 }
293
294 if let Some(tls) = &self.tls_subject {
295 context_map.insert("tls_subject".to_string(), serde_json::json!(tls));
296 }
297
298 for (key, value) in &self.extra {
300 if !value.is_null() {
301 context_map.insert(key.clone(), value.clone());
302 }
303 }
304
305 let json = serde_json::Value::Object(context_map);
306
307 Context::from_json_value(json, None)
308 .map_err(|e| CedarError::Request(format!("Invalid context: {}", e)))
309 }
310 }
311
/// Outcome of evaluating one authorization request.
#[derive(Debug, Clone)]
pub struct AuthzDecision {
    /// `true` when the request is permitted.
    pub allowed: bool,

    /// Ids of the policies reported as the reason for an allow decision.
    pub satisfied_policies: Vec<String>,

    /// Ids of the policies reported as the reason for a deny decision.
    pub denied_policies: Vec<String>,

    /// Debug-formatted errors raised while evaluating policies.
    pub errors: Vec<String>,

    /// Optional free-form diagnostic text.
    pub diagnostics: Option<String>,
}
334
335 impl AuthzDecision {
336 pub fn allowed(satisfied: Vec<String>) -> Self {
337 Self {
338 allowed: true,
339 satisfied_policies: satisfied,
340 denied_policies: vec![],
341 errors: vec![],
342 diagnostics: None,
343 }
344 }
345
346 pub fn denied(denied: Vec<String>) -> Self {
347 Self {
348 allowed: false,
349 satisfied_policies: vec![],
350 denied_policies: denied,
351 errors: vec![],
352 diagnostics: None,
353 }
354 }
355 }
356
357 pub struct CedarAuthorizer {
363 schema: Option<Schema>,
365
366 policies: RwLock<PolicySet>,
368
369 entities: RwLock<Entities>,
371
372 validate_policies: bool,
374 }
375
376 impl CedarAuthorizer {
377 pub fn new() -> CedarResult<Self> {
379 let schema = Self::default_schema()?;
380
381 Ok(Self {
382 schema: Some(schema),
383 policies: RwLock::new(PolicySet::new()),
384 entities: RwLock::new(Entities::empty()),
385 validate_policies: true,
386 })
387 }
388
389 pub fn new_without_schema() -> Self {
391 Self {
392 schema: None,
393 policies: RwLock::new(PolicySet::new()),
394 entities: RwLock::new(Entities::empty()),
395 validate_policies: false,
396 }
397 }
398
399 fn default_schema() -> CedarResult<Schema> {
401 let schema_src = r#"
402{
403 "Rivven": {
404 "entityTypes": {
405 "User": {
406 "memberOfTypes": ["Group"],
407 "shape": {
408 "type": "Record",
409 "attributes": {
410 "email": { "type": "String", "required": false },
411 "roles": { "type": "Set", "element": { "type": "String" } },
412 "service_account": { "type": "Boolean" }
413 }
414 }
415 },
416 "Group": {
417 "memberOfTypes": ["Group"]
418 },
419 "Topic": {
420 "shape": {
421 "type": "Record",
422 "attributes": {
423 "owner": { "type": "Entity", "name": "Rivven::User", "required": false },
424 "partitions": { "type": "Long" },
425 "replication_factor": { "type": "Long" },
426 "retention_ms": { "type": "Long" },
427 "name": { "type": "String" }
428 }
429 }
430 },
431 "ConsumerGroup": {
432 "shape": {
433 "type": "Record",
434 "attributes": {
435 "name": { "type": "String" }
436 }
437 }
438 },
439 "Schema": {
440 "shape": {
441 "type": "Record",
442 "attributes": {
443 "name": { "type": "String" },
444 "version": { "type": "Long" }
445 }
446 }
447 },
448 "Cluster": {}
449 },
450 "actions": {
451 "produce": {
452 "appliesTo": {
453 "principalTypes": ["User", "Group"],
454 "resourceTypes": ["Topic"]
455 }
456 },
457 "consume": {
458 "appliesTo": {
459 "principalTypes": ["User", "Group"],
460 "resourceTypes": ["Topic"]
461 }
462 },
463 "create": {
464 "appliesTo": {
465 "principalTypes": ["User", "Group"],
466 "resourceTypes": ["Topic", "Schema"]
467 }
468 },
469 "delete": {
470 "appliesTo": {
471 "principalTypes": ["User", "Group"],
472 "resourceTypes": ["Topic", "ConsumerGroup", "Schema"]
473 }
474 },
475 "alter": {
476 "appliesTo": {
477 "principalTypes": ["User", "Group"],
478 "resourceTypes": ["Topic", "Schema"]
479 }
480 },
481 "describe": {
482 "appliesTo": {
483 "principalTypes": ["User", "Group"],
484 "resourceTypes": ["Topic", "ConsumerGroup", "Schema", "Cluster"]
485 }
486 },
487 "join": {
488 "appliesTo": {
489 "principalTypes": ["User", "Group"],
490 "resourceTypes": ["ConsumerGroup"]
491 }
492 },
493 "leave": {
494 "appliesTo": {
495 "principalTypes": ["User", "Group"],
496 "resourceTypes": ["ConsumerGroup"]
497 }
498 },
499 "commit": {
500 "appliesTo": {
501 "principalTypes": ["User", "Group"],
502 "resourceTypes": ["ConsumerGroup"]
503 }
504 },
505 "fetch_offsets": {
506 "appliesTo": {
507 "principalTypes": ["User", "Group"],
508 "resourceTypes": ["ConsumerGroup"]
509 }
510 },
511 "admin": {
512 "appliesTo": {
513 "principalTypes": ["User", "Group"],
514 "resourceTypes": ["Cluster"]
515 }
516 },
517 "alter_configs": {
518 "appliesTo": {
519 "principalTypes": ["User", "Group"],
520 "resourceTypes": ["Cluster"]
521 }
522 },
523 "describe_configs": {
524 "appliesTo": {
525 "principalTypes": ["User", "Group"],
526 "resourceTypes": ["Cluster"]
527 }
528 }
529 }
530 }
531}
532"#;
533
534 Schema::from_json_str(schema_src)
535 .map_err(|e| CedarError::Schema(format!("Invalid schema: {:?}", e)))
536 }
537
538 pub fn add_policy(&self, id: &str, policy_src: &str) -> CedarResult<()> {
540 let policy = Policy::parse(Some(cedar_policy::PolicyId::new(id)), policy_src)
541 .map_err(|e| CedarError::PolicyParse(format!("{:?}", e)))?;
542
543 if self.validate_policies {
545 if let Some(schema) = &self.schema {
546 let mut temp_set = PolicySet::new();
547 temp_set.add(policy.clone()).map_err(|e| {
548 CedarError::PolicyParse(format!("Duplicate policy ID: {:?}", e))
549 })?;
550
551 let validator = cedar_policy::Validator::new(schema.clone());
552 let result = validator.validate(&temp_set, ValidationMode::Strict);
553
554 if !result.validation_passed() {
555 let errors: Vec<String> = result
556 .validation_errors()
557 .map(|e| format!("{:?}", e))
558 .collect();
559 return Err(CedarError::Validation(errors.join("; ")));
560 }
561 }
562 }
563
564 let mut policies = self.policies.write();
565 policies
566 .add(policy)
567 .map_err(|e| CedarError::PolicyParse(format!("Failed to add policy: {:?}", e)))?;
568
569 info!("Added Cedar policy: {}", id);
570 Ok(())
571 }
572
573 pub fn add_policies(&self, policies_src: &str) -> CedarResult<()> {
575 let parsed = PolicySet::from_str(policies_src)
576 .map_err(|e| CedarError::PolicyParse(format!("{:?}", e)))?;
577
578 if self.validate_policies {
580 if let Some(schema) = &self.schema {
581 let validator = cedar_policy::Validator::new(schema.clone());
582 let result = validator.validate(&parsed, ValidationMode::Strict);
583
584 if !result.validation_passed() {
585 let errors: Vec<String> = result
586 .validation_errors()
587 .map(|e| format!("{:?}", e))
588 .collect();
589 return Err(CedarError::Validation(errors.join("; ")));
590 }
591 }
592 }
593
594 let mut policies = self.policies.write();
595 for policy in parsed.policies() {
596 policies.add(policy.clone()).map_err(|e| {
597 CedarError::PolicyParse(format!("Failed to add policy: {:?}", e))
598 })?;
599 }
600
601 Ok(())
602 }
603
604 pub fn remove_policy(&self, id: &str) -> CedarResult<()> {
606 let mut policies = self.policies.write();
609 let policy_id = cedar_policy::PolicyId::new(id);
610
611 let new_policies = policies
613 .policies()
614 .filter(|p| p.id() != &policy_id)
615 .cloned()
616 .collect::<Vec<_>>();
617
618 let mut new_set = PolicySet::new();
619 for policy in new_policies {
620 new_set.add(policy).ok();
621 }
622
623 *policies = new_set;
624 info!("Removed Cedar policy: {}", id);
625 Ok(())
626 }
627
628 pub fn add_user(
630 &self,
631 username: &str,
632 email: Option<&str>,
633 roles: &[&str],
634 groups: &[&str],
635 is_service_account: bool,
636 ) -> CedarResult<()> {
637 let uid = EntityUid::from_type_name_and_id(
638 EntityTypeName::from_str("Rivven::User").unwrap(),
639 EntityId::from_str(username).unwrap(),
640 );
641
642 let parents: HashSet<EntityUid> = groups
644 .iter()
645 .map(|g| {
646 EntityUid::from_type_name_and_id(
647 EntityTypeName::from_str("Rivven::Group").unwrap(),
648 EntityId::from_str(g).unwrap(),
649 )
650 })
651 .collect();
652
653 let mut attrs = HashMap::new();
655 if let Some(e) = email {
656 attrs.insert(
657 "email".to_string(),
658 cedar_policy::RestrictedExpression::new_string(e.to_string()),
659 );
660 }
661
662 let roles_set: Vec<_> = roles
663 .iter()
664 .map(|r| cedar_policy::RestrictedExpression::new_string(r.to_string()))
665 .collect();
666 attrs.insert(
667 "roles".to_string(),
668 cedar_policy::RestrictedExpression::new_set(roles_set),
669 );
670
671 attrs.insert(
672 "service_account".to_string(),
673 cedar_policy::RestrictedExpression::new_bool(is_service_account),
674 );
675
676 let entity = Entity::new(uid, attrs, parents)
677 .map_err(|e| CedarError::Entity(format!("Invalid entity: {:?}", e)))?;
678
679 let mut entities = self.entities.write();
680 let mut all_entities: Vec<Entity> = entities.iter().cloned().collect();
682 all_entities.push(entity);
683 *entities = Entities::from_entities(all_entities, None)
684 .map_err(|e| CedarError::Entity(format!("Invalid entities: {:?}", e)))?;
685
686 debug!("Added user entity: {}", username);
687 Ok(())
688 }
689
690 pub fn add_group(&self, name: &str, parent_groups: &[&str]) -> CedarResult<()> {
692 let uid = EntityUid::from_type_name_and_id(
693 EntityTypeName::from_str("Rivven::Group").unwrap(),
694 EntityId::from_str(name).unwrap(),
695 );
696
697 let parents: HashSet<EntityUid> = parent_groups
698 .iter()
699 .map(|g| {
700 EntityUid::from_type_name_and_id(
701 EntityTypeName::from_str("Rivven::Group").unwrap(),
702 EntityId::from_str(g).unwrap(),
703 )
704 })
705 .collect();
706
707 let entity = Entity::new_no_attrs(uid, parents);
708
709 let mut entities = self.entities.write();
710 let mut all_entities: Vec<Entity> = entities.iter().cloned().collect();
711 all_entities.push(entity);
712 *entities = Entities::from_entities(all_entities, None)
713 .map_err(|e| CedarError::Entity(format!("Invalid entities: {:?}", e)))?;
714
715 debug!("Added group entity: {}", name);
716 Ok(())
717 }
718
719 pub fn add_topic(
721 &self,
722 name: &str,
723 owner: Option<&str>,
724 partitions: i64,
725 replication_factor: i64,
726 retention_ms: i64,
727 ) -> CedarResult<()> {
728 let uid = EntityUid::from_type_name_and_id(
729 EntityTypeName::from_str("Rivven::Topic").unwrap(),
730 EntityId::from_str(name).unwrap(),
731 );
732
733 let mut attrs = HashMap::new();
734 attrs.insert(
735 "name".to_string(),
736 cedar_policy::RestrictedExpression::new_string(name.to_string()),
737 );
738
739 if let Some(o) = owner {
740 let owner_uid = EntityUid::from_type_name_and_id(
741 EntityTypeName::from_str("Rivven::User").unwrap(),
742 EntityId::from_str(o).unwrap(),
743 );
744 attrs.insert(
745 "owner".to_string(),
746 cedar_policy::RestrictedExpression::new_entity_uid(owner_uid),
747 );
748 }
749
750 attrs.insert(
751 "partitions".to_string(),
752 cedar_policy::RestrictedExpression::new_long(partitions),
753 );
754 attrs.insert(
755 "replication_factor".to_string(),
756 cedar_policy::RestrictedExpression::new_long(replication_factor),
757 );
758 attrs.insert(
759 "retention_ms".to_string(),
760 cedar_policy::RestrictedExpression::new_long(retention_ms),
761 );
762
763 let entity = Entity::new(uid, attrs, HashSet::new())
764 .map_err(|e| CedarError::Entity(format!("Invalid entity: {:?}", e)))?;
765
766 let mut entities = self.entities.write();
767 let mut all_entities: Vec<Entity> = entities.iter().cloned().collect();
768 all_entities.push(entity);
769 *entities = Entities::from_entities(all_entities, None)
770 .map_err(|e| CedarError::Entity(format!("Invalid entities: {:?}", e)))?;
771
772 debug!("Added topic entity: {}", name);
773 Ok(())
774 }
775
776 pub fn add_consumer_group(&self, name: &str) -> CedarResult<()> {
778 let uid = EntityUid::from_type_name_and_id(
779 EntityTypeName::from_str("Rivven::ConsumerGroup").unwrap(),
780 EntityId::from_str(name).unwrap(),
781 );
782
783 let mut attrs = HashMap::new();
784 attrs.insert(
785 "name".to_string(),
786 cedar_policy::RestrictedExpression::new_string(name.to_string()),
787 );
788
789 let entity = Entity::new(uid, attrs, HashSet::new())
790 .map_err(|e| CedarError::Entity(format!("Invalid entity: {:?}", e)))?;
791
792 let mut entities = self.entities.write();
793 let mut all_entities: Vec<Entity> = entities.iter().cloned().collect();
794 all_entities.push(entity);
795 *entities = Entities::from_entities(all_entities, None)
796 .map_err(|e| CedarError::Entity(format!("Invalid entities: {:?}", e)))?;
797
798 debug!("Added consumer group entity: {}", name);
799 Ok(())
800 }
801
802 pub fn is_authorized(
804 &self,
805 principal: &str,
806 action: RivvenAction,
807 resource: &RivvenResource,
808 context: &AuthzContext,
809 ) -> CedarResult<AuthzDecision> {
810 let principal_uid = EntityUid::from_type_name_and_id(
811 EntityTypeName::from_str("Rivven::User").unwrap(),
812 EntityId::from_str(principal).unwrap(),
813 );
814
815 let action_uid = action.to_entity_uid();
816 let resource_uid = resource.to_entity_uid();
817 let cedar_context = context.to_cedar_context()?;
818
819 let request = Request::new(
820 principal_uid.clone(),
821 action_uid.clone(),
822 resource_uid.clone(),
823 cedar_context,
824 None,
825 )
826 .map_err(|e| CedarError::Request(format!("Invalid request: {:?}", e)))?;
827
828 let authorizer = Authorizer::new();
829 let policies = self.policies.read();
830 let entities = self.entities.read();
831
832 let response = authorizer.is_authorized(&request, &policies, &entities);
833
834 let decision = match response.decision() {
835 Decision::Allow => {
836 let satisfied: Vec<String> = response
837 .diagnostics()
838 .reason()
839 .map(|id| id.to_string())
840 .collect();
841 AuthzDecision::allowed(satisfied)
842 }
843 Decision::Deny => {
844 let denied: Vec<String> = response
845 .diagnostics()
846 .reason()
847 .map(|id| id.to_string())
848 .collect();
849
850 let errors: Vec<String> = response
851 .diagnostics()
852 .errors()
853 .map(|e| format!("{:?}", e))
854 .collect();
855
856 AuthzDecision {
857 allowed: false,
858 satisfied_policies: vec![],
859 denied_policies: denied,
860 errors,
861 diagnostics: None,
862 }
863 }
864 };
865
866 debug!(
867 "Authorization: {} {} {} -> {}",
868 principal,
869 action.as_str(),
870 resource.id(),
871 if decision.allowed { "ALLOW" } else { "DENY" }
872 );
873
874 Ok(decision)
875 }
876
877 pub fn authorize(
879 &self,
880 principal: &str,
881 action: RivvenAction,
882 resource: &RivvenResource,
883 context: &AuthzContext,
884 ) -> CedarResult<()> {
885 let decision = self.is_authorized(principal, action, resource, context)?;
886
887 if decision.allowed {
888 Ok(())
889 } else {
890 Err(CedarError::Denied {
891 principal: principal.to_string(),
892 action: action.as_str().to_string(),
893 resource: format!("{:?}", resource),
894 })
895 }
896 }
897
898 pub fn load_default_policies(&self) -> CedarResult<()> {
900 self.add_policy(
902 "super-admin",
903 r#"
904permit(
905 principal in Rivven::Group::"admins",
906 action,
907 resource
908);
909"#,
910 )?;
911
912 self.add_policy(
914 "describe-topics",
915 r#"
916permit(
917 principal,
918 action == Rivven::Action::"describe",
919 resource is Rivven::Topic
920);
921"#,
922 )?;
923
924 self.add_policy(
926 "describe-consumer-groups",
927 r#"
928permit(
929 principal,
930 action == Rivven::Action::"describe",
931 resource is Rivven::ConsumerGroup
932);
933"#,
934 )?;
935
936 info!("Loaded default Cedar policies");
937 Ok(())
938 }
939 }
940
941 impl Default for CedarAuthorizer {
942 fn default() -> Self {
943 Self::new().expect("Failed to create default authorizer")
944 }
945 }
946
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` attaches the built-in schema.
    #[test]
    fn test_create_authorizer() {
        let authorizer = CedarAuthorizer::new().unwrap();
        assert!(authorizer.schema.is_some());
    }

    /// A well-formed policy is accepted by a schema-less authorizer.
    #[test]
    fn test_add_simple_policy() {
        let policy_text = r#"
permit(
    principal == Rivven::User::"alice",
    action == Rivven::Action::"produce",
    resource == Rivven::Topic::"orders"
);
"#;

        let authorizer = CedarAuthorizer::new_without_schema();
        authorizer.add_policy("test-policy", policy_text).unwrap();
    }

    /// A member of the `admins` group is allowed by an admin-all policy.
    #[test]
    fn test_add_user_and_authorize() {
        let policy_text = r#"
permit(
    principal in Rivven::Group::"admins",
    action,
    resource
);
"#;

        let authorizer = CedarAuthorizer::new_without_schema();
        authorizer.add_policy("admin-all", policy_text).unwrap();

        // Entities: the group, a user inside it, and the topic being accessed.
        authorizer.add_group("admins", &[]).unwrap();
        authorizer
            .add_user(
                "alice",
                Some("alice@example.com"),
                &["admin"],
                &["admins"],
                false,
            )
            .unwrap();
        authorizer
            .add_topic("orders", Some("alice"), 3, 2, 604_800_000)
            .unwrap();

        let context = AuthzContext::new().with_ip("127.0.0.1");
        let target = RivvenResource::Topic("orders".to_string());
        let outcome = authorizer
            .is_authorized("alice", RivvenAction::Produce, &target, &context)
            .unwrap();

        assert!(outcome.allowed);
    }

    /// A user outside the `admins` group is denied by an admins-only policy.
    #[test]
    fn test_deny_unauthorized() {
        let policy_text = r#"
permit(
    principal in Rivven::Group::"admins",
    action == Rivven::Action::"produce",
    resource is Rivven::Topic
);
"#;

        let authorizer = CedarAuthorizer::new_without_schema();
        authorizer
            .add_policy("only-admins-produce", policy_text)
            .unwrap();

        // Bob belongs to no group at all.
        authorizer
            .add_user("bob", Some("bob@example.com"), &["user"], &[], false)
            .unwrap();
        authorizer
            .add_topic("orders", None, 3, 2, 604_800_000)
            .unwrap();

        let context = AuthzContext::new();
        let target = RivvenResource::Topic("orders".to_string());
        let outcome = authorizer
            .is_authorized("bob", RivvenAction::Produce, &target, &context)
            .unwrap();

        assert!(!outcome.allowed);
    }

    /// The builder methods store what they are given.
    #[test]
    fn test_context_attributes() {
        let context = AuthzContext::new()
            .with_ip("192.168.1.100")
            .with_tls_subject("CN=client,O=Rivven")
            .with_attr("custom_field", serde_json::json!("custom_value"));

        assert_eq!(context.ip_address, Some("192.168.1.100".to_string()));
        assert_eq!(context.tls_subject, Some("CN=client,O=Rivven".to_string()));
        assert!(context.extra.contains_key("custom_field"));
    }
}
1080}
1081
1082#[cfg(feature = "cedar")]
1083pub use cedar_impl::*;
1084
1085#[cfg(not(feature = "cedar"))]
1086mod no_cedar {
1087 use std::collections::HashMap;
1090 use thiserror::Error;
1091
1092 #[derive(Error, Debug)]
1093 pub enum CedarError {
1094 #[error("Cedar authorization not enabled. Build with 'cedar' feature.")]
1095 NotEnabled,
1096 }
1097
1098 pub type CedarResult<T> = Result<T, CedarError>;
1099
1100 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
1101 pub enum RivvenAction {
1102 Produce,
1103 Consume,
1104 Create,
1105 Delete,
1106 Alter,
1107 Describe,
1108 Join,
1109 Leave,
1110 Commit,
1111 FetchOffsets,
1112 Admin,
1113 AlterConfigs,
1114 DescribeConfigs,
1115 }
1116
1117 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
1118 pub enum RivvenResource {
1119 Topic(String),
1120 ConsumerGroup(String),
1121 Schema(String),
1122 Cluster,
1123 }
1124
1125 #[derive(Debug, Clone, Default)]
1126 pub struct AuthzContext {
1127 pub ip_address: Option<String>,
1128 pub timestamp: String,
1129 pub tls_subject: Option<String>,
1130 pub extra: HashMap<String, serde_json::Value>,
1131 }
1132
1133 impl AuthzContext {
1134 pub fn new() -> Self {
1135 Self::default()
1136 }
1137
1138 pub fn with_ip(self, _ip: impl Into<String>) -> Self {
1139 self
1140 }
1141
1142 pub fn with_tls_subject(self, _subject: impl Into<String>) -> Self {
1143 self
1144 }
1145 }
1146
1147 pub struct CedarAuthorizer;
1148
1149 impl CedarAuthorizer {
1150 pub fn new() -> CedarResult<Self> {
1151 Err(CedarError::NotEnabled)
1152 }
1153
1154 pub fn authorize(
1155 &self,
1156 _principal: &str,
1157 _action: RivvenAction,
1158 _resource: &RivvenResource,
1159 _context: &AuthzContext,
1160 ) -> CedarResult<()> {
1161 Err(CedarError::NotEnabled)
1162 }
1163 }
1164}
1165
1166#[cfg(not(feature = "cedar"))]
1167pub use no_cedar::*;