use std::collections::{HashMap, HashSet};
use std::cell::Cell;
use queen_io::{
epoll::{Epoll, Token, Ready, EpollOpt},
plus::slab::Slab
};
use nson::{
Message, msg,
message_id::MessageId
};
use rand::{SeedableRng, seq::SliceRandom, rngs::SmallRng};
use crate::Wire;
use crate::dict::*;
use crate::error::{Code, Result};
use super::Hook;
use super::{Slot, SlotModify};
/// Message router: tracks the connected slots, the channels they are attached
/// to, and running send/receive counters.
pub struct Switch {
/// Identifier of this switch; returned as `SOCKET_ID` in a successful auth reply.
pub id: MessageId,
/// Channel name -> tokens of the slots currently attached to that channel.
pub chans: HashMap<String, HashSet<usize>>,
/// Slot id -> slab token; entries are written when a slot authenticates.
pub slot_ids: HashMap<MessageId, usize>,
/// Every live slot, indexed by its slab token.
pub slots: Slab<Slot>,
/// Incremented each time a message is successfully written to a slot's wire.
pub send_num: Cell<usize>,
/// Incremented each time `recv_message` is entered.
pub recv_num: Cell<usize>,
// Random source used to pick a single recipient when a message sets `SHARE`.
rand: SmallRng
}
impl Switch {
pub(crate) fn new(id: MessageId) -> Self {
Self {
id,
chans: HashMap::new(),
slot_ids: HashMap::new(),
slots: Slab::new(),
send_num: Cell::new(0),
recv_num: Cell::new(0),
rand: SmallRng::from_entropy()
}
}
/// Wraps `wire` in a new slot and, if it is accepted, registers it with `epoll`.
///
/// A slot is accepted when `hook.accept` returns `true` and the initial
/// `CODE: 0` greeting can be written to the wire. A slot that is not accepted
/// has its wire closed and the call still returns `Ok(())`.
///
/// # Errors
/// Returns the error from `epoll.add`. In that case the wire is closed and
/// the slot is not stored, mirroring what happens on rejection.
pub(crate) fn add_slot(
    &mut self,
    epoll: &Epoll,
    hook: &impl Hook,
    wire: Wire<Message>
) -> Result<()> {
    let entry = self.slots.vacant_entry();
    let token = entry.key();
    let slot = Slot::new(token, wire);

    let accepted = hook.accept(&slot)
        && slot.wire.send(msg!{CODE: 0i32}).is_ok();

    if !accepted {
        slot.wire.close();
        return Ok(())
    }

    let registered = epoll.add(&slot.wire, Token(token), Ready::readable(), EpollOpt::level());

    // The peer has already been greeted; if it cannot be polled, close the
    // wire explicitly instead of just abandoning it on the error path.
    if registered.is_err() {
        slot.wire.close();
    }
    registered?;

    entry.insert(slot);

    Ok(())
}
/// Removes the slot stored under `token`, if any, and announces its departure.
///
/// The slot is taken out of the slab, deregistered from `epoll`, detached from
/// every channel it was on (dropping channels that become empty), removed from
/// `slot_ids`, reported to `hook.remove`, and finally a `SLOT_BREAK` event is
/// relayed to the other slots attached to that event channel.
///
/// Does nothing and returns `Ok(())` when `token` is not in the slab.
///
/// # Errors
/// Returns the error from `epoll.delete`. The bookkeeping above is completed
/// before the error is propagated so that no stale token is left behind in
/// `chans` or `slot_ids`.
pub(crate) fn del_slot(
    &mut self,
    epoll: &Epoll,
    hook: &impl Hook,
    token: usize
) -> Result<()> {
    if !self.slots.contains(token) {
        return Ok(())
    }

    let slot = self.slots.remove(token);

    // Remember the outcome but keep going: the slot has already left the slab,
    // so every index that still mentions `token` must be cleaned regardless.
    let deregistered = epoll.delete(&slot.wire);

    for chan in slot.chans.keys() {
        if let Some(ids) = self.chans.get_mut(chan) {
            ids.remove(&token);

            if ids.is_empty() {
                self.chans.remove(chan);
            }
        }
    }

    self.slot_ids.remove(&slot.id);

    hook.remove(&slot);

    let event_message = msg!{
        CHAN: SLOT_BREAK,
        SLOT_ID: slot.id,
        LABEL: slot.label.clone(),
        ATTR: slot.wire.attr().clone()
    };

    self.relay_root_message(hook, token, SLOT_BREAK, event_message);

    deregistered?;

    Ok(())
}
/// Entry point for a message read from the slot identified by `token`.
///
/// Bumps `recv_num`, lets `hook.recv` veto the message, then dispatches on the
/// `CHAN` field: names starting with `_` are built-in commands handled by the
/// matching method, anything else is relayed as a user channel message.
/// Failures are reported back to the sender by setting an error code on the
/// message and returning it.
///
/// # Errors
/// Only the `SLOT_KILL` command can fail; its error is propagated.
pub(crate) fn recv_message(
    &mut self,
    epoll: &Epoll,
    hook: &impl Hook,
    token: usize,
    mut message: Message
) -> Result<()> {
    self.recv_num.set(self.recv_num.get() + 1);

    if !hook.recv(&self.slots[token], &mut message) {
        Code::RefuseReceiveMessage.set(&mut message);
        self.send_message(hook, token, message);
        return Ok(())
    }

    let chan = match message.get_str(CHAN) {
        Ok(name) => name,
        Err(_) => {
            Code::CannotGetChanField.set(&mut message);
            self.send_message(hook, token, message);
            return Ok(())
        }
    };

    // Ordinary channel: hand over to the relay logic and stop here.
    if !chan.starts_with('_') {
        self.relay_message(hook, token, chan.to_string(), message);
        return Ok(())
    }

    // Built-in command channel.
    match chan {
        AUTH => self.auth(hook, token, message),
        ATTACH => self.attach(hook, token, message),
        DETACH => self.detach(hook, token, message),
        PING => self.ping(hook, token, message),
        MINE => self.mine(hook, token, message),
        QUERY => self.query(hook, token, message),
        CUSTOM => self.custom(hook, token, message),
        CTRL => self.ctrl(hook, token, message),
        SLOT_KILL => self.kill(epoll, hook, token, message)?,
        _ => {
            Code::UnsupportedChan.set(&mut message);
            self.send_message(hook, token, message);
        }
    }

    Ok(())
}
/// Sends `message` to the slot stored under `token`.
///
/// Nothing happens when the token is unknown or when `hook.send` refuses the
/// message. `send_num` is incremented only if the wire accepts the message.
pub(crate) fn send_message(
    &self,
    hook: &impl Hook,
    token: usize,
    mut message: Message
) {
    let slot = match self.slots.get(token) {
        Some(slot) => slot,
        None => return
    };

    // `&&` short-circuits, so the wire is only touched when the hook agrees.
    let delivered = hook.send(slot, &mut message) && slot.wire.send(message).is_ok();

    if delivered {
        self.send_num.set(self.send_num.get() + 1);
    }
}
/// Delivers an event `message` to every slot attached to the event channel
/// `chan`, except the slot identified by `token` (the origin of the event).
///
/// Each recipient gets its own clone of the message. Delivery goes through
/// `send_message`, which consults `hook.send` exactly once per recipient and
/// maintains `send_num`.
pub(crate) fn relay_root_message(
    &self,
    hook: &impl Hook,
    token: usize,
    chan: &str,
    message: Message
) {
    let tokens = match self.chans.get(chan) {
        Some(tokens) => tokens,
        None => return
    };

    for &other_token in tokens {
        if other_token == token {
            continue;
        }

        if let Some(slot) = self.slots.get(other_token) {
            // `send_message` already runs `hook.send`; calling the hook here as
            // well would invoke it twice for the same delivery.
            self.send_message(hook, slot.token, message.clone());
        }
    }
}
/// Routes a user-channel message sent by the slot `token` to its recipients.
///
/// In order:
/// 1. The sender must be authenticated (`Unauthorized` otherwise) and
///    `hook.emit` must allow the message (`PermissionDenied` otherwise).
/// 2. `FROM` is set to the sender's slot id unless the message already has one.
/// 3. If `TO` is present it must be a message id or an array of message ids
///    (`InvalidToFieldType` otherwise). Ids that are not registered are
///    skipped. With `SHARE` set one target is picked at random, otherwise every
///    target receives the message. This path returns without emitting
///    `SLOT_SEND`.
/// 4. Without `TO`, recipients are the slots attached to `chan`. If `LABEL`
///    (a string or an array of strings, `InvalidLabelFieldType` otherwise) is
///    present, only slots sharing at least one label for that channel qualify.
///    With `SHARE` set one qualifying slot is picked at random.
/// 5. A `SLOT_SEND` event carrying the message is relayed to event listeners.
///
/// Every delivery is gated by `hook.push` and followed by a `SLOT_RECV` event.
///
/// `self.slots[token]` is indexed directly, so `token` must refer to a live slot.
// The routing rules above are kept in one function, hence the lint exemption.
#[allow(clippy::cognitive_complexity)]
pub(crate) fn relay_message(
    &mut self,
    hook: &impl Hook,
    token: usize,
    chan: String,
    mut message: Message
) {
    if !self.slots[token].auth {
        Code::Unauthorized.set(&mut message);
        self.send_message(hook, token, message);
        return
    }

    let success = hook.emit(&self.slots[token], &mut message);
    if !success {
        Code::PermissionDenied.set(&mut message);
        self.send_message(hook, token, message);
        return
    }

    // Deliver `$message` to `$slot` if the hook allows it, then tell event
    // listeners (other than the recipient itself) that the slot received it.
    macro_rules! send {
        ($self: ident, $hook: ident, $slot: ident, $message: ident) => {
            let success = $hook.push($slot, &mut $message);
            if success {
                $self.send_message($hook, $slot.token, $message.clone());

                let event_message = msg!{
                    CHAN: SLOT_RECV,
                    VALUE: $message.clone(),
                    TO: $slot.id.clone()
                };

                let id = $slot.token;
                $self.relay_root_message($hook, id, SLOT_RECV, event_message);
            }
        };
    }

    if !message.contains_key(FROM) {
        message.insert(FROM, &self.slots[token].id.clone());
    }

    // Directed delivery: explicit recipient id(s) in `TO`.
    if let Some(to) = message.get(TO).cloned() {
        let mut to_ids = vec![];

        if let Some(to_id) = to.as_message_id() {
            if self.slot_ids.contains_key(to_id) {
                to_ids.push(to_id.clone());
            }
        } else if let Some(to_array) = to.as_array() {
            for to in to_array {
                if let Some(to_id) = to.as_message_id() {
                    if self.slot_ids.contains_key(to_id) {
                        to_ids.push(to_id.clone());
                    }
                } else {
                    Code::InvalidToFieldType.set(&mut message);
                    self.send_message(hook, token, message);
                    return
                }
            }
        } else {
            Code::InvalidToFieldType.set(&mut message);
            self.send_message(hook, token, message);
            return
        }

        if !to_ids.is_empty() {
            if message.get_bool(SHARE).unwrap_or(false) {
                if to_ids.len() == 1 {
                    if let Some(slot_id) = self.slot_ids.get(&to_ids[0]) {
                        if let Some(slot) = self.slots.get(*slot_id) {
                            send!(self, hook, slot, message);
                        }
                    }
                } else if let Some(to) = to_ids.choose(&mut self.rand) {
                    if let Some(slot_id) = self.slot_ids.get(to) {
                        if let Some(slot) = self.slots.get(*slot_id) {
                            send!(self, hook, slot, message);
                        }
                    }
                }
            } else {
                for to in &to_ids {
                    if let Some(slot_id) = self.slot_ids.get(to) {
                        if let Some(slot) = self.slots.get(*slot_id) {
                            send!(self, hook, slot, message);
                        }
                    }
                }
            }
        }

        return
    }

    // Channel delivery: optional label filter.
    let mut labels = HashSet::new();

    if let Some(label) = message.get(LABEL) {
        if let Some(label) = label.as_str() {
            labels.insert(label.to_string());
        } else if let Some(label_array) = label.as_array() {
            for v in label_array {
                if let Some(v) = v.as_str() {
                    labels.insert(v.to_string());
                } else {
                    Code::InvalidLabelFieldType.set(&mut message);
                    self.send_message(hook, token, message);
                    return
                }
            }
        } else {
            Code::InvalidLabelFieldType.set(&mut message);
            self.send_message(hook, token, message);
            return
        }
    }

    if message.get_bool(SHARE).unwrap_or(false) {
        // Collect the qualifying slots first, then pick exactly one.
        let mut array: Vec<usize> = Vec::new();

        if let Some(ids) = self.chans.get(&chan) {
            for slot_id in ids.iter() {
                if let Some(slot) = self.slots.get(*slot_id) {
                    if !labels.is_empty() {
                        // A token listed under `chan` is expected to have a
                        // matching entry in the slot's own channel map.
                        let slot_labels = slot.chans.get(&chan).expect("It shouldn't be executed here!");

                        if slot_labels.is_disjoint(&labels) {
                            continue;
                        }
                    }

                    array.push(*slot_id);
                }
            }
        }

        if !array.is_empty() {
            if array.len() == 1 {
                if let Some(slot) = self.slots.get(array[0]) {
                    send!(self, hook, slot, message);
                }
            } else if let Some(id) = array.choose(&mut self.rand) {
                if let Some(slot) = self.slots.get(*id) {
                    send!(self, hook, slot, message);
                }
            }
        }
    } else if let Some(ids) = self.chans.get(&chan) {
        for slot_id in ids.iter() {
            if let Some(slot) = self.slots.get(*slot_id) {
                if !labels.is_empty() {
                    let slot_labels = slot.chans.get(&chan).expect("It shouldn't be executed here!");

                    if slot_labels.is_disjoint(&labels) {
                        continue
                    }
                }

                send!(self, hook, slot, message);
            }
        }
    }

    let event_message = msg!{
        CHAN: SLOT_SEND,
        VALUE: message
    };

    self.relay_root_message(hook, token, SLOT_SEND, event_message);
}
/// Handles the `AUTH` command for the slot `token`.
///
/// Optional request fields are validated first:
/// - `ROOT` must be a bool (`InvalidRootFieldType`),
/// - `LABEL` must be a message (`InvalidLabelFieldType`),
/// - `SLOT_ID` must be a message id (`InvalidSlotIdFieldType`) that is not
///   already registered to a different token (`DuplicateSlotId`).
///
/// `hook.auth` then decides whether to accept (`AuthenticationFailed`
/// otherwise). On success the requested changes are applied to the slot, the
/// slot id is registered in `slot_ids`, the slot is marked authenticated, and
/// the reply is completed with `SOCKET_ID`, an OK code and any of `SLOT_ID`,
/// `LABEL` (only when non-empty) and `ROOT` that the request did not supply.
/// A `SLOT_READY` event is relayed afterwards.
///
/// `self.slots[token]` is indexed directly, so `token` must refer to a live slot.
pub(crate) fn auth(
    &mut self,
    hook: &impl Hook,
    token: usize,
    mut message: Message
) {
    let slot = &mut self.slots[token];

    let mut modify = SlotModify::default();

    if let Some(s) = message.get(ROOT) {
        if let Some(s) = s.as_bool() {
            modify.root = Some(s);
        } else {
            Code::InvalidRootFieldType.set(&mut message);
            self.send_message(hook, token, message);
            return
        }
    }

    if let Some(label) = message.get(LABEL) {
        if let Some(label) = label.as_message() {
            modify.label = Some(label.clone());
        } else {
            Code::InvalidLabelFieldType.set(&mut message);
            self.send_message(hook, token, message);
            return
        }
    }

    if let Some(slot_id) = message.get(SLOT_ID) {
        if let Some(slot_id) = slot_id.as_message_id() {
            // Re-authenticating under one's own id is fine; taking an id that
            // belongs to another token is not.
            if let Some(other_token) = self.slot_ids.get(slot_id) {
                if *other_token != token {
                    Code::DuplicateSlotId.set(&mut message);
                    self.send_message(hook, token, message);
                    return
                }
            }

            modify.id = Some(slot_id.clone());
        } else {
            Code::InvalidSlotIdFieldType.set(&mut message);
            self.send_message(hook, token, message);
            return
        }
    }

    let success = hook.auth(slot, &modify, &mut message);
    if !success {
        Code::AuthenticationFailed.set(&mut message);
        self.send_message(hook, token, message);
        return
    }

    if let Some(id) = modify.id {
        // Drop the mapping for the previous id before registering the new one.
        self.slot_ids.remove(&slot.id);
        self.slot_ids.insert(id.clone(), token);
        slot.id = id;
    } else {
        message.insert(SLOT_ID, slot.id.clone());
        self.slot_ids.insert(slot.id.clone(), token);
    }

    if let Some(label) = modify.label {
        slot.label = label;
    } else if !slot.label.is_empty() {
        message.insert(LABEL, slot.label.clone());
    }

    if let Some(root) = modify.root {
        slot.root = root;
    } else {
        message.insert(ROOT, slot.root);
    }

    slot.auth = true;

    message.insert(SOCKET_ID, self.id.clone());

    Code::Ok.set(&mut message);

    let event_message = msg!{
        CHAN: SLOT_READY,
        ROOT: slot.root,
        SLOT_ID: slot.id.clone(),
        LABEL: slot.label.clone(),
        ATTR: slot.wire.attr().clone()
    };

    self.send_message(hook, token, message);

    self.relay_root_message(hook, token, SLOT_READY, event_message);
}
pub(crate) fn attach(
&mut self,
hook: &impl Hook,
token: usize,
mut message: Message
) {
if !self.slots[token].auth {
Code::Unauthorized.set(&mut message);
self.send_message(hook, token, message);
return
}
if let Ok(chan) = message.get_str(VALUE).map(ToOwned::to_owned) {
match chan.as_str() {
SLOT_READY | SLOT_BREAK | SLOT_ATTACH | SLOT_DETACH | SLOT_SEND | SLOT_RECV => {
if !self.slots[token].root {
Code::PermissionDenied.set(&mut message);
self.send_message(hook, token, message);
return
}
}
_ => ()
}
let mut labels = HashSet::new();
if let Some(label) = message.get(LABEL) {
if let Some(label) = label.as_str() {
labels.insert(label.to_string());
} else if let Some(label_array) = label.as_array() {
for v in label_array {
if let Some(v) = v.as_str() {
labels.insert(v.to_string());
} else {
Code::InvalidLabelFieldType.set(&mut message);
self.send_message(hook, token, message);
return
}
}
} else {
Code::InvalidLabelFieldType.set(&mut message);
self.send_message(hook, token, message);
return
}
}
let success = hook.attach(&self.slots[token], &mut message, &chan, &labels);
if !success {
Code::PermissionDenied.set(&mut message);
self.send_message(hook, token, message);
return
}
let mut event_message = msg!{
CHAN: SLOT_ATTACH,
VALUE: &chan,
SLOT_ID: self.slots[token].id.clone()
};
if let Some(label) = message.get(LABEL) {
event_message.insert(LABEL, label.clone());
}
let ids = self.chans.entry(chan.to_owned()).or_insert_with(HashSet::new);
ids.insert(token);
{
let slot = &mut self.slots[token];
let set = slot.chans.entry(chan).or_insert_with(HashSet::new);
set.extend(labels);
}
self.relay_root_message(hook, token, SLOT_ATTACH, event_message);
Code::Ok.set(&mut message);
} else {
Code::CannotGetValueField.set(&mut message);
}
self.send_message(hook, token, message);
}
/// Handles the `DETACH` command for the slot `token`.
///
/// `VALUE` names the channel (`CannotGetValueField` if it is not a string).
/// `LABEL`, when present, must be a string or an array of strings
/// (`InvalidLabelFieldType`). The slot must be authenticated (`Unauthorized`)
/// and `hook.detach` must agree (`PermissionDenied`).
///
/// Without labels the slot leaves the channel entirely, and the channel entry
/// is dropped once no slot remains on it. With labels only those labels are
/// removed from the slot's set for that channel; the slot stays attached.
/// On success a `SLOT_DETACH` event is relayed and the reply carries an OK code.
pub(crate) fn detach(
    &mut self,
    hook: &impl Hook,
    token: usize,
    mut message: Message
) {
    if !self.slots[token].auth {
        Code::Unauthorized.set(&mut message);
        self.send_message(hook, token, message);
        return
    }

    if let Ok(chan) = message.get_str(VALUE).map(ToOwned::to_owned) {
        let mut labels = HashSet::new();

        if let Some(label) = message.get(LABEL) {
            if let Some(label) = label.as_str() {
                labels.insert(label.to_string());
            } else if let Some(label_array) = label.as_array() {
                for v in label_array {
                    if let Some(v) = v.as_str() {
                        labels.insert(v.to_string());
                    } else {
                        Code::InvalidLabelFieldType.set(&mut message);
                        self.send_message(hook, token, message);
                        return
                    }
                }
            } else {
                Code::InvalidLabelFieldType.set(&mut message);
                self.send_message(hook, token, message);
                return
            }
        }

        let success = hook.detach(&self.slots[token], &mut message, &chan, &labels);
        if !success {
            Code::PermissionDenied.set(&mut message);
            self.send_message(hook, token, message);
            return
        }

        let mut event_message = msg!{
            CHAN: SLOT_DETACH,
            VALUE: &chan,
            SLOT_ID: self.slots[token].id.clone()
        };

        if let Some(label) = message.get(LABEL) {
            event_message.insert(LABEL, label.clone());
        }

        {
            let slot = &mut self.slots[token];

            if labels.is_empty() {
                slot.chans.remove(&chan);

                if let Some(ids) = self.chans.get_mut(&chan) {
                    ids.remove(&token);

                    if ids.is_empty() {
                        self.chans.remove(&chan);
                    }
                }
            } else if let Some(set) = slot.chans.get_mut(&chan) {
                // Drop the requested labels in place rather than rebuilding
                // the set from cloned strings.
                set.retain(|label| !labels.contains(label));
            }
        }

        self.relay_root_message(hook, token, SLOT_DETACH, event_message);

        Code::Ok.set(&mut message);
    } else {
        Code::CannotGetValueField.set(&mut message);
    }

    self.send_message(hook, token, message);
}
/// Handles the `PING` command: notifies `hook.ping`, then echoes the message
/// back to the sender with an OK code.
pub(crate) fn ping(&mut self, hook: &impl Hook, token: usize, mut message: Message) {
    let slot = &self.slots[token];
    hook.ping(slot, &mut message);

    Code::Ok.set(&mut message);

    self.send_message(hook, token, message);
}
/// Handles the `MINE` command: replies with a description of the calling slot.
///
/// When the slot exists, `VALUE` is filled with its auth/root flags, attached
/// channels and their labels, slot id, label, wire attributes and the wire's
/// send/receive counters. The reply always carries an OK code.
pub(crate) fn mine(&self, hook: &impl Hook, token: usize, mut message: Message) {
    if let Some(slot) = self.slots.get(token) {
        // Channel name -> list of labels the slot holds on that channel.
        let mut chans = Message::new();
        for (name, label_set) in slot.chans.iter() {
            chans.insert(name, label_set.iter().collect::<Vec<&String>>());
        }

        let info = msg!{
            AUTH: slot.auth,
            ROOT: slot.root,
            CHANS: chans,
            SLOT_ID: slot.id.clone(),
            LABEL: slot.label.clone(),
            ATTR: slot.wire.attr().clone(),
            SEND_NUM: slot.wire.send_num() as u64,
            RECV_NUM: slot.wire.recv_num() as u64
        };

        message.insert(VALUE, info);
    }

    Code::Ok.set(&mut message);

    self.send_message(hook, token, message);
}
/// Handles the `QUERY` command. The caller must be authenticated
/// (`Unauthorized`) and root (`PermissionDenied`); otherwise `hook.query`
/// fills in the reply. The message is sent back to the caller in every case.
pub(crate) fn query(&self, hook: &impl Hook, token: usize, mut message: Message) {
    let (authed, is_root) = {
        let slot = &self.slots[token];
        (slot.auth, slot.root)
    };

    match (authed, is_root) {
        (false, _) => {
            Code::Unauthorized.set(&mut message);
        }
        (true, false) => {
            Code::PermissionDenied.set(&mut message);
        }
        (true, true) => {
            hook.query(self, token, &mut message);
        }
    }

    self.send_message(hook, token, message);
}
/// Handles the `CUSTOM` command. An unauthenticated caller gets
/// `Unauthorized`; otherwise `hook.custom` processes the message. The message
/// is sent back to the caller in both cases.
pub(crate) fn custom(&self, hook: &impl Hook, token: usize, mut message: Message) {
    let authed = self.slots[token].auth;

    if authed {
        hook.custom(self, token, &mut message);
    } else {
        Code::Unauthorized.set(&mut message);
    }

    self.send_message(hook, token, message);
}
/// Handles the `CTRL` command. The caller must be authenticated
/// (`Unauthorized`) and root (`PermissionDenied`); otherwise `hook.ctrl`
/// processes the message. The message is sent back to the caller in every case.
pub(crate) fn ctrl(&mut self, hook: &impl Hook, token: usize, mut message: Message) {
    let (authed, is_root) = {
        let slot = &self.slots[token];
        (slot.auth, slot.root)
    };

    if !authed {
        Code::Unauthorized.set(&mut message);
    } else if !is_root {
        Code::PermissionDenied.set(&mut message);
    } else {
        hook.ctrl(self, token, &mut message);
    }

    self.send_message(hook, token, message);
}
/// Handles the `SLOT_KILL` command: removes the slot whose id is given in
/// `SLOT_ID`.
///
/// The caller must be authenticated (`Unauthorized`), root
/// (`PermissionDenied`) and approved by `hook.kill` (`PermissionDenied`).
/// `SLOT_ID` must be present (`CannotGetSlotIdField`), be a message id
/// (`InvalidSlotIdFieldType`) and be registered (`TargetSlotIdNotExist`).
/// The OK reply is sent to the caller before the target slot is removed.
///
/// # Errors
/// Propagates the error from `del_slot`; every rejection above returns `Ok(())`.
pub(crate) fn kill(
    &mut self,
    epoll: &Epoll,
    hook: &impl Hook,
    token: usize,
    mut message: Message
) -> Result<()> {
    let (authed, is_root) = {
        let slot = &self.slots[token];
        (slot.auth, slot.root)
    };

    if !authed {
        Code::Unauthorized.set(&mut message);
        self.send_message(hook, token, message);
        return Ok(())
    }

    if !is_root {
        Code::PermissionDenied.set(&mut message);
        self.send_message(hook, token, message);
        return Ok(())
    }

    if !hook.kill(&self.slots[token], &mut message) {
        Code::PermissionDenied.set(&mut message);
        self.send_message(hook, token, message);
        return Ok(())
    }

    // Resolve the slab token of the slot to remove, bailing out with the
    // appropriate error code at the first thing that is wrong.
    let target = match message.get(SLOT_ID) {
        None => {
            Code::CannotGetSlotIdField.set(&mut message);
            self.send_message(hook, token, message);
            return Ok(())
        }
        Some(value) => match value.as_message_id() {
            None => {
                Code::InvalidSlotIdFieldType.set(&mut message);
                self.send_message(hook, token, message);
                return Ok(())
            }
            Some(slot_id) => match self.slot_ids.get(slot_id).cloned() {
                None => {
                    Code::TargetSlotIdNotExist.set(&mut message);
                    self.send_message(hook, token, message);
                    return Ok(())
                }
                Some(found) => found
            }
        }
    };

    Code::Ok.set(&mut message);
    self.send_message(hook, token, message);

    self.del_slot(epoll, hook, target)
}
}