extern crate crossbeam_channel;
use crate::dispatcher::{self, ISettings, Response};
use crate::{die, ChServer, IType, Params};
use byteorder::ReadBytesExt;
use crossbeam_channel::{unbounded, Receiver, Sender};
use serde;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::path::PathBuf;
use std::sync::Arc;
use std::{io, thread};
/// Something that can be started as the network front end of the program.
///
/// In this file it is implemented for `ChServer`, which listens on a TCP
/// port and serves every connecting client.
pub trait Server {
/// Consumes the server and runs it using the shared program parameters.
fn run(self, params: Arc<Params>);
}
impl Server for ChServer {
fn run(self, params: Arc<Params>) {
info!("Trying to bind TCP port {}", params.port);
let listener = match TcpListener::bind(format!("0.0.0.0:{}", params.port)) {
Ok(listener) => listener,
Err(err) => die!(
"Failed to start TCP/IP listener at port {}: {}",
params.port,
err
),
};
info!("Awaiting requests from client at port {}", params.port);
for stream in listener.incoming() {
match stream {
Ok(stream) => {
let (ch_resp_send, ch_resp_recv) = unbounded();
let p = params.clone();
match stream.try_clone() {
Ok(stream) => {
thread::spawn(move || stream.writer(ch_resp_recv, p));
}
Err(e) => {
error!("Could not clone stream: {}", e);
continue;
}
}
let ch = self.clone();
thread::spawn(move || stream.reader(ch, ch_resp_send));
}
Err(err) => warn!("Client connection error: {}", err),
}
}
}
}
trait StreamWriter: Write {
fn writer(self, ch_resp_recv: Receiver<Response>, p: Arc<Params>);
fn send_response(&mut self, r: Response, p: &Params) -> io::Result<()> {
let mut response = {
let state = r.state.integration.read();
let (pattern, frame) = if let Some(results) = r.results.as_ref() {
results.pack(p.resize, state.speed())
} else {
(String::new(), String::new())
};
let mut response = Some(ClientResponse {
running: state.is_running(),
path: state.path(),
last_frame: None,
last_dat: None,
pattern: &pattern,
pattern_size: pattern.len(),
timestamp: 0.,
frame: &frame,
frame_size: frame.len(),
total: r.total,
done: r.done,
transmission: 0.,
});
if let Some(results) = r.results.as_ref() {
let mut response = response.as_mut().unwrap();
response.last_frame = Some(&results.path);
response.last_dat = Some(&results.name);
response.timestamp = results.timestamp;
response.transmission = results.transmission;
}
let mut ts = TotalState {
server: ServerState::current(p.started),
waxs: None,
saxs: None,
errors: r.state.errors,
warnings: r.state.warnings,
};
match state.itype() {
IType::SAXS => ts.saxs = response,
IType::WAXS => ts.waxs = response,
}
serde_json::to_string(&ts).unwrap()
};
response.push_str("\r\n");
let response = response.as_bytes();
let mut written = 0;
while written < response.len() {
written += self.write(&response[written..])?;
}
Ok(())
}
}
impl StreamWriter for TcpStream {
    /// Forwards every response received on `ch_resp_recv` to the socket.
    ///
    /// Returns when the channel is disconnected or, after logging a warning,
    /// as soon as one response cannot be written.
    fn writer(mut self, ch_resp_recv: Receiver<Response>, p: Arc<Params>) {
        let outcome = ch_resp_recv
            .iter()
            .try_for_each(|response| self.send_response(response, p.as_ref()));
        if let Err(e) = outcome {
            warn!("Failed to write response: {}", e);
        }
    }
}
/// Receiving half of a client connection.
trait StreamReader: Read {
/// Consumes the stream and processes the client requests read from it.
///
/// `ch` holds the dispatcher channels requests are forwarded to and
/// `ch_resp` is the sender handed to the dispatcher so that responses can
/// be routed back to this client.
fn reader(self, ch: ChServer, ch_resp: Sender<Response>);
}
impl StreamReader for TcpStream {
    /// Reads newline-terminated requests from the client and dispatches each
    /// of them through `parse_request`.
    ///
    /// Carriage returns are dropped, `'\n'` ends a request. The function
    /// returns (after logging a warning) when the peer address cannot be
    /// determined or when reading fails, which includes the client closing
    /// the connection; an unterminated trailing line is discarded.
    fn reader(self, ch: ChServer, ch_resp: Sender<Response>) {
        match self.peer_addr() {
            Ok(v) => info!("A client connected from {}", v),
            Err(e) => {
                warn!("Error during listening a client: {}", e);
                return;
            }
        }
        // Buffer the socket: reading single bytes straight from a
        // `TcpStream` costs one system call per byte.
        let mut stream = io::BufReader::new(self);
        // Raw bytes are collected first and decoded as UTF-8 afterwards;
        // casting every byte to `char` would corrupt multi-byte characters.
        let mut line: Vec<u8> = Vec::new();
        loop {
            loop {
                match stream.read_u8() {
                    Ok(b'\n') => break,
                    // 0x0D never occurs inside a multi-byte UTF-8 sequence,
                    // so dropping it at byte level is safe.
                    Ok(b'\r') => {}
                    Ok(byte) => line.push(byte),
                    Err(e) => {
                        warn!("Failed to read stream: {}", e);
                        return;
                    }
                }
            }
            // Invalid UTF-8 is replaced rather than rejected, so a request
            // is never dropped at the decoding stage.
            String::from_utf8_lossy(&line).parse_request(&ch, &ch_resp);
            line.clear();
        }
    }
}
/// Interprets a piece of text as a client request.
trait ClientRequestParser {
/// Parses `self` as a request and forwards its contents to the dispatcher
/// channels in `ch`; `ch_resp` is the sender passed along so responses can
/// reach the requesting client.
fn parse_request(&self, ch: &ChServer, ch_resp: &Sender<Response>);
}
impl ClientRequestParser for str {
fn parse_request(&self, ch: &ChServer, ch_resp: &Sender<Response>) {
if self.is_empty() {
return;
}
let result: serde_json::Result<ClientRequest> = serde_json::from_str(self);
let request = match result {
Ok(request) => request,
Err(e) => {
warn!(
"Failed to parse JSON request from the client: {}: {}",
e, self
);
return;
}
};
if let Some(saxs) = request.saxs {
let saxs = ISettings {
itype: IType::SAXS,
chan: ch_resp.clone(),
settings: saxs,
};
let _ = ch.ch_send_saxs_settings.send(saxs);
}
if let Some(waxs) = request.waxs {
let waxs = ISettings {
itype: IType::WAXS,
chan: ch_resp.clone(),
settings: waxs,
};
let _ = ch.ch_send_waxs_settings.send(waxs);
}
if request.state != 0 {
let _ = ch.ch_req_saxs_state.send(ch_resp.clone());
let _ = ch.ch_req_waxs_state.send(ch_resp.clone());
}
if request.kill != 0 {
let _ = ch.ch_send_exit.send(());
}
}
}
/// Server-wide part of the status document sent to clients.
#[derive(serde::Serialize)]
pub struct ServerState {
/// Start time of the server, serialized under the key `startedTimestamp`.
/// NOTE(review): presumably a UNIX timestamp in seconds — confirm against
/// where `Params::started` is set.
#[serde(rename = "startedTimestamp")]
pub started: f64,
/// Whether the server has been killed; `ServerState::current` always sets
/// this to `false`.
pub killed: bool,
}
impl ServerState {
fn current(started: f64) -> ServerState {
ServerState {
started,
killed: false,
}
}
}
/// Complete status document serialized to JSON and sent to a client.
#[derive(serde::Serialize)]
struct TotalState<'a> {
/// Server-wide information.
server: ServerState,
/// WAXS payload; filled only when the response belongs to the WAXS
/// integration, otherwise `None`.
waxs: Option<ClientResponse<'a>>,
/// SAXS payload; filled only when the response belongs to the SAXS
/// integration, otherwise `None`.
saxs: Option<ClientResponse<'a>>,
/// Error messages taken from the response state.
errors: Vec<String>,
/// Warning messages taken from the response state.
warnings: Vec<String>,
}
/// One JSON request sent by a client. Every field is optional in the JSON
/// and falls back to its default (`0` / `None`) when absent.
#[derive(serde::Deserialize)]
struct ClientRequest {
/// Non-zero asks for the current state of both the SAXS and the WAXS
/// integration.
#[serde(default)]
state: i32,
/// Non-zero sends the exit signal to the dispatcher.
#[serde(default)]
kill: i32,
/// New settings for the WAXS integration, if any.
#[serde(default)]
waxs: Option<dispatcher::Settings>,
/// New settings for the SAXS integration, if any.
#[serde(default)]
saxs: Option<dispatcher::Settings>,
}
/// Per-integration part of the status document. It borrows its strings and
/// paths from the response it is built from.
#[derive(serde::Serialize)]
struct ClientResponse<'a> {
/// Whether the integration reports itself as running.
running: bool,
/// Path reported by the integration state.
path: &'a str,
/// Path of the latest result, serialized as `imageFile`; `None` when the
/// response carries no results.
#[serde(rename = "imageFile")]
last_frame: Option<&'a PathBuf>,
/// Name of the latest result, serialized as `chiFile`; `None` when the
/// response carries no results.
#[serde(rename = "chiFile")]
last_dat: Option<&'a PathBuf>,
/// Packed pattern data; empty when there are no results.
pattern: &'a str,
/// Length of `pattern` in bytes, serialized as `patternSize`.
#[serde(rename = "patternSize")]
pattern_size: usize,
/// Timestamp of the latest result, `0` when there are no results.
timestamp: f64,
/// Packed frame data; empty when there are no results.
frame: &'a str,
/// Length of `frame` in bytes, serialized as `frameSize`.
#[serde(rename = "frameSize")]
frame_size: usize,
/// The response's `total` counter. Note the JSON key is `all`, not `total`.
#[serde(rename = "all")]
total: isize,
/// The response's `done` counter. Note the JSON key is `total`.
#[serde(rename = "total")]
done: isize,
/// Transmission value of the latest result, `0` when there are no results.
transmission: f64,
}