use bytes::Bytes;
use smallvec::SmallVec;
/// Identifier used throughout this file to name a subscribing peer.
pub type PeerKey = u64;
/// One distinct subscribed prefix together with the peers registered for it.
#[derive(Debug, Clone)]
struct Subscription {
/// Byte prefix a topic must start with for this entry to match.
prefix: Bytes,
/// Peers subscribed to `prefix`; `SubscriptionIndex::subscribe` never adds
/// the same peer twice to one entry.
peers: SmallVec<[PeerKey; 4]>,
}
/// Index from byte prefixes to the peers subscribed to them.
///
/// Used to answer "which peers subscribed to a prefix of this topic?" via
/// `match_topic`.
#[derive(Debug, Default)]
pub struct SubscriptionIndex {
/// Entries ordered by `prefix` (lexicographic byte order). `subscribe`
/// inserts at the position reported by a binary search, which keeps the
/// vector sorted and free of duplicate prefixes.
subs: Vec<Subscription>,
}
impl SubscriptionIndex {
    /// Creates an empty index.
    #[must_use]
    pub const fn new() -> Self {
        Self { subs: Vec::new() }
    }

    /// Returns `true` when the index holds no subscription entries.
    #[inline]
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.subs.is_empty()
    }

    /// Registers `peer` as a subscriber of `prefix`.
    ///
    /// Subscribing the same peer to the same prefix more than once is a
    /// no-op. A new prefix is inserted at its sorted position so that the
    /// entry list stays ordered for binary search.
    pub fn subscribe(&mut self, peer: PeerKey, prefix: Bytes) {
        match self.subs.binary_search_by(|s| s.prefix.cmp(&prefix)) {
            Ok(idx) => {
                let peers = &mut self.subs[idx].peers;
                if !peers.contains(&peer) {
                    peers.push(peer);
                }
            }
            Err(idx) => {
                let mut peers = SmallVec::<[PeerKey; 4]>::new();
                peers.push(peer);
                self.subs.insert(idx, Subscription { prefix, peers });
            }
        }
    }

    /// Removes `peer` from the subscribers of exactly `prefix`.
    ///
    /// Does nothing when the prefix is unknown or the peer is not subscribed
    /// to it. An entry left without subscribers is dropped from the index.
    pub fn unsubscribe(&mut self, peer: PeerKey, prefix: &Bytes) {
        if let Ok(idx) = self.subs.binary_search_by(|s| s.prefix.cmp(prefix)) {
            let peers = &mut self.subs[idx].peers;
            if let Some(pos) = peers.iter().position(|p| *p == peer) {
                // Order inside `peers` is irrelevant: `match_topic` sorts its output.
                peers.swap_remove(pos);
            }
            if peers.is_empty() {
                self.subs.remove(idx);
            }
        }
    }

    /// Removes `peer` from every prefix it is subscribed to and drops any
    /// entry that ends up with no subscribers.
    pub fn remove_peer_everywhere(&mut self, peer: PeerKey) {
        // Pass 1: strip the peer from each entry. `subscribe` guarantees a
        // peer appears at most once per entry, so one removal is enough.
        for sub in &mut self.subs {
            if let Some(pos) = sub.peers.iter().position(|p| *p == peer) {
                sub.peers.swap_remove(pos);
            }
        }
        // Pass 2: drop emptied entries in a single compaction. Calling
        // `Vec::remove` per emptied entry would shift the tail every time
        // (quadratic overall). `retain` preserves the relative order of the
        // survivors, so the list stays sorted by prefix.
        self.subs.retain(|sub| !sub.peers.is_empty());
    }

    /// Returns every peer subscribed to some prefix of `topic`.
    ///
    /// The result is sorted ascending and contains each peer at most once,
    /// even when several of its prefixes match.
    #[must_use]
    pub fn match_topic(&self, topic: &[u8]) -> SmallVec<[PeerKey; 16]> {
        let mut out: SmallVec<[PeerKey; 16]> = SmallVec::new();
        for sub in &self.subs {
            let p: &[u8] = sub.prefix.as_ref();
            // Every prefix of `topic` compares <= `topic` in lexicographic
            // byte order, and entries are sorted, so nothing past this point
            // can match.
            if p > topic {
                break;
            }
            if topic.starts_with(p) {
                out.extend_from_slice(&sub.peers);
            }
        }
        // A peer may be subscribed to several nested prefixes; collapse the
        // duplicates. Zero or one element is already sorted and unique.
        if out.len() > 1 {
            out.sort_unstable();
            out.dedup();
        }
        out
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A topic matches every stored prefix it starts with, and nothing else.
    #[test]
    fn subscribe_and_match() {
        let mut index = SubscriptionIndex::new();
        index.subscribe(1, Bytes::from_static(b"A"));
        index.subscribe(2, Bytes::from_static(b"AB"));
        index.subscribe(3, Bytes::from_static(b"B"));

        assert_eq!(index.match_topic(b"ABC").as_slice(), &[1, 2]);
        assert_eq!(index.match_topic(b"BANANA").as_slice(), &[3]);
    }

    /// A peer reachable through two nested prefixes is reported only once.
    #[test]
    fn dedup_nested_prefixes() {
        let mut index = SubscriptionIndex::new();
        index.subscribe(7, Bytes::from_static(b"A"));
        index.subscribe(7, Bytes::from_static(b"AB"));

        assert_eq!(index.match_topic(b"ABCD").as_slice(), &[7]);
    }

    /// Removing a peer globally leaves only the other peers' subscriptions.
    #[test]
    fn remove_peer_everywhere_cleans_empty_entries() {
        let mut index = SubscriptionIndex::new();
        index.subscribe(1, Bytes::from_static(b"A"));
        index.subscribe(2, Bytes::from_static(b"A"));
        index.subscribe(1, Bytes::from_static(b"AB"));

        index.remove_peer_everywhere(1);

        assert_eq!(index.match_topic(b"ABCD").as_slice(), &[2]);
    }
}