use bytes::Bytes;
use flume::{Receiver, Sender};
use hashbrown::HashMap;
/// Commands consumed by [`RouterHub`] from its user command channel.
#[derive(Debug)]
pub enum RouterCmd {
/// Route a multi-frame message to a peer. An empty frame list is ignored.
///
/// Under [`RouterBehavior::Standard`] the first frame is the routing id of
/// the target peer, and an empty frame directly after it is discarded before
/// the remaining frames are forwarded. Under [`RouterBehavior::LoadBalancer`]
/// the hub picks the peer itself and forwards every frame unchanged.
SendMessage(Vec<Bytes>),
/// Make the hub send [`PeerCmd::Close`] to every currently registered peer.
Close,
}
/// Commands the hub pushes to an individual peer through the `Sender<PeerCmd>`
/// that the peer registered with [`HubEvent::PeerUp`].
#[derive(Debug)]
pub enum PeerCmd {
/// Frames of one outbound message routed to this peer.
SendBody(Vec<Bytes>),
/// Sent to every registered peer on [`RouterCmd::Close`] and again when the
/// hub's run loop exits.
Close,
}
/// Peer lifecycle notifications consumed by [`RouterHub`] from its hub channel.
#[derive(Debug)]
pub enum HubEvent {
/// Registers a peer: `tx` is stored under `routing_id` and the id is appended
/// to the round-robin list. A previous registration with the same id is replaced.
PeerUp {
routing_id: Bytes, tx: Sender<PeerCmd>,
},
/// Unregisters the peer known as `routing_id` from both the peer table and
/// the round-robin list.
PeerDown {
routing_id: Bytes,
},
}
/// Selects how [`RouterHub`] chooses the destination peer of an outbound message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RouterBehavior {
/// The first frame of the message is the routing id of the target peer.
Standard,
/// The hub cycles through the registered peers in round-robin order.
LoadBalancer,
}
/// Routes outbound user messages to registered peers.
///
/// The hub owns the peer table and is driven by two channels: one carrying
/// peer lifecycle events and one carrying user commands.
pub struct RouterHub {
// Command sender of every registered peer, keyed by routing id.
peers: HashMap<Bytes, Sender<PeerCmd>>,
// Routing ids in round-robin order; new peers are appended at the end.
lb_list: Vec<Bytes>,
// Index into `lb_list` used by the round-robin picker.
lb_cursor: usize,
// Routing strategy applied to outbound messages.
behavior: RouterBehavior,
// Source of peer up/down events.
hub_rx: Receiver<HubEvent>,
// Source of user commands (outbound messages, close requests).
user_tx_rx: Receiver<RouterCmd>,
}
impl RouterHub {
    /// Creates a hub with an empty peer table.
    ///
    /// * `hub_rx` - receives [`HubEvent`]s announcing peers coming up and going down.
    /// * `user_tx_rx` - receives [`RouterCmd`]s (outbound messages and close requests).
    /// * `behavior` - how the destination peer of an outbound message is chosen.
    #[must_use]
    pub fn new(
        hub_rx: Receiver<HubEvent>,
        user_tx_rx: Receiver<RouterCmd>,
        behavior: RouterBehavior,
    ) -> Self {
        Self {
            peers: HashMap::new(),
            lb_list: Vec::new(),
            lb_cursor: 0,
            behavior,
            hub_rx,
            user_tx_rx,
        }
    }

    /// Drives the hub until either input channel reports an error on receive
    /// (i.e. it can no longer deliver items), then sends [`PeerCmd::Close`]
    /// to every peer that is still registered.
    ///
    /// Peer events and user commands are processed one at a time in whatever
    /// order `select!` yields them.
    pub async fn run(mut self) {
        use futures::select;
        use futures::FutureExt;

        loop {
            select! {
                event = self.hub_rx.recv_async().fuse() => {
                    match event {
                        Ok(event) => self.handle_peer_event(event),
                        Err(_) => break,
                    }
                }
                cmd = self.user_tx_rx.recv_async().fuse() => {
                    match cmd {
                        Ok(cmd) => self.handle_user_cmd(cmd),
                        Err(_) => break,
                    }
                }
            }
        }

        self.close_all_peers();
    }

    /// Applies a peer lifecycle event to the peer table and the round-robin list.
    fn handle_peer_event(&mut self, event: HubEvent) {
        match event {
            HubEvent::PeerUp { routing_id, tx } => {
                // A reconnecting peer must not appear twice in the rotation:
                // drop any existing slot before appending the fresh one.
                self.remove_from_lb(&routing_id);
                self.lb_list.push(routing_id.clone());
                self.peers.insert(routing_id, tx);
            }
            HubEvent::PeerDown { routing_id } => {
                self.peers.remove(&routing_id);
                self.remove_from_lb(&routing_id);
            }
        }
    }

    /// Executes one user command.
    ///
    /// [`RouterCmd::Close`] only notifies the peers; it neither unregisters
    /// them nor stops the run loop.
    fn handle_user_cmd(&mut self, cmd: RouterCmd) {
        match cmd {
            RouterCmd::SendMessage(parts) => self.route_outbound(parts),
            RouterCmd::Close => self.close_all_peers(),
        }
    }

    /// Sends [`PeerCmd::Close`] to every registered peer, ignoring peers whose
    /// channel can no longer accept commands.
    fn close_all_peers(&self) {
        for tx in self.peers.values() {
            // Best effort: a failed send means the peer is already gone.
            let _ = tx.send(PeerCmd::Close);
        }
    }

    /// Removes the first occurrence of `routing_id` from the round-robin list
    /// while keeping `lb_cursor` pointing at the same "next" peer.
    fn remove_from_lb(&mut self, routing_id: &Bytes) {
        if let Some(pos) = self.lb_list.iter().position(|id| id == routing_id) {
            self.lb_list.remove(pos);
            // Everything after `pos` moved one slot to the left; follow it so
            // the peer that was next in line is not skipped.
            if pos < self.lb_cursor {
                self.lb_cursor -= 1;
            }
            // The cursor was on (or past) the last slot: wrap around.
            if self.lb_cursor >= self.lb_list.len() {
                self.lb_cursor = 0;
            }
        }
    }

    /// Returns the routing id of the next registered peer in round-robin order
    /// and advances the cursor past it.
    ///
    /// Entries of the list that have no matching entry in the peer table are
    /// pruned along the way. Returns `None` when no registered peer remains.
    fn pick_rr_peer(&mut self) -> Option<Bytes> {
        // Every iteration either returns or shrinks the list, so this terminates.
        while !self.lb_list.is_empty() {
            if self.lb_cursor >= self.lb_list.len() {
                self.lb_cursor = 0;
            }
            let idx = self.lb_cursor;
            if self.peers.contains_key(&self.lb_list[idx]) {
                self.lb_cursor = (idx + 1) % self.lb_list.len();
                return Some(self.lb_list[idx].clone());
            }
            // Stale entry: remove it in place. The cursor stays at `idx`,
            // which now refers to the entry that followed the stale one.
            self.lb_list.remove(idx);
        }
        None
    }

    /// Forwards an outbound message to a peer as [`PeerCmd::SendBody`].
    ///
    /// * [`RouterBehavior::Standard`]: the first frame is the routing id of the
    ///   target; an empty frame directly after it is dropped, and the remaining
    ///   frames are sent. Messages for an unknown routing id are silently dropped.
    /// * [`RouterBehavior::LoadBalancer`]: all frames are sent to the next peer
    ///   in round-robin order; the message is silently dropped when no peer is
    ///   registered.
    ///
    /// Empty messages are ignored and send failures are not reported.
    fn route_outbound(&mut self, mut parts: Vec<Bytes>) {
        if parts.is_empty() {
            return;
        }
        match self.behavior {
            RouterBehavior::Standard => {
                let target_id = parts.remove(0);
                // Strip the empty delimiter frame, if the caller supplied one.
                if parts.first().map_or(false, |frame| frame.is_empty()) {
                    parts.remove(0);
                }
                if let Some(tx) = self.peers.get(&target_id) {
                    let _ = tx.send(PeerCmd::SendBody(parts));
                }
            }
            RouterBehavior::LoadBalancer => {
                if let Some(id) = self.pick_rr_peer() {
                    if let Some(tx) = self.peers.get(&id) {
                        let _ = tx.send(PeerCmd::SendBody(parts));
                    }
                }
            }
        }
    }
}