Skip to main content

crabka_metadata/
acl.rs

1//! Wire-stable ACL types replicated through the raft quorum
2//! via `MetadataRecord::V1AccessControlEntry` /
3//! `V1DeleteAccessControlEntry`. Mirrors the shape Kafka exposes on the
4//! `CreateAcls` / `DeleteAcls` / `DescribeAcls` wire messages, but stays
5//! pure data — the authorizer in `crabka-broker` evaluates these.
6
7use serde::{Deserialize, Serialize};
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
10pub enum ResourceType {
11    Topic,
12    Group,
13    Cluster,
14    TransactionalId,
15    /// KIP-48 delegation tokens. Resource name is the owner's
16    /// `KafkaPrincipal` string form (e.g. `"User:alice"`). Pattern types
17    /// `LITERAL`/`PREFIXED` apply via the existing matcher; only the
18    /// `Describe` operation is externally grantable
19    /// (`Create`/`Renew`/`Expire` are implicit on ownership).
20    DelegationToken,
21}
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
24pub enum PatternType {
25    Literal,
26    Prefixed,
27}
28
29#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
30pub enum PermissionType {
31    Allow,
32    Deny,
33}
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
36pub enum AclOperation {
37    All,
38    Read,
39    Write,
40    Create,
41    Delete,
42    Alter,
43    Describe,
44    ClusterAction,
45    DescribeConfigs,
46    AlterConfigs,
47    IdempotentWrite,
48}
49
50#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
51pub struct AclEntry {
52    pub resource_type: ResourceType,
53    pub resource_name: String,
54    pub pattern_type: PatternType,
55    pub principal: String,
56    pub host: String,
57    pub operation: AclOperation,
58    pub permission_type: PermissionType,
59}
60
61#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
62pub struct AclEntryFilter {
63    pub resource_type: Option<ResourceType>,
64    pub resource_name: Option<String>,
65    pub pattern_type: Option<PatternType>,
66    pub principal: Option<String>,
67    pub host: Option<String>,
68    pub operation: Option<AclOperation>,
69    pub permission_type: Option<PermissionType>,
70}
71
72impl AclEntryFilter {
73    /// Returns true if every populated axis matches `entry`. `None` axes
74    /// match anything.
75    #[must_use]
76    pub fn matches(&self, entry: &AclEntry) -> bool {
77        self.resource_type
78            .is_none_or(|rt| rt == entry.resource_type)
79            && self
80                .resource_name
81                .as_ref()
82                .is_none_or(|rn| rn == &entry.resource_name)
83            && self.pattern_type.is_none_or(|pt| pt == entry.pattern_type)
84            && self
85                .principal
86                .as_ref()
87                .is_none_or(|p| p == &entry.principal)
88            && self.host.as_ref().is_none_or(|h| h == &entry.host)
89            && self.operation.is_none_or(|op| op == entry.operation)
90            && self
91                .permission_type
92                .is_none_or(|pt| pt == entry.permission_type)
93    }
94}
95
96#[cfg(test)]
97mod tests {
98    use super::*;
99    use assert2::assert;
100    use serde_wincode::SerdeCompat;
101    use wincode::{Deserialize as _, Serialize as _};
102
103    fn rt<T>(value: &T) -> T
104    where
105        T: Serialize + for<'de> Deserialize<'de> + PartialEq + std::fmt::Debug,
106    {
107        let bytes = <SerdeCompat<T>>::serialize(value).unwrap();
108        <SerdeCompat<T>>::deserialize(&bytes).unwrap()
109    }
110
111    #[test]
112    fn acl_entry_round_trip() {
113        let entry = AclEntry {
114            resource_type: ResourceType::Topic,
115            resource_name: "foo".into(),
116            pattern_type: PatternType::Literal,
117            principal: "User:alice".into(),
118            host: "*".into(),
119            operation: AclOperation::Read,
120            permission_type: PermissionType::Allow,
121        };
122        assert!(rt(&entry) == entry);
123    }
124
125    #[test]
126    fn acl_entry_filter_round_trip() {
127        let filter = AclEntryFilter {
128            resource_type: Some(ResourceType::Group),
129            resource_name: None,
130            pattern_type: Some(PatternType::Prefixed),
131            principal: Some("User:bob".into()),
132            host: None,
133            operation: Some(AclOperation::All),
134            permission_type: None,
135        };
136        assert!(rt(&filter) == filter);
137    }
138
139    #[test]
140    fn filter_with_all_none_matches_anything() {
141        let f = AclEntryFilter::default();
142        let entry = AclEntry {
143            resource_type: ResourceType::Cluster,
144            resource_name: "kafka-cluster".into(),
145            pattern_type: PatternType::Literal,
146            principal: "User:admin".into(),
147            host: "*".into(),
148            operation: AclOperation::All,
149            permission_type: PermissionType::Allow,
150        };
151        assert!(f.matches(&entry));
152    }
153
154    #[test]
155    fn filter_with_specific_axis_filters_correctly() {
156        let f = AclEntryFilter {
157            resource_type: Some(ResourceType::Topic),
158            ..AclEntryFilter::default()
159        };
160        let topic_entry = AclEntry {
161            resource_type: ResourceType::Topic,
162            resource_name: "foo".into(),
163            pattern_type: PatternType::Literal,
164            principal: "User:alice".into(),
165            host: "*".into(),
166            operation: AclOperation::Read,
167            permission_type: PermissionType::Allow,
168        };
169        let group_entry = AclEntry {
170            resource_type: ResourceType::Group,
171            ..topic_entry.clone()
172        };
173        assert!(f.matches(&topic_entry));
174        assert!(!f.matches(&group_entry));
175    }
176}