Skip to main content

kevy_scope/
ownership.rs

1//! `OwnershipTable` — the immutable startup-built lookup structure.
2//! Scopes are sorted by prefix length descending so the first match
3//! IS the longest-prefix match (T3.7). Overlap is rejected at
4//! construction time (T3.6).
5
6use crate::{Routing, Scope};
7
8/// Reasons [`OwnershipTable::new`] can refuse a list of scopes.
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub enum OwnershipError {
11    /// Two scopes share an identical prefix — ambiguous ownership.
12    DuplicatePrefix {
13        /// The prefix bytes (lossy UTF-8 in the formatted message).
14        prefix: Vec<u8>,
15    },
16    /// One scope's prefix is a *strict* prefix of another's
17    /// (e.g. `app:` and `app:billing:`). Even with longest-prefix
18    /// match working at runtime, having both declared is almost
19    /// always an operator mistake — the broader scope's writer
20    /// would never see writes for the inner scope's keys. The
21    /// linter rejects loudly rather than silently masking. To
22    /// genuinely want a base + inner scope, use a single declaration
23    /// at the innermost level.
24    OverlappingPrefix {
25        /// The longer (more specific) prefix in the conflict.
26        inner: Vec<u8>,
27        /// The shorter (broader) prefix that swallows the inner.
28        outer: Vec<u8>,
29    },
30}
31
32impl std::fmt::Display for OwnershipError {
33    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34        match self {
35            Self::DuplicatePrefix { prefix } => {
36                write!(
37                    f,
38                    "duplicate scope prefix {:?}",
39                    String::from_utf8_lossy(prefix)
40                )
41            }
42            Self::OverlappingPrefix { inner, outer } => {
43                write!(
44                    f,
45                    "overlapping scope prefixes: {:?} is contained by {:?}",
46                    String::from_utf8_lossy(inner),
47                    String::from_utf8_lossy(outer),
48                )
49            }
50        }
51    }
52}
53
54impl std::error::Error for OwnershipError {}
55
56/// Immutable ownership table. Build once at startup from
57/// `[[cluster.scope]]` config; share across reactor threads via
58/// `Arc<OwnershipTable>`.
59#[derive(Debug, Clone)]
60pub struct OwnershipTable {
61    /// Sorted by prefix length descending so the first
62    /// `iter().find(|s| s.matches(key))` IS the longest-prefix match.
63    /// Stable order on ties (none, after the duplicate-prefix check).
64    scopes: Vec<Scope>,
65}
66
67impl OwnershipTable {
68    /// Validate + sort the scope list. Rejects duplicate prefixes and
69    /// strict overlap (T3.6 linter). `O(n²)` over the scope list,
70    /// which is tiny (~ N scopes per cluster); no need for a trie at
71    /// startup time.
72    pub fn new(mut scopes: Vec<Scope>) -> Result<Self, OwnershipError> {
73        // Duplicate check first (cheapest signal).
74        for i in 0..scopes.len() {
75            for j in (i + 1)..scopes.len() {
76                if scopes[i].prefix == scopes[j].prefix {
77                    return Err(OwnershipError::DuplicatePrefix {
78                        prefix: scopes[i].prefix.clone(),
79                    });
80                }
81            }
82        }
83        // Strict-overlap check: any pair where one is a prefix of the
84        // other (but not equal — covered above) is a configuration
85        // ambiguity.
86        for i in 0..scopes.len() {
87            for j in 0..scopes.len() {
88                if i == j {
89                    continue;
90                }
91                let a = &scopes[i].prefix;
92                let b = &scopes[j].prefix;
93                if a.len() < b.len() && b.starts_with(a) {
94                    return Err(OwnershipError::OverlappingPrefix {
95                        inner: b.clone(),
96                        outer: a.clone(),
97                    });
98                }
99            }
100        }
101        // Sort by prefix length descending: longest-prefix-match
102        // = first-match after sort. `Reverse` flips the natural
103        // length order without an explicit `cmp` closure.
104        scopes.sort_by_key(|s| std::cmp::Reverse(s.prefix.len()));
105        Ok(Self { scopes })
106    }
107
108    /// Iterate over declared scopes (read-only). Mostly useful for
109    /// `INFO`-style telemetry.
110    pub fn scopes(&self) -> &[Scope] {
111        &self.scopes
112    }
113
114    /// T3.13 linter: scopes that declared no fallback. The server
115    /// emits a `WARN` log per entry at boot so operators are aware
116    /// that this scope has zero availability if its writer dies —
117    /// not an error (the operator may have explicitly chosen
118    /// availability < strict ownership), just a heads-up.
119    pub fn scopes_without_fallback(&self) -> Vec<&Scope> {
120        self.scopes.iter().filter(|s| s.fallback().is_none()).collect()
121    }
122
123    /// Number of declared scopes.
124    #[must_use]
125    pub fn len(&self) -> usize {
126        self.scopes.len()
127    }
128
129    /// `true` when no scopes are declared (the keyspace runs in
130    /// pre-Phase-3 mode by default).
131    #[must_use]
132    pub fn is_empty(&self) -> bool {
133        self.scopes.is_empty()
134    }
135
136    /// Longest-prefix match: the most specific scope that owns
137    /// `key`, or `None` when no scope matches.
138    #[must_use]
139    pub fn lookup(&self, key: &[u8]) -> Option<&Scope> {
140        self.scopes.iter().find(|s| s.matches(key))
141    }
142
143    /// Route `key` from the local node's perspective. Treats the
144    /// declared writer as the active owner; F4 fallback handling is
145    /// layered on top in the server's cement (this fn doesn't know
146    /// about live `kevy-elect` state — pure config view).
147    #[must_use]
148    pub fn route<'a>(&'a self, key: &[u8], self_node_id: &str) -> Routing<'a> {
149        let Some(scope) = self.lookup(key) else {
150            return Routing::Unknown;
151        };
152        if scope.writer() == self_node_id {
153            Routing::Owned
154        } else {
155            Routing::Misdirected { target: scope.writer() }
156        }
157    }
158
159    /// Fallback-aware routing: when `writer_down` is true for the
160    /// matched scope's writer, the fallback (if declared) is treated
161    /// as the active owner. Used by the cement after `kevy-elect`
162    /// flags the writer DOWN (F4).
163    ///
164    /// `is_writer_down(node_id) -> bool` is a callback so kevy-scope
165    /// stays elect-agnostic; the cement plugs in its live snapshot.
166    pub fn route_with_fallback_state<'a, F>(
167        &'a self,
168        key: &[u8],
169        self_node_id: &str,
170        mut is_writer_down: F,
171    ) -> Routing<'a>
172    where
173        F: FnMut(&str) -> bool,
174    {
175        let Some(scope) = self.lookup(key) else {
176            return Routing::Unknown;
177        };
178        let active_owner = if is_writer_down(scope.writer()) {
179            scope.fallback().unwrap_or(scope.writer())
180        } else {
181            scope.writer()
182        };
183        if active_owner == self_node_id {
184            Routing::Owned
185        } else {
186            Routing::Misdirected { target: active_owner }
187        }
188    }
189}
190
191#[cfg(test)]
192mod tests {
193    use super::*;
194
195    fn s(prefix: &[u8], writer: &str) -> Scope {
196        Scope::new(prefix.to_vec(), writer.to_string())
197    }
198
199    #[test]
200    fn empty_table_routes_unknown() {
201        let t = OwnershipTable::new(Vec::new()).unwrap();
202        assert!(t.is_empty());
203        assert_eq!(t.route(b"any-key", "node-1"), Routing::Unknown);
204    }
205
206    #[test]
207    fn longest_prefix_wins() {
208        let t = OwnershipTable::new(vec![
209            // `app:` and `other:` — disjoint, no overlap → fine
210            s(b"app:", "w-app"),
211            s(b"other:", "w-other"),
212        ])
213        .unwrap();
214        assert!(t.route(b"app:billing:invoice", "w-app").is_local_writer());
215        assert!(t.route(b"other:settings", "w-other").is_local_writer());
216        assert_eq!(t.route(b"unrelated", "any"), Routing::Unknown);
217    }
218
219    #[test]
220    fn misdirected_carries_writer_id() {
221        let t = OwnershipTable::new(vec![s(b"app:", "w-app")]).unwrap();
222        let r = t.route(b"app:x", "some-other-node");
223        assert_eq!(r.misdirected_target(), Some("w-app"));
224    }
225
226    #[test]
227    fn overlap_rejected() {
228        let err = OwnershipTable::new(vec![
229            s(b"app:", "w-app"),
230            s(b"app:billing:", "w-billing"),
231        ])
232        .unwrap_err();
233        assert!(matches!(err, OwnershipError::OverlappingPrefix { .. }));
234    }
235
236    #[test]
237    fn duplicate_rejected() {
238        let err = OwnershipTable::new(vec![
239            s(b"app:", "w-app"),
240            s(b"app:", "w-app2"),
241        ])
242        .unwrap_err();
243        assert!(matches!(err, OwnershipError::DuplicatePrefix { .. }));
244    }
245
246    #[test]
247    fn fallback_active_when_writer_down() {
248        let t = OwnershipTable::new(vec![
249            Scope::new(b"app:".to_vec(), "w-app".to_string())
250                .with_fallback("fb-1".to_string()),
251        ])
252        .unwrap();
253        // Writer up: local node (fb-1) sees Misdirected.
254        let r = t.route_with_fallback_state(b"app:x", "fb-1", |_| false);
255        assert_eq!(r.misdirected_target(), Some("w-app"));
256        // Writer down: fallback (fb-1) is now the active owner.
257        let r = t.route_with_fallback_state(b"app:x", "fb-1", |id| id == "w-app");
258        assert!(r.is_local_writer());
259    }
260
261    #[test]
262    fn fallback_absent_falls_through_to_writer() {
263        // No fallback declared → "writer DOWN" still names the writer
264        // as the target (cement decides whether to refuse vs error).
265        let t = OwnershipTable::new(vec![s(b"k:", "w-1")]).unwrap();
266        let r = t.route_with_fallback_state(b"k:abc", "other", |id| id == "w-1");
267        assert_eq!(r.misdirected_target(), Some("w-1"));
268    }
269
270    #[test]
271    fn lookup_matches_longest_prefix_after_sort() {
272        // Build the table with overlap-forbidden distinct prefixes;
273        // longest-match still applies among disjoint declarations.
274        let t = OwnershipTable::new(vec![
275            s(b"a:", "wa"),
276            s(b"b:", "wb"),
277            s(b"abc:", "wabc"),
278        ])
279        .unwrap();
280        assert_eq!(t.lookup(b"a:foo").map(Scope::writer), Some("wa"));
281        assert_eq!(t.lookup(b"abc:foo").map(Scope::writer), Some("wabc"));
282        assert_eq!(t.lookup(b"b:foo").map(Scope::writer), Some("wb"));
283        assert!(t.lookup(b"nope").is_none());
284    }
285}