use log::{error, info, trace};
use mio::{Events, Interest, Poll, Token};
use signal_hook::consts::signal::{SIGINT, SIGTERM};
use signal_hook_mio::v1_0::Signals;
use std::collections::HashMap;
use std::io::Result;
use std::time::{Duration, Instant};
use crate::io::TcpServer;
use crate::keybind::Action;
use crate::traits::{IoInstance, IoResult, TOKEN_DEV, TOKEN_DYNAMIC_START, TOKEN_SERVER, TOKEN_SIGNAL};
/// Central I/O hub: ties one device, an optional TCP listener and any number
/// of dynamically added instances to a single `mio` poll.
///
/// Data read from the device is written to every instance; data read from an
/// instance is written to the device.
pub struct IoHub {
/// Poll that the signal source, the server and all instances are registered with.
poll: Poll,
/// Dynamically added instances, keyed by the token they were connected with.
instances: HashMap<Token, Box<dyn IoInstance>>,
/// The device endpoint; it is (re)connected under `TOKEN_DEV` by `run`.
device: Box<dyn IoInstance>,
/// Optional listener registered under `TOKEN_SERVER`; what it accepts is added as an instance.
server: Option<TcpServer>,
/// Signal source for SIGINT/SIGTERM, registered under `TOKEN_SIGNAL`.
signals: Signals,
/// Set by `Action::Quit` or a pending signal; makes `run` return.
quit_requested: bool,
}
impl IoHub {
pub fn new(device: Box<dyn IoInstance>, server: Option<TcpServer>) -> Result<Self> {
let mut signals = Signals::new([SIGINT, SIGTERM])?;
let poll = Poll::new()?;
poll.registry()
.register(&mut signals, TOKEN_SIGNAL, Interest::READABLE)?;
let mut io_hub = IoHub {
poll,
instances: HashMap::new(),
device,
server,
signals,
quit_requested: false,
};
if let Some(s) = &mut io_hub.server {
s.register(&mut io_hub.poll, TOKEN_SERVER)?;
}
Ok(io_hub)
}
/// Returns the lowest token at or above `TOKEN_DYNAMIC_START` that no
/// registered instance currently uses.
fn next_free_token(&self) -> Token {
    let mut candidate = TOKEN_DYNAMIC_START.0;
    while self.instances.contains_key(&Token(candidate)) {
        candidate += 1;
    }
    Token(candidate)
}
/// Connects `instance` to the poll under a fresh dynamic token and stores it.
///
/// # Errors
///
/// If the instance fails to connect, the error is logged and returned, and
/// the instance is dropped without being stored.
pub fn add(&mut self, mut instance: Box<dyn IoInstance>) -> Result<()> {
    let token = self.next_free_token();
    // Capture the address first: it is needed for logging on both outcomes.
    let addr = instance.addr_as_string();
    match instance.connect(&mut self.poll, token) {
        Ok(()) => {
            self.instances.insert(token, instance);
            info!("Hub({:?}): {} registered", token, addr);
            Ok(())
        }
        Err(e) => {
            error!("Hub({:?}): {} Failed to register {}", token, addr, e);
            Err(e)
        }
    }
}
/// Writes `msg` to every registered instance (the device is not included).
fn all_clients_str(&mut self, msg: String) {
    let payload = msg.as_bytes();
    self.instances.values_mut().for_each(|client| {
        client.write_all(payload);
    });
}
/// Routes the outcome of an instance read or tick: data is written to the
/// device, actions are passed to `handle_action`, and `None` is ignored.
fn handle_read_result(&mut self, result: IoResult) {
    match result {
        IoResult::None => {}
        IoResult::Action(requested) => {
            self.handle_action(requested);
        }
        IoResult::Data(payload) => {
            self.device.write_all(&payload);
        }
    }
}
/// Executes an action produced by an instance.
///
/// `Send` writes its bytes to the device, `Quit` raises the quit flag, and
/// `FilterToggle` is not acted on by the hub.
fn handle_action(&mut self, action: Action) {
    match action {
        Action::Send(payload) => {
            self.device.write_all(&payload);
        }
        Action::Quit => {
            self.quit_requested = true;
        }
        // Nothing to do at hub level.
        Action::FilterToggle(_) => {}
    }
}
/// Dispatches one readiness event and then drops every disconnected instance.
///
/// * `TOKEN_DEV`: reads the device and writes the data to every instance; a
///   read error is reported to all instances as an info line.
/// * `TOKEN_SERVER`: accepts a pending connection, if any, and adds it.
/// * `TOKEN_SIGNAL`: any pending signal requests a quit.
/// * a registered instance token: reads the instance and routes the result.
/// * any other token: logged and ignored. `run` hands over a whole batch of
///   events, and an earlier event of the same batch may already have removed
///   the instance a later event refers to, so an unknown token is an expected
///   situation rather than a reason to abort.
///
/// # Errors
///
/// Propagates the error from `add` when a freshly accepted connection cannot
/// be registered.
pub fn handle_event(&mut self, token_event: Token) -> Result<()> {
    trace!("handle_event");
    if token_event == TOKEN_DEV {
        match self.device.read() {
            Ok(IoResult::Data(buf)) => {
                for client in self.instances.values_mut() {
                    client.write_all(&buf);
                }
            }
            Ok(IoResult::None) => {}
            // The device does not produce actions for the hub to act on.
            Ok(IoResult::Action(_)) => {}
            Err(e) => {
                let msg = format!(
                    "\n\rInfo: {}: {}\n\r",
                    self.device.addr_as_string(),
                    e
                );
                self.all_clients_str(msg);
            }
        }
    } else if token_event == TOKEN_SERVER {
        if let Some(s) = &mut self.server
            && let Some(c) = s.accept()
        {
            self.add(c)?;
        }
    } else if token_event == TOKEN_SIGNAL {
        for signal in self.signals.pending() {
            info!("Received signal {}, initiating graceful shutdown", signal);
            self.quit_requested = true;
        }
    } else if let Some(client) = self.instances.get_mut(&token_event) {
        match client.read() {
            Ok(result) => self.handle_read_result(result),
            // Leave a trace instead of dropping the error silently; the
            // sweep below removes the client if it is no longer connected.
            Err(e) => trace!("Hub({:?}): read failed: {}", token_event, e),
        }
    } else {
        // Stale event for an instance removed earlier in this batch.
        info!(
            "Hub({:?}): ignoring event for unregistered token",
            token_event
        );
    }

    // Sweep out instances that are no longer connected, deregistering each
    // from the poll before it is dropped.
    let poll = &mut self.poll;
    self.instances.retain(|t, client| {
        if client.connected() {
            return true;
        }
        let addr = client.addr_as_string();
        info!("Hub({:?}): {}: disconnect()", t, addr);
        client.disconnect(poll);
        info!("Hub({:?}): Remove", t);
        false
    });
    Ok(())
}
/// Returns `true` once a quit has been requested, either through
/// `Action::Quit` or by a pending signal seen in `handle_event`.
pub fn is_quit_requested(&self) -> bool {
self.quit_requested
}
pub fn run(&mut self) -> std::io::Result<()> {
let mut device_connect_warn_first_only = true;
let mut events = Events::with_capacity(128);
let tick = Duration::from_millis(100);
let mut last_tick = Instant::now();
loop {
if self.device.disconnect_needed() {
self.device.disconnect(&mut self.poll);
}
if !self.device.connected() {
match self.device.connect(&mut self.poll, TOKEN_DEV) {
Ok(()) => {
device_connect_warn_first_only = false;
self.all_clients_str(format!(
"Info: {}: Connected\n\r",
self.device.addr_as_string()
));
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
}
Err(e) => {
if device_connect_warn_first_only {
device_connect_warn_first_only = false;
self.all_clients_str(format!(
"Error: {}: {}\n\r",
self.device.addr_as_string(),
e
));
}
}
}
}
match self.poll.poll(&mut events, Some(tick)) {
Ok(()) => {}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {
}
Err(e) => return Err(e),
}
for event in events.iter() {
self.handle_event(event.token())?;
}
let results: Vec<_> = self
.instances
.values_mut()
.filter_map(|c| c.tick().ok())
.collect();
for result in results {
self.handle_read_result(result);
}
if self.quit_requested {
return Ok(());
}
let now = Instant::now();
while now.duration_since(last_tick) >= tick {
last_tick = now;
}
}
}
}