1use crate::{Routing, Scope};
7
8#[derive(Debug, Clone, PartialEq, Eq)]
10pub enum OwnershipError {
11 DuplicatePrefix {
13 prefix: Vec<u8>,
15 },
16 OverlappingPrefix {
25 inner: Vec<u8>,
27 outer: Vec<u8>,
29 },
30}
31
32impl std::fmt::Display for OwnershipError {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 match self {
35 Self::DuplicatePrefix { prefix } => {
36 write!(
37 f,
38 "duplicate scope prefix {:?}",
39 String::from_utf8_lossy(prefix)
40 )
41 }
42 Self::OverlappingPrefix { inner, outer } => {
43 write!(
44 f,
45 "overlapping scope prefixes: {:?} is contained by {:?}",
46 String::from_utf8_lossy(inner),
47 String::from_utf8_lossy(outer),
48 )
49 }
50 }
51 }
52}
53
54impl std::error::Error for OwnershipError {}
55
56#[derive(Debug, Clone)]
60pub struct OwnershipTable {
61 scopes: Vec<Scope>,
65}
66
67impl OwnershipTable {
68 pub fn new(mut scopes: Vec<Scope>) -> Result<Self, OwnershipError> {
73 for i in 0..scopes.len() {
75 for j in (i + 1)..scopes.len() {
76 if scopes[i].prefix == scopes[j].prefix {
77 return Err(OwnershipError::DuplicatePrefix {
78 prefix: scopes[i].prefix.clone(),
79 });
80 }
81 }
82 }
83 for i in 0..scopes.len() {
87 for j in 0..scopes.len() {
88 if i == j {
89 continue;
90 }
91 let a = &scopes[i].prefix;
92 let b = &scopes[j].prefix;
93 if a.len() < b.len() && b.starts_with(a) {
94 return Err(OwnershipError::OverlappingPrefix {
95 inner: b.clone(),
96 outer: a.clone(),
97 });
98 }
99 }
100 }
101 scopes.sort_by_key(|s| std::cmp::Reverse(s.prefix.len()));
105 Ok(Self { scopes })
106 }
107
108 pub fn scopes(&self) -> &[Scope] {
111 &self.scopes
112 }
113
114 pub fn scopes_without_fallback(&self) -> Vec<&Scope> {
120 self.scopes.iter().filter(|s| s.fallback().is_none()).collect()
121 }
122
123 #[must_use]
125 pub fn len(&self) -> usize {
126 self.scopes.len()
127 }
128
129 #[must_use]
132 pub fn is_empty(&self) -> bool {
133 self.scopes.is_empty()
134 }
135
136 #[must_use]
139 pub fn lookup(&self, key: &[u8]) -> Option<&Scope> {
140 self.scopes.iter().find(|s| s.matches(key))
141 }
142
143 #[must_use]
148 pub fn route<'a>(&'a self, key: &[u8], self_node_id: &str) -> Routing<'a> {
149 let Some(scope) = self.lookup(key) else {
150 return Routing::Unknown;
151 };
152 if scope.writer() == self_node_id {
153 Routing::Owned
154 } else {
155 Routing::Misdirected { target: scope.writer() }
156 }
157 }
158
159 pub fn route_with_fallback_state<'a, F>(
167 &'a self,
168 key: &[u8],
169 self_node_id: &str,
170 mut is_writer_down: F,
171 ) -> Routing<'a>
172 where
173 F: FnMut(&str) -> bool,
174 {
175 let Some(scope) = self.lookup(key) else {
176 return Routing::Unknown;
177 };
178 let active_owner = if is_writer_down(scope.writer()) {
179 scope.fallback().unwrap_or(scope.writer())
180 } else {
181 scope.writer()
182 };
183 if active_owner == self_node_id {
184 Routing::Owned
185 } else {
186 Routing::Misdirected { target: active_owner }
187 }
188 }
189}
190
191#[cfg(test)]
192mod tests {
193 use super::*;
194
195 fn s(prefix: &[u8], writer: &str) -> Scope {
196 Scope::new(prefix.to_vec(), writer.to_string())
197 }
198
199 #[test]
200 fn empty_table_routes_unknown() {
201 let t = OwnershipTable::new(Vec::new()).unwrap();
202 assert!(t.is_empty());
203 assert_eq!(t.route(b"any-key", "node-1"), Routing::Unknown);
204 }
205
206 #[test]
207 fn longest_prefix_wins() {
208 let t = OwnershipTable::new(vec![
209 s(b"app:", "w-app"),
211 s(b"other:", "w-other"),
212 ])
213 .unwrap();
214 assert!(t.route(b"app:billing:invoice", "w-app").is_local_writer());
215 assert!(t.route(b"other:settings", "w-other").is_local_writer());
216 assert_eq!(t.route(b"unrelated", "any"), Routing::Unknown);
217 }
218
219 #[test]
220 fn misdirected_carries_writer_id() {
221 let t = OwnershipTable::new(vec![s(b"app:", "w-app")]).unwrap();
222 let r = t.route(b"app:x", "some-other-node");
223 assert_eq!(r.misdirected_target(), Some("w-app"));
224 }
225
226 #[test]
227 fn overlap_rejected() {
228 let err = OwnershipTable::new(vec![
229 s(b"app:", "w-app"),
230 s(b"app:billing:", "w-billing"),
231 ])
232 .unwrap_err();
233 assert!(matches!(err, OwnershipError::OverlappingPrefix { .. }));
234 }
235
236 #[test]
237 fn duplicate_rejected() {
238 let err = OwnershipTable::new(vec![
239 s(b"app:", "w-app"),
240 s(b"app:", "w-app2"),
241 ])
242 .unwrap_err();
243 assert!(matches!(err, OwnershipError::DuplicatePrefix { .. }));
244 }
245
246 #[test]
247 fn fallback_active_when_writer_down() {
248 let t = OwnershipTable::new(vec![
249 Scope::new(b"app:".to_vec(), "w-app".to_string())
250 .with_fallback("fb-1".to_string()),
251 ])
252 .unwrap();
253 let r = t.route_with_fallback_state(b"app:x", "fb-1", |_| false);
255 assert_eq!(r.misdirected_target(), Some("w-app"));
256 let r = t.route_with_fallback_state(b"app:x", "fb-1", |id| id == "w-app");
258 assert!(r.is_local_writer());
259 }
260
261 #[test]
262 fn fallback_absent_falls_through_to_writer() {
263 let t = OwnershipTable::new(vec![s(b"k:", "w-1")]).unwrap();
266 let r = t.route_with_fallback_state(b"k:abc", "other", |id| id == "w-1");
267 assert_eq!(r.misdirected_target(), Some("w-1"));
268 }
269
270 #[test]
271 fn lookup_matches_longest_prefix_after_sort() {
272 let t = OwnershipTable::new(vec![
275 s(b"a:", "wa"),
276 s(b"b:", "wb"),
277 s(b"abc:", "wabc"),
278 ])
279 .unwrap();
280 assert_eq!(t.lookup(b"a:foo").map(Scope::writer), Some("wa"));
281 assert_eq!(t.lookup(b"abc:foo").map(Scope::writer), Some("wabc"));
282 assert_eq!(t.lookup(b"b:foo").map(Scope::writer), Some("wb"));
283 assert!(t.lookup(b"nope").is_none());
284 }
285}