use std::error::Error as StdError;
use std::io::{Read as IoRead, Write as IoWrite};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::num::NonZeroU64;
use std::time::Duration;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use log::*;
use memchr::memchr;
use derive_new::new;
use hashbrown::{HashMap, HashSet};
use crossbeam_channel::{unbounded, Sender, Receiver, select, tick};
use serde_json::{Value, json};
use mlzutil::time::localtime;
use parking_lot::{const_mutex, Mutex};
use crate::config::ServerConfig;
use crate::errors::Error;
use crate::module::ModInternals;
use crate::proto::{IncomingMsg, Msg, Msg::*, IDENT_REPLY};
pub const RECVBUF_LEN: usize = 4096;
pub const MAX_MSG_LEN: usize = 1024*1024;
pub type HandlerId = NonZeroU64;
#[derive(new)]
pub struct Server {
config: ServerConfig,
}
pub type ConSender = Sender<(HandlerId, RepSender)>;
pub type ConReceiver = Receiver<(HandlerId, RepSender)>;
pub type ReqSender = Sender<(HandlerId, IncomingMsg)>;
pub type ReqReceiver = Receiver<(HandlerId, IncomingMsg)>;
pub type RepSender = Sender<Msg>;
pub type RepReceiver = Receiver<Msg>;
pub type ModRepSender = Sender<(Option<HandlerId>, Msg)>;
pub type ModRepReceiver = Receiver<(Option<HandlerId>, Msg)>;
pub static CON_SENDER: Mutex<Option<ConSender>> = const_mutex(None);
pub static REQ_SENDER: Mutex<Option<ReqSender>> = const_mutex(None);
static NEXT_HID: AtomicUsize = AtomicUsize::new(1);
pub fn next_handler_id() -> HandlerId {
NonZeroU64::new(NEXT_HID.fetch_add(1, Ordering::SeqCst) as u64).expect("is nonzero")
}
impl Server {
fn tcp_listener(tcp_sock: TcpListener) {
mlzlog::set_thread_prefix("TCP: ");
info!("listener started");
let con_sender = CON_SENDER.lock().clone().expect("no server running?");
while let Ok((stream, addr)) = tcp_sock.accept() {
info!("[{}] new client connected", addr);
let new_req_sender = REQ_SENDER.lock().clone().expect("no server running?");
let (rep_sender, rep_receiver) = unbounded();
let disp_rep_sender = rep_sender.clone();
let hid = next_handler_id();
con_sender.send((hid, disp_rep_sender)).unwrap();
thread::spawn(move || Handler::new(hid, stream, addr,
new_req_sender, rep_sender, rep_receiver).handle());
}
}
pub fn start<F>(mut self, addr: &str, mod_runner: F) -> Result<(), Box<dyn StdError>>
where F: Fn(ModInternals) -> Result<(), Box<dyn StdError>>
{
let (con_sender, con_receiver) = unbounded();
*CON_SENDER.lock() = Some(con_sender);
let (req_sender, req_receiver) = unbounded();
*REQ_SENDER.lock() = Some(req_sender);
let (rep_sender, rep_receiver) = unbounded();
let mut active_sets = HashMap::new();
let mut mod_senders = HashMap::new();
for (name, modcfg) in self.config.modules.drain() {
let (mod_sender, mod_receiver) = unbounded();
let mod_rep_sender = rep_sender.clone();
let tickers = (tick(Duration::from_secs(1)), tick(Duration::from_secs(1)));
let int = ModInternals::new(name.clone(), modcfg, mod_receiver, mod_rep_sender, tickers);
active_sets.insert(name.clone(), HashSet::new());
mod_senders.insert(name, mod_sender);
mod_runner(int)?;
}
let descriptive = json!({
"description": self.config.description,
"equipment_id": self.config.equipment_id,
"firmware": concat!("secop-rs ", env!("CARGO_PKG_VERSION")),
"modules": {}
});
let dispatcher = Dispatcher {
descriptive: descriptive,
active: active_sets,
handlers: HashMap::new(),
modules: mod_senders,
connections: con_receiver,
requests: req_receiver,
replies: rep_receiver,
};
thread::spawn(move || dispatcher.run());
let tcp_sock = TcpListener::bind(addr)?;
thread::spawn(move || Server::tcp_listener(tcp_sock));
Ok(())
}
}
struct Dispatcher {
descriptive: Value,
handlers: HashMap<HandlerId, RepSender>,
active: HashMap<String, HashSet<HandlerId>>,
modules: HashMap<String, ReqSender>,
connections: ConReceiver,
requests: ReqReceiver,
replies: ModRepReceiver,
}
impl Dispatcher {
fn send_back(&self, hid: HandlerId, msg: Msg) {
if let Some(chan) = self.handlers.get(&hid) {
let _ = chan.send(msg);
}
}
fn run(mut self) {
mlzlog::set_thread_prefix("Dispatcher: ");
let mut global_activate_remaining = 0;
loop {
select! {
recv(self.connections) -> res => if let Ok((hid, conn)) = res {
debug!("got handler {}", hid);
self.handlers.insert(hid, conn);
},
recv(self.requests) -> res => if let Ok((hid, req)) = res {
debug!("got request {} -> {}", hid, req);
match req.1 {
Do { ref module, .. } |
Change { ref module, .. } |
Read { ref module, .. } => {
if let Some(chan) = self.modules.get(module) {
chan.send((hid, req)).unwrap();
} else {
self.send_back(hid, Error::no_module().into_msg(req.0));
}
}
Activate { ref module } => {
if !module.is_empty() {
if let Some(chan) = self.modules.get(module) {
chan.send((hid, req)).unwrap();
} else {
self.send_back(hid, Error::no_module().into_msg(req.0));
continue;
}
} else {
if global_activate_remaining > 0 {
self.send_back(hid, Error::protocol(
"already activating").into_msg(req.0));
continue;
}
for chan in self.modules.values() {
chan.send((hid, req.clone())).unwrap();
}
global_activate_remaining = self.modules.len();
}
}
Deactivate { module } => {
if !module.is_empty() {
if !self.modules.contains_key(&module) {
self.send_back(hid, Error::no_module().into_msg(req.0));
continue;
}
self.active.get_mut(&module).expect("always there").remove(&hid);
} else {
for module in self.modules.keys() {
self.active.get_mut(module).expect("always there").remove(&hid);
}
}
self.send_back(hid, Inactive { module });
}
Describe => {
self.send_back(hid, Describing {
id: ".".into(),
structure: self.descriptive.clone()
});
}
Quit => {
self.handlers.remove(&hid);
for set in self.active.values_mut() {
set.remove(&hid);
}
}
_ => warn!("message should not arrive here: {}", req.1),
}
},
recv(self.replies) -> res => if let Ok((hid, rep)) = res {
match hid {
None => match rep {
Describing { id, structure } => {
let obj = self.descriptive["modules"].as_object_mut().expect("object");
obj.insert(id, structure);
}
Update { ref module, .. } => {
debug!("got {}", rep);
for &hid in &self.active[module] {
self.send_back(hid, rep.clone());
}
}
_ => ()
},
Some(hid) => match rep {
InitUpdates { module, updates } => {
for msg in updates {
self.send_back(hid, msg);
}
if !module.is_empty() {
self.send_back(hid, Active { module: module.clone() });
self.active.get_mut(&module).expect("always there").insert(hid);
} else {
global_activate_remaining -= 1;
if global_activate_remaining == 0 {
self.send_back(hid, Active { module: "".into() });
for set in self.active.values_mut() {
set.insert(hid);
}
}
}
}
_ => {
debug!("got reply {} for {}", rep, hid);
self.send_back(hid, rep)
}
}
}
}
}
}
}
}
pub struct Handler {
client: TcpStream,
hid: HandlerId,
req_sender: ReqSender,
rep_sender: RepSender,
}
impl Handler {
pub fn new(hid: HandlerId, client: TcpStream, addr: SocketAddr, req_sender: ReqSender,
rep_sender: RepSender, rep_receiver: RepReceiver) -> Handler {
let send_client = client.try_clone().expect("could not clone socket");
let thread_name = addr.to_string();
thread::spawn(move || Handler::sender(&thread_name, send_client, rep_receiver));
mlzlog::set_thread_prefix(format!("[{}] ", addr));
Handler { hid, client, req_sender, rep_sender }
}
fn sender(name: &str, client: TcpStream, rep_receiver: RepReceiver) {
mlzlog::set_thread_prefix(format!("[{}] ", name));
let mut client = std::io::BufWriter::new(client);
for to_send in rep_receiver {
if let Err(err) = write!(client, "{}\n", to_send) {
warn!("write error in sender: {}", err);
break;
}
let _ = client.flush();
}
info!("sender quit");
}
fn send_back(&self, msg: Msg) {
self.rep_sender.send(msg).expect("sending to client failed");
}
fn handle_msg(&self, msg: IncomingMsg) {
match msg.1 {
Change { .. } | Do { .. } | Read { .. } | Describe |
Activate { .. } | Deactivate { .. } => {
self.req_sender.send((self.hid, msg)).unwrap();
}
Ping { token } => {
let data = json!([null, {"t": localtime()}]);
self.send_back(Pong { token, data });
}
Idn => {
self.send_back(IdnReply { encoded: IDENT_REPLY.into() });
}
_ => {
warn!("message {:?} not handled yet", msg.1);
}
}
}
fn process(&self, line: String) {
match Msg::parse(line) {
Ok(msg) => {
debug!("processing {}", msg);
self.handle_msg(msg);
}
Err(msg) => {
warn!("failed to parse line: {}", msg);
self.send_back(msg);
}
}
}
pub fn handle(mut self) {
let mut buf = Vec::with_capacity(RECVBUF_LEN);
let mut recvbuf = [0u8; RECVBUF_LEN];
loop {
let got = match self.client.read(&mut recvbuf) {
Err(err) => {
warn!("error in recv, closing connection: {}", err);
break;
},
Ok(0) => break, Ok(got) => got,
};
buf.extend_from_slice(&recvbuf[..got]);
let mut from = 0;
while let Some(to) = memchr(b'\n', &buf[from..]) {
let line_str = String::from_utf8_lossy(&buf[from..from+to]);
let line_str = line_str.trim_end_matches('\r');
self.process(line_str.to_owned());
from += to + 1;
}
buf.drain(..from);
if buf.len() > MAX_MSG_LEN {
warn!("hit request length limit, closing connection");
break;
}
}
self.req_sender.send((self.hid, IncomingMsg::bare(Quit))).unwrap();
info!("handler is finished");
}
}