Skip to main content

crabka_authz/
cache.rs

1//! Gateway-side ACL snapshot: a flat `Vec<AclEntry>` (from `describe_acls`)
2//! that implements [`AclSource`] with EXACTLY the broker's matching semantics.
3
4use crabka_metadata::{AclEntry, PatternType, ResourceType};
5
6use crate::AclSource;
7
8/// Immutable ACL snapshot; rebuilt wholesale on each refresh.
9#[derive(Debug, Clone, Default)]
10pub struct AclCache {
11    entries: Vec<AclEntry>,
12}
13
14impl AclCache {
15    #[must_use]
16    pub fn new(entries: Vec<AclEntry>) -> Self {
17        Self { entries }
18    }
19    #[must_use]
20    pub fn len(&self) -> usize {
21        self.entries.len()
22    }
23    #[must_use]
24    pub fn is_empty(&self) -> bool {
25        self.entries.is_empty()
26    }
27}
28
29impl AclSource for AclCache {
30    fn matching_acls<'a>(
31        &'a self,
32        rt: ResourceType,
33        name: &'a str,
34    ) -> Box<dyn Iterator<Item = &'a AclEntry> + 'a> {
35        // MUST mirror MetadataImage::matching_acls: same resource_type, and
36        // (LITERAL == name) || (LITERAL == "*") || (PREFIXED && name.starts_with(resource_name)).
37        Box::new(self.entries.iter().filter(move |e| {
38            e.resource_type == rt
39                && match e.pattern_type {
40                    PatternType::Literal => e.resource_name == name || e.resource_name == "*",
41                    PatternType::Prefixed => name.starts_with(e.resource_name.as_str()),
42                }
43        }))
44    }
45}
46
47#[cfg(test)]
48mod tests {
49    use super::*;
50    use assert2::assert;
51    use crabka_metadata::{
52        AclOperation, MetadataImage, MetadataRecord, PermissionType, ResourceType,
53    };
54    use uuid::Uuid;
55
56    fn entry(rt: ResourceType, pattern: PatternType, name: &str, op: AclOperation) -> AclEntry {
57        AclEntry {
58            resource_type: rt,
59            resource_name: name.into(),
60            pattern_type: pattern,
61            principal: "User:alice".into(),
62            host: "*".into(),
63            operation: op,
64            permission_type: PermissionType::Allow,
65        }
66    }
67
68    /// A stable, comparable identity for an `AclEntry` so the two sources'
69    /// `matching_acls` outputs can be compared as sets regardless of order.
70    /// (The ACL enums derive `Debug` but not `Ord`, so we key on the debug
71    /// rendering of the identifying fields — a `String`, which is `Ord`.)
72    fn key(e: &AclEntry) -> String {
73        format!(
74            "{:?}|{:?}|{}|{:?}",
75            e.resource_type, e.pattern_type, e.resource_name, e.operation
76        )
77    }
78
79    fn sorted_keys<'a>(it: Box<dyn Iterator<Item = &'a AclEntry> + 'a>) -> Vec<String> {
80        let mut v: Vec<_> = it.map(key).collect();
81        v.sort();
82        v
83    }
84
85    /// Cross-validation guard: the same `AclEntry` set, built into BOTH a
86    /// `MetadataImage` (via `apply`) and an `AclCache`, must yield the SAME
87    /// matching set for every probe. This protects against drift between the
88    /// broker's image matching and the gateway cache's reimplementation.
89    #[test]
90    fn cache_matches_image_for_every_probe() {
91        // A representative ACL set: literal exact, the literal "*" wildcard,
92        // a prefixed entry, an unrelated topic, and a different resource type.
93        let entries = vec![
94            entry(
95                ResourceType::Topic,
96                PatternType::Literal,
97                "foo",
98                AclOperation::Read,
99            ),
100            entry(
101                ResourceType::Topic,
102                PatternType::Literal,
103                "*",
104                AclOperation::Write,
105            ),
106            entry(
107                ResourceType::Topic,
108                PatternType::Prefixed,
109                "team-",
110                AclOperation::Read,
111            ),
112            entry(
113                ResourceType::Topic,
114                PatternType::Literal,
115                "bar",
116                AclOperation::Read,
117            ),
118            entry(
119                ResourceType::Group,
120                PatternType::Literal,
121                "cg-1",
122                AclOperation::Read,
123            ),
124            entry(
125                ResourceType::Group,
126                PatternType::Prefixed,
127                "app-",
128                AclOperation::Read,
129            ),
130        ];
131
132        // Build the same set into a MetadataImage (broker side) ...
133        let mut image = MetadataImage::new(Uuid::nil());
134        for e in &entries {
135            image.apply(&MetadataRecord::V1AccessControlEntry(e.clone()));
136        }
137        // ... and into an AclCache (gateway side).
138        let cache = AclCache::new(entries.clone());
139
140        // Probes covering every matching code path.
141        let probes: &[(ResourceType, &str)] = &[
142            (ResourceType::Topic, "foo"),      // literal exact hit (+ "*" wildcard)
143            (ResourceType::Topic, "*"),        // querying the wildcard resource itself
144            (ResourceType::Topic, "team-foo"), // prefixed hit (+ "*" wildcard)
145            (ResourceType::Topic, "team-"),    // prefix boundary (starts_with itself)
146            (ResourceType::Topic, "nomatch"),  // only the "*" wildcard matches
147            (ResourceType::Group, "cg-1"),     // literal hit, no wildcard for Group
148            (ResourceType::Group, "app-svc"),  // prefixed hit on Group
149            (ResourceType::Group, "other"),    // prefixed miss, no wildcard → empty
150            (ResourceType::Cluster, "kafka-cluster"), // wrong type → empty in both
151        ];
152
153        for &(rt, name) in probes {
154            let from_image = sorted_keys(AclSource::matching_acls(&image, rt, name));
155            let from_cache = sorted_keys(AclSource::matching_acls(&cache, rt, name));
156            assert!(
157                from_image == from_cache,
158                "drift at ({rt:?}, {name:?}): image={from_image:?} cache={from_cache:?}"
159            );
160        }
161    }
162
163    #[test]
164    fn len_and_is_empty_track_entries() {
165        let empty = AclCache::default();
166        assert!(empty.is_empty());
167        assert!(empty.len() == 0);
168
169        let cache = AclCache::new(vec![entry(
170            ResourceType::Topic,
171            PatternType::Literal,
172            "foo",
173            AclOperation::Read,
174        )]);
175        assert!(!cache.is_empty());
176        assert!(cache.len() == 1);
177    }
178}