use crate::pubsub::index::{PeerKey, SubscriptionIndex};
use crate::router::PeerCmd;
use bytes::Bytes;
use flume::{Receiver, Sender};
use hashbrown::HashMap;
#[derive(Debug)]
pub enum PubSubCmd {
Publish(Vec<Bytes>),
Close,
}
#[derive(Debug)]
pub enum PubSubEvent {
PeerUp {
routing_id: Bytes,
epoch: u64,
tx: Sender<PeerCmd>,
},
PeerDown {
routing_id: Bytes,
epoch: u64,
},
Subscribe {
routing_id: Bytes,
prefix: Bytes,
},
Unsubscribe {
routing_id: Bytes,
prefix: Bytes,
},
}
pub struct PubSubHub {
index: SubscriptionIndex,
rid_to_key: HashMap<Bytes, PeerKey>,
key_to_rid: HashMap<PeerKey, Bytes>,
peers: HashMap<PeerKey, (u64, Sender<PeerCmd>)>,
next_key: PeerKey,
hub_rx: Receiver<PubSubEvent>,
user_tx_rx: Receiver<PubSubCmd>,
}
impl PubSubHub {
#[must_use]
pub fn new(hub_rx: Receiver<PubSubEvent>, user_tx_rx: Receiver<PubSubCmd>) -> Self {
Self {
index: SubscriptionIndex::new(),
rid_to_key: HashMap::new(),
key_to_rid: HashMap::new(),
peers: HashMap::new(),
next_key: 1, hub_rx,
user_tx_rx,
}
}
pub async fn run(mut self) {
use futures::select;
use futures::FutureExt;
loop {
select! {
msg = self.hub_rx.recv_async().fuse() => {
match msg {
Ok(ev) => self.on_hub_event(ev),
Err(_) => break, }
}
msg = self.user_tx_rx.recv_async().fuse() => {
match msg {
Ok(cmd) => self.on_user_cmd(cmd),
Err(_) => break, }
}
}
}
}
fn on_hub_event(&mut self, ev: PubSubEvent) {
match ev {
PubSubEvent::PeerUp {
routing_id,
epoch,
tx,
} => {
let key = if let Some(&k) = self.rid_to_key.get(&routing_id) {
k
} else {
let k = self.next_key;
self.next_key += 1;
self.key_to_rid.insert(k, routing_id.clone());
self.rid_to_key.insert(routing_id, k);
k
};
self.peers.insert(key, (epoch, tx));
}
PubSubEvent::PeerDown { routing_id, epoch } => {
if let Some(&key) = self.rid_to_key.get(&routing_id) {
if let Some((current_epoch, _)) = self.peers.get(&key) {
if *current_epoch == epoch {
self.peers.remove(&key);
self.index.remove_peer_everywhere(key);
}
}
}
}
PubSubEvent::Subscribe { routing_id, prefix } => {
if let Some(&key) = self.rid_to_key.get(&routing_id) {
if self.peers.contains_key(&key) {
self.index.subscribe(key, prefix);
}
}
}
PubSubEvent::Unsubscribe { routing_id, prefix } => {
if let Some(&key) = self.rid_to_key.get(&routing_id) {
self.index.unsubscribe(key, &prefix);
}
}
}
}
fn on_user_cmd(&mut self, cmd: PubSubCmd) {
match cmd {
PubSubCmd::Publish(parts) => self.publish(parts),
PubSubCmd::Close => {
for (_, (_, tx)) in &self.peers {
let _ = tx.send(PeerCmd::Close);
}
}
}
}
fn publish(&mut self, parts: Vec<Bytes>) {
if parts.is_empty() || self.index.is_empty() {
return;
}
let topic = &parts[0];
let keys = self.index.match_topic(topic);
if keys.is_empty() {
return;
}
for key in keys {
if let Some((_, tx)) = self.peers.get(&key) {
let _ = tx.send(PeerCmd::SendBody(parts.clone()));
}
}
}
}