use crate::engine::registry::PeerKey;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::hash::{BuildHasher, Hasher};
/// Minimal pass-through hasher used by the [`PeerKey`]-keyed maps in this file.
///
/// The 64-bit state is handed back unchanged by [`Hasher::finish`]; nothing
/// fed into the hasher is mixed or scrambled.
#[derive(Default, Clone, Copy)]
struct PeerKeyHasher(u64);

impl Hasher for PeerKeyHasher {
    /// ORs the first eight bytes of `bytes`, interpreted as a little-endian
    /// integer, into the current state. Bytes past the eighth are ignored.
    #[inline]
    fn write(&mut self, bytes: &[u8]) {
        let used = bytes.len().min(8);
        // Zero-padded scratch word: byte `i` lands at bit offset `i * 8`.
        let mut word = [0u8; 8];
        word[..used].copy_from_slice(&bytes[..used]);
        self.0 |= u64::from_le_bytes(word);
    }

    /// Replaces the state with `n` zero-extended to 64 bits.
    ///
    /// Anything written earlier is discarded.
    #[inline]
    fn write_u32(&mut self, n: u32) {
        self.0 = u64::from(n);
    }

    /// Replaces the state with `n`.
    ///
    /// Anything written earlier is discarded.
    #[inline]
    fn write_u64(&mut self, n: u64) {
        self.0 = n;
    }

    /// Returns the current state as the hash value.
    #[inline]
    fn finish(&self) -> u64 {
        self.0
    }
}
#[derive(Default, Clone, Copy)]
struct PeerKeyHasherBuilder;
impl BuildHasher for PeerKeyHasherBuilder {
type Hasher = PeerKeyHasher;
#[inline]
fn build_hasher(&self) -> Self::Hasher {
PeerKeyHasher::default()
}
}
/// `HashMap` keyed by [`PeerKey`] that hashes with [`PeerKeyHasherBuilder`]
/// instead of the standard library's default hasher.
type PeerKeyMap<V> = HashMap<PeerKey, V, PeerKeyHasherBuilder>;
/// Outcome of applying a subscription-control frame via
/// [`TopicRouter::apply_sub_message`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SubChange {
/// A subscribe frame added a prefix the peer did not hold yet.
NewTopic,
/// A subscribe frame named a prefix the peer already holds; nothing was added.
DuplicateSub,
/// An unsubscribe frame removed a prefix the peer held.
Removed,
/// An unsubscribe frame named a prefix the peer does not hold; nothing was removed.
DuplicateUnsub,
}
/// Subscription state tracked for a single peer.
#[derive(Default)]
struct PeerSubs {
    /// Topic prefixes the peer is currently subscribed to.
    prefixes: Vec<Vec<u8>>,
    /// Cached answer to "does `prefixes` contain an empty prefix?", valid as
    /// of the most recent [`PeerSubs::refresh_accept_all`] call.
    accept_all: bool,
}

impl PeerSubs {
    /// Recomputes `accept_all` from the current contents of `prefixes`.
    fn refresh_accept_all(&mut self) {
        self.accept_all = self.prefixes.iter().any(Vec::is_empty);
    }
}
/// Tracks, per peer, which topic prefixes that peer has subscribed to.
pub(crate) struct TopicRouter {
/// Per-peer subscription state, guarded by a mutex.
subscriptions: Mutex<PeerKeyMap<PeerSubs>>,
}
impl TopicRouter {
    /// Creates a router with no peers registered.
    pub(crate) fn new() -> Self {
        Self {
            subscriptions: Mutex::new(PeerKeyMap::default()),
        }
    }

    /// Ensures `key` has an entry in the table, creating an empty one if it is
    /// absent. An existing entry and its subscriptions are left untouched.
    pub(crate) fn register_peer(&self, key: PeerKey) {
        self.subscriptions.lock().entry(key).or_default();
    }

    /// Drops `key` together with all of its subscriptions. Does nothing if the
    /// peer is not present.
    pub(crate) fn forget_peer(&self, key: PeerKey) {
        self.subscriptions.lock().remove(&key);
    }

    /// Removes every peer and every subscription.
    pub(crate) fn clear(&self) {
        self.subscriptions.lock().clear();
    }

    /// Applies a subscription-control frame sent by peer `key`.
    ///
    /// The first byte of `frame` is the opcode (`1` = subscribe,
    /// `0` = unsubscribe); the remaining bytes are the topic prefix, which may
    /// be empty.
    ///
    /// Returns `None` when `frame` is empty or its opcode is unrecognised (the
    /// latter is also logged as a warning). In both cases the subscription
    /// table is left untouched; in particular, no entry is created for a peer
    /// that was not already known. Otherwise returns the [`SubChange`]
    /// describing what happened. A well-formed frame from a peer that has no
    /// entry yet creates one.
    pub(crate) fn apply_sub_message(&self, key: PeerKey, frame: &[u8]) -> Option<SubChange> {
        let (&opcode, tail) = frame.split_first()?;

        // Validate the opcode before locking or touching the map, so a
        // malformed frame cannot register a peer as a side effect.
        let subscribe = match opcode {
            1 => true,
            0 => false,
            other => {
                // Wrapped in `Some` to keep the established log output format.
                log::warn!("sub message unexpected first byte: {:?}", Some(other));
                return None;
            }
        };

        let mut subs = self.subscriptions.lock();
        let entry = subs.entry(key).or_default();
        let existing = entry.prefixes.iter().position(|p| p.as_slice() == tail);

        let outcome = match (subscribe, existing) {
            (true, Some(_)) => SubChange::DuplicateSub,
            (true, None) => {
                entry.prefixes.push(tail.to_vec());
                SubChange::NewTopic
            }
            (false, Some(idx)) => {
                entry.prefixes.remove(idx);
                SubChange::Removed
            }
            (false, None) => SubChange::DuplicateUnsub,
        };

        entry.refresh_accept_all();
        Some(outcome)
    }

    /// Runs `f` with a [`MatchGuard`] that borrows the subscription table and
    /// returns whatever `f` returns.
    ///
    /// The mutex stays locked for the whole duration of `f`, so `f` must not
    /// call other methods on this same router: `parking_lot::Mutex` is not
    /// reentrant and a nested lock attempt would deadlock.
    pub(crate) fn with_match_guard<R>(&self, f: impl FnOnce(&MatchGuard<'_>) -> R) -> R {
        let subs = self.subscriptions.lock();
        f(&MatchGuard { subs: &subs })
    }
}
/// Borrowed view of the subscription table, used to answer match queries.
pub(crate) struct MatchGuard<'a> {
    /// The per-peer subscription table being queried.
    subs: &'a PeerKeyMap<PeerSubs>,
}

impl<'a> MatchGuard<'a> {
    /// Decides whether `topic` matches the subscriptions of peer `key`.
    ///
    /// A topic matches when the peer's `accept_all` flag is set or when any of
    /// its stored prefixes is a leading slice of `topic`. If `invert` is
    /// `true` the match result is negated.
    ///
    /// Returns `false` for a peer that has no entry, regardless of `invert`.
    pub(crate) fn matches(&self, key: PeerKey, topic: &[u8], invert: bool) -> bool {
        match self.subs.get(&key) {
            None => false,
            Some(peer) => {
                let hit = peer.accept_all
                    || peer.prefixes.iter().any(|prefix| topic.starts_with(prefix));
                hit ^ invert
            }
        }
    }
}
impl Default for TopicRouter {
fn default() -> Self {
Self::new()
}
}