bubbles 3.1.1

Bubble integration server for powder diffraction
extern crate crossbeam_channel;
use crate::dispatcher::{self, ISettings, Response};
use crate::{die, ChServer, IType, Params};
use byteorder::ReadBytesExt;
use crossbeam_channel::{unbounded, Receiver, Sender};
use serde;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::path::PathBuf;
use std::sync::Arc;
use std::{io, thread};

/// Entry point for a server that is driven by shared runtime parameters.
pub trait Server {
    /// Consumes the server and runs it using the shared `params`.
    fn run(self, params: Arc<Params>);
}

impl Server for ChServer {
    fn run(self, params: Arc<Params>) {
        info!("Trying to bind TCP port {}", params.port);
        let listener = match TcpListener::bind(format!("0.0.0.0:{}", params.port)) {
            Ok(listener) => listener,
            Err(err) => die!(
                "Failed to start TCP/IP listener at port {}: {}",
                params.port,
                err
            ),
        };
        info!("Awaiting requests from client at port {}", params.port);
        for stream in listener.incoming() {
            match stream {
                Ok(stream) => {
                    let (ch_resp_send, ch_resp_recv) = unbounded();
                    let p = params.clone();
                    match stream.try_clone() {
                        Ok(stream) => {
                            thread::spawn(move || stream.writer(ch_resp_recv, p));
                        }
                        Err(e) => {
                            error!("Could not clone stream: {}", e);
                            continue;
                        }
                    }
                    let ch = self.clone();
                    thread::spawn(move || stream.reader(ch, ch_resp_send));
                }
                Err(err) => warn!("Client connection error: {}", err),
            }
        }
    }
}

trait StreamWriter: Write {
    fn writer(self, ch_resp_recv: Receiver<Response>, p: Arc<Params>);

    fn send_response(&mut self, r: Response, p: &Params) -> io::Result<()> {
        let mut response = {
            let state = r.state.integration.read();
            let (pattern, frame) = if let Some(results) = r.results.as_ref() {
                results.pack(p.resize, state.speed())
            } else {
                (String::new(), String::new())
            };
            let mut response = Some(ClientResponse {
                running: state.is_running(),
                path: state.path(),
                last_frame: None,
                last_dat: None,
                pattern: &pattern,
                pattern_size: pattern.len(),
                timestamp: 0.,
                frame: &frame,
                frame_size: frame.len(),
                total: r.total,
                done: r.done,
                transmission: 0.,
            });
            if let Some(results) = r.results.as_ref() {
                let mut response = response.as_mut().unwrap();
                response.last_frame = Some(&results.path);
                response.last_dat = Some(&results.name);
                response.timestamp = results.timestamp;
                response.transmission = results.transmission;
            }
            let mut ts = TotalState {
                server: ServerState::current(p.started),
                waxs: None,
                saxs: None,
                errors: r.state.errors,
                warnings: r.state.warnings,
            };
            match state.itype() {
                IType::SAXS => ts.saxs = response,
                IType::WAXS => ts.waxs = response,
            }
            serde_json::to_string(&ts).unwrap()
        };
        response.push_str("\r\n");
        let response = response.as_bytes();
        let mut written = 0;
        while written < response.len() {
            written += self.write(&response[written..])?;
        }
        Ok(())
    }
}

impl StreamWriter for TcpStream {
    /// Forwards every response received on `ch_resp_recv` to the client.
    ///
    /// Returns when the channel is disconnected or as soon as one response
    /// fails to be sent; the failure is logged as a warning.
    fn writer(mut self, ch_resp_recv: Receiver<Response>, p: Arc<Params>) {
        let params: &Params = p.as_ref();
        // `recv` blocks until a response arrives and errors once every sender
        // has been dropped and the channel is empty.
        while let Ok(response) = ch_resp_recv.recv() {
            match self.send_response(response, params) {
                Ok(()) => {}
                Err(e) => {
                    warn!("Failed to write response: {}", e);
                    break;
                }
            }
        }
    }
}

/// A byte source from which client requests can be read.
trait StreamReader: Read {
    /// Consumes the stream and reads client requests from it. `ch` holds the
    /// server channels and `ch_resp` is the sending side of this client's
    /// response channel.
    fn reader(self, ch: ChServer, ch_resp: Sender<Response>);
}

impl StreamReader for TcpStream {
    /// Reads newline-terminated requests from the client and hands each line
    /// to `parse_request` until the stream fails or is closed.
    ///
    /// Carriage returns are dropped, so both `"\n"` and `"\r\n"` terminate a
    /// request. Each line is decoded as UTF-8. A line that is not valid UTF-8
    /// is decoded one byte per character instead, which is how every line
    /// used to be decoded. A partial line that is pending when the stream
    /// ends is discarded.
    //noinspection RsLiveness
    fn reader(self, ch: ChServer, ch_resp: Sender<Response>) {
        match self.peer_addr() {
            Ok(v) => info!("A client connected from {}", v),
            Err(e) => {
                warn!("Error during listening a client: {}", e);
                return;
            }
        }
        // Buffer the socket: reading single bytes straight from a `TcpStream`
        // costs one system call per byte.
        let mut stream = io::BufReader::new(self);
        // Raw bytes are collected first and decoded per line, so multi-byte
        // UTF-8 sequences are not split into separate characters.
        let mut line: Vec<u8> = Vec::new();
        loop {
            loop {
                match stream.read_u8() {
                    Ok(b'\n') => break,
                    Ok(b'\r') => {}
                    Ok(byte) => line.push(byte),
                    Err(e) => {
                        // Includes the end-of-stream case (client disconnected).
                        warn!("Failed to read stream: {}", e);
                        return;
                    }
                }
            }
            match std::str::from_utf8(&line) {
                Ok(text) => text.parse_request(&ch, &ch_resp),
                Err(_) => {
                    // Not UTF-8: fall back to one character per byte.
                    let text: String = line.iter().map(|&byte| char::from(byte)).collect();
                    text.parse_request(&ch, &ch_resp);
                }
            }
            line.clear();
        }
    }
}

/// Interprets a value as a client request and dispatches it to the server.
trait ClientRequestParser {
    /// Parses `self` as a request and acts on it using the server channels in
    /// `ch`; `ch_resp` is the sending side of the requesting client's response
    /// channel.
    fn parse_request(&self, ch: &ChServer, ch_resp: &Sender<Response>);
}

impl ClientRequestParser for str {
    fn parse_request(&self, ch: &ChServer, ch_resp: &Sender<Response>) {
        if self.is_empty() {
            return;
        }
        let result: serde_json::Result<ClientRequest> = serde_json::from_str(self);
        let request = match result {
            Ok(request) => request,
            Err(e) => {
                warn!(
                    "Failed to parse JSON request from the client: {}: {}",
                    e, self
                );
                return;
            }
        };
        if let Some(saxs) = request.saxs {
            let saxs = ISettings {
                itype: IType::SAXS,
                chan: ch_resp.clone(),
                settings: saxs,
            };
            let _ = ch.ch_send_saxs_settings.send(saxs);
        }
        if let Some(waxs) = request.waxs {
            let waxs = ISettings {
                itype: IType::WAXS,
                chan: ch_resp.clone(),
                settings: waxs,
            };
            let _ = ch.ch_send_waxs_settings.send(waxs);
        }
        if request.state != 0 {
            let _ = ch.ch_req_saxs_state.send(ch_resp.clone());
            let _ = ch.ch_req_waxs_state.send(ch_resp.clone());
        }
        if request.kill != 0 {
            let _ = ch.ch_send_exit.send(());
        }
    }
}

/// Server-level part of the JSON document sent to clients.
#[derive(serde::Serialize)]
pub struct ServerState {
    /// Start time of the server; serialized as `startedTimestamp`.
    // NOTE(review): units/epoch are not visible here — confirm against where `Params::started` is set.
    #[serde(rename = "startedTimestamp")]
    pub started: f64,
    /// Kill flag reported to the client; `ServerState::current` always sets it to `false`.
    pub killed: bool,
}

impl ServerState {
    fn current(started: f64) -> ServerState {
        ServerState {
            started,
            killed: false,
        }
    }
}

/// Top-level JSON document sent to a client for every response.
#[derive(serde::Serialize)]
struct TotalState<'a> {
    /// Server-level information.
    server: ServerState,
    /// WAXS integration response; `None` serializes as `null`.
    waxs: Option<ClientResponse<'a>>,
    /// SAXS integration response; `None` serializes as `null`.
    saxs: Option<ClientResponse<'a>>,
    /// Error messages taken from the response state.
    errors: Vec<String>,
    /// Warning messages taken from the response state.
    warnings: Vec<String>,
}

/// JSON request accepted from a client. Every field is optional in the
/// incoming document and falls back to its default when missing.
#[derive(serde::Deserialize)]
struct ClientRequest {
    /// Non-zero asks for the current state of both integrations.
    #[serde(default)]
    state: i32,
    /// Non-zero sends the exit signal to the server.
    #[serde(default)]
    kill: i32,
    /// New WAXS integration settings, if any.
    #[serde(default)]
    waxs: Option<dispatcher::Settings>,
    /// New SAXS integration settings, if any.
    #[serde(default)]
    saxs: Option<dispatcher::Settings>,
}

/// Per-integration part of the JSON document sent to a client. Borrows its
/// string and path data from the response being serialized.
#[derive(serde::Serialize)]
struct ClientResponse<'a> {
    /// Whether the integration is currently running.
    running: bool,
    /// Path reported by the integration state.
    path: &'a str,
    /// Path of the latest results, if any; serialized as `imageFile`.
    #[serde(rename = "imageFile")]
    last_frame: Option<&'a PathBuf>,
    /// Name of the latest results, if any; serialized as `chiFile`.
    #[serde(rename = "chiFile")]
    last_dat: Option<&'a PathBuf>,
    /// Packed pattern of the latest results, or an empty string.
    pattern: &'a str,
    /// Length of `pattern` in bytes; serialized as `patternSize`.
    #[serde(rename = "patternSize")]
    pattern_size: usize,
    /// Timestamp of the latest results, or `0.` when there are none.
    timestamp: f64,
    /// Packed frame of the latest results, or an empty string.
    frame: &'a str,
    /// Length of `frame` in bytes; serialized as `frameSize`.
    #[serde(rename = "frameSize")]
    frame_size: usize,
    /// The response's `total` counter. Note the wire name is `all`, not `total`.
    #[serde(rename = "all")]
    total: isize,
    /// The response's `done` counter. Note the wire name is `total`.
    #[serde(rename = "total")]
    done: isize,
    /// Transmission of the latest results, or `0.` when there are none.
    transmission: f64,
}