use crate::{Routing, Scope};
/// Reasons a set of scopes is rejected when building an ownership table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OwnershipError {
    /// Two scopes declare exactly the same prefix.
    DuplicatePrefix {
        /// The prefix that appears more than once.
        prefix: Vec<u8>,
    },
    /// One scope's prefix is a strict extension of another scope's prefix.
    OverlappingPrefix {
        /// The longer prefix, which begins with `outer`.
        inner: Vec<u8>,
        /// The shorter prefix that `inner` begins with.
        outer: Vec<u8>,
    },
}

impl std::fmt::Display for OwnershipError {
    /// Writes a one-line description of the error.
    ///
    /// Prefix bytes are decoded lossily as UTF-8 and rendered with `{:?}`,
    /// so they appear quoted and escaped in the message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::DuplicatePrefix { prefix } => {
                let shown = String::from_utf8_lossy(prefix);
                write!(f, "duplicate scope prefix {:?}", shown)
            }
            Self::OverlappingPrefix { inner, outer } => {
                let contained = String::from_utf8_lossy(inner);
                let container = String::from_utf8_lossy(outer);
                write!(
                    f,
                    "overlapping scope prefixes: {:?} is contained by {:?}",
                    contained, container
                )
            }
        }
    }
}

// No underlying source error: the default `Error` methods are sufficient.
impl std::error::Error for OwnershipError {}
/// Validated collection of [`Scope`]s used to decide which node owns a key.
///
/// Instances are built through [`OwnershipTable::new`], which rejects
/// duplicate and nested prefixes before storing the scopes.
#[derive(Debug, Clone)]
pub struct OwnershipTable {
    /// Scopes in lookup order: longest prefix first, ties in input order.
    scopes: Vec<Scope>,
}

impl OwnershipTable {
    /// Validates `scopes` and builds a table from them.
    ///
    /// The accepted scopes are stored sorted by descending prefix length;
    /// the sort is stable, so scopes with equally long prefixes keep their
    /// relative input order.
    ///
    /// # Errors
    ///
    /// * [`OwnershipError::DuplicatePrefix`] if two scopes share the same
    ///   prefix. All duplicate checks run before any overlap check.
    /// * [`OwnershipError::OverlappingPrefix`] if one prefix is strictly
    ///   shorter than another and the longer one starts with it.
    pub fn new(mut scopes: Vec<Scope>) -> Result<Self, OwnershipError> {
        use std::cmp::Reverse;

        // Pass 1: every scope is compared only with the scopes after it, so
        // the first repeated prefix in input order is the one reported.
        for (idx, earlier) in scopes.iter().enumerate() {
            let repeated = scopes[idx + 1..]
                .iter()
                .any(|later| later.prefix == earlier.prefix);
            if repeated {
                return Err(OwnershipError::DuplicatePrefix {
                    prefix: earlier.prefix.clone(),
                });
            }
        }

        // Pass 2: for each candidate outer prefix, find the first strictly
        // longer prefix that begins with it. The strict length comparison
        // already prevents a scope from being matched against itself.
        for outer in &scopes {
            let nested = scopes.iter().find(|candidate| {
                outer.prefix.len() < candidate.prefix.len()
                    && candidate.prefix.starts_with(&outer.prefix)
            });
            if let Some(inner) = nested {
                return Err(OwnershipError::OverlappingPrefix {
                    inner: inner.prefix.clone(),
                    outer: outer.prefix.clone(),
                });
            }
        }

        scopes.sort_by_key(|scope| Reverse(scope.prefix.len()));
        Ok(Self { scopes })
    }

    /// Returns every scope in the table, in stored (lookup) order.
    pub fn scopes(&self) -> &[Scope] {
        self.scopes.as_slice()
    }

    /// Returns the scopes for which `fallback()` is `None`, in stored order.
    pub fn scopes_without_fallback(&self) -> Vec<&Scope> {
        self.scopes
            .iter()
            .filter(|scope| scope.fallback().is_none())
            .collect::<Vec<_>>()
    }

    /// Number of scopes in the table.
    #[must_use]
    pub fn len(&self) -> usize {
        self.scopes.len()
    }

    /// `true` when the table holds no scopes.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.scopes.is_empty()
    }

    /// Returns the first stored scope whose `matches` accepts `key`, if any.
    #[must_use]
    pub fn lookup(&self, key: &[u8]) -> Option<&Scope> {
        self.scopes.iter().find(|candidate| candidate.matches(key))
    }

    /// Classifies `key` from the point of view of node `self_node_id`.
    ///
    /// * [`Routing::Unknown`] when no scope matches the key.
    /// * [`Routing::Owned`] when the matching scope's writer equals
    ///   `self_node_id`.
    /// * [`Routing::Misdirected`] otherwise, carrying the scope's writer as
    ///   the target.
    #[must_use]
    pub fn route<'a>(&'a self, key: &[u8], self_node_id: &str) -> Routing<'a> {
        match self.lookup(key) {
            None => Routing::Unknown,
            Some(scope) if scope.writer() == self_node_id => Routing::Owned,
            Some(scope) => Routing::Misdirected {
                target: scope.writer(),
            },
        }
    }

    /// Like [`OwnershipTable::route`], but consults `is_writer_down` to
    /// decide whether the scope's fallback should stand in for its writer.
    ///
    /// `is_writer_down` is called once with the matching scope's writer id
    /// (and not at all when no scope matches). When it returns `true` and the
    /// scope has a fallback, the fallback is treated as the active owner; in
    /// every other case the writer remains the active owner, even if it was
    /// reported down.
    pub fn route_with_fallback_state<'a, F>(
        &'a self,
        key: &[u8],
        self_node_id: &str,
        mut is_writer_down: F,
    ) -> Routing<'a>
    where
        F: FnMut(&str) -> bool,
    {
        let scope = match self.lookup(key) {
            Some(found) => found,
            None => return Routing::Unknown,
        };

        let active_owner = if is_writer_down(scope.writer()) {
            // No fallback configured: keep routing to the writer.
            scope.fallback().unwrap_or(scope.writer())
        } else {
            scope.writer()
        };

        if active_owner != self_node_id {
            return Routing::Misdirected {
                target: active_owner,
            };
        }
        Routing::Owned
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a scope with the given prefix and writer and no
    /// `with_fallback` call applied.
    fn scope(prefix: &[u8], writer: &str) -> Scope {
        Scope::new(prefix.to_vec(), writer.to_owned())
    }

    /// Builds a table from scopes the test expects to be accepted.
    fn table(scopes: Vec<Scope>) -> OwnershipTable {
        OwnershipTable::new(scopes).unwrap()
    }

    /// With no scopes at all, every key is unknown.
    #[test]
    fn empty_table_routes_unknown() {
        let empty = table(Vec::new());

        assert!(empty.is_empty());
        assert_eq!(empty.route(b"any-key", "node-1"), Routing::Unknown);
    }

    /// Each key routes to the scope registered for its prefix.
    ///
    /// Despite the name, the prefixes used here are disjoint (nested
    /// prefixes are rejected by `OwnershipTable::new`).
    #[test]
    fn longest_prefix_wins() {
        let routes = table(vec![scope(b"app:", "w-app"), scope(b"other:", "w-other")]);

        let billing = routes.route(b"app:billing:invoice", "w-app");
        assert!(billing.is_local_writer());

        let settings = routes.route(b"other:settings", "w-other");
        assert!(settings.is_local_writer());

        assert_eq!(routes.route(b"unrelated", "any"), Routing::Unknown);
    }

    /// A non-owner asking about a key is pointed at the scope's writer.
    #[test]
    fn misdirected_carries_writer_id() {
        let routes = table(vec![scope(b"app:", "w-app")]);

        let verdict = routes.route(b"app:x", "some-other-node");

        assert_eq!(verdict.misdirected_target(), Some("w-app"));
    }

    /// A prefix nested inside another prefix is refused.
    #[test]
    fn overlap_rejected() {
        let nested = vec![scope(b"app:", "w-app"), scope(b"app:billing:", "w-billing")];

        let outcome = OwnershipTable::new(nested);

        assert!(matches!(
            outcome,
            Err(OwnershipError::OverlappingPrefix { .. })
        ));
    }

    /// The same prefix registered twice is refused, even with different writers.
    #[test]
    fn duplicate_rejected() {
        let repeated = vec![scope(b"app:", "w-app"), scope(b"app:", "w-app2")];

        let outcome = OwnershipTable::new(repeated);

        assert!(matches!(
            outcome,
            Err(OwnershipError::DuplicatePrefix { .. })
        ));
    }

    /// The fallback only becomes the owner once the writer is reported down.
    #[test]
    fn fallback_active_when_writer_down() {
        let guarded = scope(b"app:", "w-app").with_fallback("fb-1".to_owned());
        let routes = table(vec![guarded]);

        // Writer healthy: the fallback node is told to go to the writer.
        let while_up = routes.route_with_fallback_state(b"app:x", "fb-1", |_| false);
        assert_eq!(while_up.misdirected_target(), Some("w-app"));

        // Writer down: the fallback node now owns the key.
        let while_down = routes.route_with_fallback_state(b"app:x", "fb-1", |id| id == "w-app");
        assert!(while_down.is_local_writer());
    }

    /// Without a fallback, a down writer is still the routing target.
    #[test]
    fn fallback_absent_falls_through_to_writer() {
        let routes = table(vec![scope(b"k:", "w-1")]);

        let verdict = routes.route_with_fallback_state(b"k:abc", "other", |id| id == "w-1");

        assert_eq!(verdict.misdirected_target(), Some("w-1"));
    }

    /// Lookups resolve to the right scope after construction has reordered
    /// the scopes.
    #[test]
    fn lookup_matches_longest_prefix_after_sort() {
        let routes = table(vec![
            scope(b"a:", "wa"),
            scope(b"b:", "wb"),
            scope(b"abc:", "wabc"),
        ]);
        let writer_for = |key: &[u8]| routes.lookup(key).map(|hit| hit.writer());

        assert_eq!(writer_for(b"a:foo"), Some("wa"));
        assert_eq!(writer_for(b"abc:foo"), Some("wabc"));
        assert_eq!(writer_for(b"b:foo"), Some("wb"));
        assert!(routes.lookup(b"nope").is_none());
    }
}