#[macro_use]
extern crate clap;
#[macro_use]
extern crate crossbeam_channel;
#[macro_use]
extern crate log;
use crate::dispatcher::{Dispatcher, ISettings, Response};
use crate::server::Server;
use crate::utils::py_stamp;
pub use crossbeam_channel::select;
use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
#[cfg(not(target_os = "windows"))]
use daemonize::Daemonize;
use rayon::ThreadPoolBuilder;
#[cfg(not(target_os = "windows"))]
use std::fs;
#[cfg(not(target_os = "windows"))]
use std::fs::File;
#[cfg(not(target_os = "windows"))]
use std::path::Path;
use std::sync::Arc;
use std::{fmt, thread};
pub mod dispatcher;
pub mod notifier;
pub mod server;
pub mod utils;
#[macro_use]
pub mod macros;
pub mod averager;
/// Last year printed in the copyright line produced by `about()`.
const CURRENT_YEAR: u16 = 2020;
/// Default value of the positional `port` command line argument.
const PORT: u16 = 8587;
/// Default value of the `--resize` option: 1 on Linux, 4 on every other OS.
#[cfg(target_os = "linux")]
const RESIZE: usize = 1;
#[cfg(not(target_os = "linux"))]
const RESIZE: usize = 4;
// Package metadata captured from Cargo's environment at compile time.
const VERSION: &str = env!("CARGO_PKG_VERSION");
const AUTHORS: &str = env!("CARGO_PKG_AUTHORS");
const DESCRIPTION: &str = env!("CARGO_PKG_DESCRIPTION");
const HOMEPAGE: &str = env!("CARGO_PKG_HOMEPAGE");
const REPOSITORY: &str = env!("CARGO_PKG_REPOSITORY");
const NAME: &str = env!("CARGO_PKG_NAME");
// Accepted values of the `--verbose` option, from most to least talkative.
const VERBOSE_ALL: &str = "3";
const VERBOSE_INFO: &str = "2";
const VERBOSE_ERRORS: &str = "1";
const VERBOSE_SILENT: &str = "0";
/// Minimal console logger installed as the global `log` backend.
///
/// Warnings and errors go to stderr, everything else to stdout (see the
/// `log::Log` implementation).
struct Logger {
    /// When `true`, every message is prefixed with a local timestamp.
    daemon: bool,
    /// Most verbose level that is still printed.
    level: log::LevelFilter,
}

impl Logger {
    /// Returns the message prefix: a local timestamp in daemon mode, an empty
    /// string otherwise.
    fn now(&self) -> String {
        if self.daemon {
            chrono::Local::now()
                .format("%Y-%m-%d %H:%M:%S: ")
                .to_string()
        } else {
            String::new()
        }
    }

    /// Installs a logger configured from `p` as the global logger and sets the
    /// global maximum level to `p.log_level`.
    ///
    /// # Errors
    ///
    /// Returns `log::SetLoggerError` if a global logger was already installed.
    fn init(p: &Params) -> Result<(), log::SetLoggerError> {
        // `log::set_logger` needs a `'static` reference. Leaking one small
        // allocation provides it without a `static mut` and without `unsafe`;
        // the logger has to live for the rest of the process anyway.
        let logger: &'static Logger = Box::leak(Box::new(Logger {
            daemon: p.daemon,
            level: p.log_level,
        }));
        log::set_logger(logger).map(|()| log::set_max_level(p.log_level))
    }
}
impl log::Log for Logger {
fn enabled(&self, metadata: &log::Metadata) -> bool {
metadata.level() <= self.level
}
fn log(&self, record: &log::Record) {
if self.enabled(record.metadata()) {
match record.level() {
log::Level::Warn | log::Level::Error => {
eprintln!("{}{}: {}", self.now(), record.level(), record.args())
}
_ => println!("{}{}: {}", self.now(), record.level(), record.args()),
}
}
}
fn flush(&self) {}
}
fn main() {
let mut params = Params::from_command_line();
if params.threads != num_cpus::get() {
if let Err(e) = ThreadPoolBuilder::new()
.num_threads(params.threads)
.build_global()
{
params.threads = num_cpus::get();
error!(
"Failed to initialize the user-specified number of threads: {}",
e
);
}
}
let params = Arc::new(params);
info!("{} thread(s) will be used for integration", params.threads);
let ctrlc_exit = ctrl_channel();
let (server, saxs, waxs, server_exit) = channels();
let p = params.clone();
thread::spawn(move || server.run(p));
thread::spawn(move || waxs.run(IType::WAXS));
thread::spawn(move || saxs.run(IType::SAXS));
loop {
select! {
recv(ctrlc_exit) -> _ => {
info!("Ctrl-C has been pressed. Exiting...");
break;
}
recv(server_exit) -> _ => {
info!("Client requested immediate exit");
break;
}
}
}
params.remove_pid_file();
}
/// Builds the multi-line "about" text used both as the clap description and
/// in the start-up banner (where it follows the program name).
fn about() -> String {
    format!(
        concat!(
            "is {description}, version {version}\n",
            "(c) {authors} 2014-{year}, SNBL@ESRF, inspired by Giuseppe Portale\n",
            "If this program proves to be useful, please cite this paper:\n",
            "http://dx.doi.org/10.1107/S1600577516002411\n",
            "Official web page: {homepage}\n",
            "Git repository for bubbles: {repository}",
        ),
        description = DESCRIPTION,
        version = VERSION,
        authors = AUTHORS,
        year = CURRENT_YEAR,
        homepage = HOMEPAGE,
        repository = REPOSITORY,
    )
}
/// Installs a Ctrl-C handler and returns the channel it reports on.
///
/// Every Ctrl-C sends one `()` into a channel bounded at 100 messages; send
/// errors are ignored. Calls `die!` if the handler cannot be installed.
fn ctrl_channel() -> Receiver<()> {
    let (tx, rx) = bounded(100);
    let installed = ctrlc::set_handler(move || {
        let _ = tx.send(());
    });
    if let Err(e) = installed {
        die!("Error setting Ctrl-C handler: {:?}", e);
    }
    rx
}
/// Sending ends of the channels that connect the server to the SAXS and WAXS
/// dispatchers and to `main` (wired up in `channels()`).
///
/// `Clone` is derived, so `server.clone()` keeps working exactly as with the
/// former inherent method: every `Sender` is cloned and all clones feed the
/// same receivers.
#[derive(Clone)]
pub struct ChServer {
    /// Hands a reply `Sender<Response>` to the SAXS dispatcher.
    pub ch_req_saxs_state: Sender<Sender<Response>>,
    /// Hands a reply `Sender<Response>` to the WAXS dispatcher.
    pub ch_req_waxs_state: Sender<Sender<Response>>,
    /// Sends `ISettings` to the SAXS dispatcher.
    pub ch_send_saxs_settings: Sender<ISettings>,
    /// Sends `ISettings` to the WAXS dispatcher.
    pub ch_send_waxs_settings: Sender<ISettings>,
    /// Signals the exit receiver returned by `channels()`.
    pub ch_send_exit: Sender<()>,
}
/// Receiving ends of the channels owned by one dispatcher (SAXS or WAXS);
/// created in `channels()` as the counterparts of the `ChServer` senders.
pub struct ChDispatcher {
/// Receives the reply senders pushed through the matching
/// `ChServer::ch_req_*_state` sender.
pub ch_wait_req_state: Receiver<Sender<Response>>,
/// Receives the `ISettings` values pushed through the matching
/// `ChServer::ch_send_*_settings` sender.
pub ch_recv_settings: Receiver<ISettings>,
}
/// Creates all unbounded channels of the program and groups their ends.
///
/// Returns, in order: the server's senders, the SAXS dispatcher's receivers,
/// the WAXS dispatcher's receivers and the receiver of the exit signal.
fn channels() -> (ChServer, ChDispatcher, ChDispatcher, Receiver<()>) {
    let (saxs_state_tx, saxs_state_rx) = unbounded();
    let (waxs_state_tx, waxs_state_rx) = unbounded();
    let (saxs_settings_tx, saxs_settings_rx) = unbounded();
    let (waxs_settings_tx, waxs_settings_rx) = unbounded();
    let (exit_tx, exit_rx) = unbounded();

    let saxs = ChDispatcher {
        ch_wait_req_state: saxs_state_rx,
        ch_recv_settings: saxs_settings_rx,
    };
    let waxs = ChDispatcher {
        ch_wait_req_state: waxs_state_rx,
        ch_recv_settings: waxs_settings_rx,
    };
    let server = ChServer {
        ch_req_saxs_state: saxs_state_tx,
        ch_req_waxs_state: waxs_state_tx,
        ch_send_saxs_settings: saxs_settings_tx,
        ch_send_waxs_settings: waxs_settings_tx,
        ch_send_exit: exit_tx,
    };
    (server, saxs, waxs, exit_rx)
}
/// Selects which of the two dispatchers (SAXS or WAXS) is meant.
#[derive(PartialEq)]
pub enum IType {
    SAXS,
    WAXS,
}

/// Displays the variant name verbatim (`"SAXS"` / `"WAXS"`).
impl fmt::Display for IType {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let label = match self {
            IType::SAXS => "SAXS",
            IType::WAXS => "WAXS",
        };
        // `write_str` ignores width/fill flags, just like a bare `write!`.
        f.write_str(label)
    }
}
/// Run-time configuration built from the command line by
/// `Params::from_command_line`.
///
/// The path, user and group fields exist only on non-Windows targets and hold
/// an empty string when the corresponding option was not supplied.
pub struct Params {
/// Value returned by `py_stamp()` while the parameters were being built
/// (presumably the start-up timestamp; confirm against `utils::py_stamp`).
pub started: f64,
/// Value of the positional `port` argument (defaults to `PORT`).
pub port: u16,
/// Value of the `--resize` option (defaults to `RESIZE`).
pub resize: usize,
/// Value of the `--threads` option; `main` resets it to the CPU count when
/// the global thread pool cannot be built with the requested size.
threads: usize,
/// Path given with `--out`; used as the daemon's stdout file.
#[cfg(not(target_os = "windows"))]
stdout: String,
/// Path given with `--err`; used as the daemon's stderr file.
#[cfg(not(target_os = "windows"))]
stderr: String,
/// Path given with `--pid`; passed to the daemon as its pid file.
#[cfg(not(target_os = "windows"))]
pid: String,
/// Value of `--user` (defaults to the current user name).
#[cfg(not(target_os = "windows"))]
user: String,
/// Value of `--group` (defaults to the current user name as well).
#[cfg(not(target_os = "windows"))]
group: String,
/// Path given with `--workdir`; working directory of the daemon.
#[cfg(not(target_os = "windows"))]
workdir: String,
/// Log level derived from the `--verbose` option.
log_level: log::LevelFilter,
/// Whether `--daemon` was given; always `false` on Windows.
daemon: bool,
}
impl Params {
/// Parses the command line into `Params` (Windows build: no daemon options).
///
/// Initializes the global logger and prints the banner before returning.
/// Exits the process through clap when `port`, `resize` or `threads` cannot
/// be parsed; calls `die!` when the logger cannot be installed.
#[cfg(target_os = "windows")]
fn from_command_line() -> Params {
    let default_port = PORT.to_string();
    let default_resize = RESIZE.to_string();
    let banner = about();
    let default_threads = num_cpus::get().to_string();
    let config = AppConfig {
        port: &default_port,
        resize: &default_resize,
        threads: &default_threads,
    };
    let matches = config.get().about(banner.as_str()).get_matches();

    let started = py_stamp();
    let port = value_t!(matches.value_of("port"), u16).unwrap_or_else(|e| e.exit());
    let resize = value_t!(matches.value_of("resize"), usize).unwrap_or_else(|e| e.exit());
    let threads = value_t!(matches.value_of("threads"), usize).unwrap_or_else(|e| e.exit());
    let verbosity = matches
        .value_of("verbose")
        .unwrap_or_else(|| std::process::exit(1));
    // Anything that is not a known level is treated like "all".
    let log_level = match verbosity {
        VERBOSE_SILENT => log::LevelFilter::Off,
        VERBOSE_ERRORS => log::LevelFilter::Error,
        VERBOSE_INFO => log::LevelFilter::Info,
        VERBOSE_ALL => log::LevelFilter::Debug,
        _ => log::LevelFilter::Debug,
    };

    let p = Params {
        started,
        port,
        resize,
        threads,
        log_level,
        daemon: false,
    };
    if let Err(e) = Logger::init(&p) {
        die!("Could not set logger: {}", e);
    }
    println!("{} {}", NAME, banner);
    p
}
/// Removes the pid file written during daemonization (best effort).
///
/// The file is only created when the process runs as a daemon with a pid
/// path configured, so nothing is touched otherwise. This keeps a stray
/// `--pid PATH` without `--daemon` from deleting a file this process never
/// created. Removal errors are deliberately ignored: the process is exiting.
#[cfg(not(target_os = "windows"))]
fn remove_pid_file(&self) {
    if !self.daemon || self.pid.is_empty() {
        return;
    }
    let _ = fs::remove_file(&self.pid);
}
/// No pid file is ever written on Windows, so there is nothing to remove.
#[cfg(target_os = "windows")]
fn remove_pid_file(&self) {}
/// Parses the command line into `Params` (non-Windows build).
///
/// Extends the common options from `AppConfig` with the daemon-related ones,
/// initializes the global logger, daemonizes when `--daemon` was given and
/// prints the banner. Exits the process through clap when `port`, `resize`
/// or `threads` cannot be parsed; calls `die!` when the logger cannot be
/// installed.
#[cfg(not(target_os = "windows"))]
fn from_command_line() -> Params {
    let current_user = whoami::username();
    let default_port = PORT.to_string();
    let default_resize = RESIZE.to_string();
    let banner = about();
    let default_threads = num_cpus::get().to_string();
    let config = AppConfig {
        port: &default_port,
        resize: &default_resize,
        threads: &default_threads,
    };

    let stdout_arg = clap::Arg::with_name("stdout")
        .short("o")
        .long("out")
        .value_name("LOGFILE")
        .help("Log file")
        .takes_value(true);
    let stderr_arg = clap::Arg::with_name("stderr")
        .short("e")
        .long("err")
        .value_name("LOGFILE")
        .help("Log file for errors")
        .takes_value(true);
    let pid_arg = clap::Arg::with_name("pid")
        .short("i")
        .long("pid")
        .value_name("PIDFILE")
        .help("pidfile path")
        .takes_value(true);
    let user_arg = clap::Arg::with_name("user")
        .short("u")
        .long("user")
        .value_name("USER")
        .help("A username to run daemon")
        .default_value(&current_user)
        .takes_value(true);
    // The group defaults to the current user name too.
    let group_arg = clap::Arg::with_name("group")
        .short("g")
        .long("group")
        .value_name("GROUP")
        .help("A group to run daemon")
        .default_value(&current_user)
        .takes_value(true);
    let workdir_arg = clap::Arg::with_name("workdir")
        .short("w")
        .long("workdir")
        .value_name("WORKING DIRECTORY")
        .help("Working directory for the daemon")
        .takes_value(true);
    let daemon_arg = clap::Arg::with_name("daemon")
        .short("d")
        .long("daemon")
        .help("Run process in the background");

    let matches = config
        .get()
        .about(banner.as_str())
        .arg(stdout_arg)
        .arg(stderr_arg)
        .arg(pid_arg)
        .arg(user_arg)
        .arg(group_arg)
        .arg(workdir_arg)
        .arg(daemon_arg)
        .get_matches();

    // Textual options become an empty string when they were not supplied.
    let text_of = |name: &str| {
        matches
            .value_of(name)
            .map(String::from)
            .unwrap_or_default()
    };

    let port = value_t!(matches.value_of("port"), u16).unwrap_or_else(|e| e.exit());
    let resize = value_t!(matches.value_of("resize"), usize).unwrap_or_else(|e| e.exit());
    let threads = value_t!(matches.value_of("threads"), usize).unwrap_or_else(|e| e.exit());
    let stdout = text_of("stdout");
    let stderr = text_of("stderr");
    let pid = text_of("pid");
    let user = text_of("user");
    let group = text_of("group");
    let workdir = text_of("workdir");
    let started = py_stamp();
    let verbosity = matches
        .value_of("verbose")
        .unwrap_or_else(|| std::process::exit(1));
    // Anything that is not a known level is treated like "all".
    let log_level = match verbosity {
        VERBOSE_SILENT => log::LevelFilter::Off,
        VERBOSE_ERRORS => log::LevelFilter::Error,
        VERBOSE_INFO => log::LevelFilter::Info,
        VERBOSE_ALL => log::LevelFilter::Debug,
        _ => log::LevelFilter::Debug,
    };
    let daemon = matches.is_present("daemon");

    let p = Params {
        started,
        port,
        resize,
        threads,
        stdout,
        stderr,
        pid,
        user,
        group,
        workdir,
        log_level,
        daemon,
    };
    if let Err(e) = Logger::init(&p) {
        die!("Could not set logger: {}", e);
    }
    p.daemonize();
    println!("{} {}", NAME, banner);
    p
}
/// Detaches the process into the background when `--daemon` was requested.
///
/// Does nothing unless `self.daemon` is set. stdout goes to the `--out`
/// file; stderr goes to the `--err` file or, when none was given, to the
/// `--out` file as well. Pid file, working directory, user and group are
/// applied only when non-empty. Calls `die!` if daemonizing fails.
#[cfg(not(target_os = "windows"))]
fn daemonize(&self) {
    if !self.daemon {
        return;
    }
    let mut daemon = Daemonize::new();

    if let Some(out) = self.stdout.create() {
        daemon = daemon.stdout(out);
    }
    // Without a dedicated error log, stderr shares the stdout file; `create`
    // yields `None` for an empty path, so nothing is set when both are unset.
    let err_path = if self.stderr.is_empty() {
        &self.stdout
    } else {
        &self.stderr
    };
    if let Some(err) = err_path.create() {
        daemon = daemon.stderr(err);
    }

    if !self.pid.is_empty() {
        daemon = daemon.pid_file(&self.pid);
    }
    if !self.workdir.is_empty() {
        daemon = daemon.working_directory(&self.workdir);
    }
    if !self.user.is_empty() {
        daemon = daemon.user(self.user.as_str());
    }
    if !self.group.is_empty() {
        daemon = daemon.group(self.group.as_str());
    }

    match daemon.start() {
        Err(e) => die!("Could not daemonize: {}", e),
        Ok(_) => info!("The main procces has forked successfully"),
    }
}
}
#[cfg(not(target_os = "windows"))]
trait FileCreator: AsRef<Path> {
fn create(&self) -> Option<fs::File>;
}
#[cfg(not(target_os = "windows"))]
impl FileCreator for str {
fn create(&self) -> Option<File> {
if !self.is_empty() {
match fs::OpenOptions::new().append(true).create(true).open(self) {
Ok(out) => Some(out),
Err(e) => {
error!("Failed to open file {}: {}", self, e);
None
}
}
} else {
None
}
}
}
/// Default values (already rendered as text) for the command line options
/// shared by every platform.
struct AppConfig<'a> {
    /// Default for the positional `port` argument.
    port: &'a str,
    /// Default for `--resize`.
    resize: &'a str,
    /// Default for `--threads`.
    threads: &'a str,
}

impl<'a> AppConfig<'a> {
    /// Builds the clap application with the platform-independent arguments:
    /// `port` (positional), `--resize`, `--verbose` and `--threads`.
    fn get<'b>(&self) -> clap::App<'a, 'b> {
        let port = clap::Arg::with_name("port")
            .value_name("PORT")
            .help("TCP/IP port to listen for client connections")
            .default_value(self.port)
            .takes_value(true);
        let resize = clap::Arg::with_name("resize")
            .short("r")
            .long("resize")
            .value_name("PIXELS")
            .help("Resize and interpolate image by PIXELS before sending to client")
            .default_value(self.resize)
            .takes_value(true);
        let verbose = clap::Arg::with_name("verbose")
            .short("v")
            .long("verbose")
            .value_name("VERBOSITY LEVEL")
            .help("Verbosity level for messages: 0 - off; 1 - errors; 2 - info; 3 - all")
            .default_value(VERBOSE_INFO)
            .takes_value(true);
        let threads = clap::Arg::with_name("threads")
            .short("t")
            .long("threads")
            .value_name("THREAD NUMBER")
            .help("Number of threads which are used for integration")
            .default_value(self.threads)
            .takes_value(true);

        clap::App::new(NAME)
            .arg(port)
            .arg(resize)
            .arg(verbose)
            .arg(threads)
    }
}