#[macro_use]
extern crate log;
#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate serde_derive;
#[macro_use]
extern crate failure;
#[macro_use]
extern crate clap;
#[macro_use]
extern crate prometheus;
pub mod metrics;
use clap::App;
/// Version string of this crate, read from the `CARGO_PKG_VERSION`
/// environment variable at compile time.
pub const ASTER_VERSION: &str = env!("CARGO_PKG_VERSION");
pub mod com;
pub mod protocol;
pub mod proxy;
pub(crate) mod utils;
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use std::env;
use failure::Error;
use com::meta::{load_meta, meta_init};
use com::ClusterConfig;
use metrics::thread_incr;
pub fn run() -> Result<(), Error> {
let yaml = load_yaml!("cli.yml");
let matches = App::from_yaml(yaml).version(ASTER_VERSION).get_matches();
let config = matches.value_of("config").unwrap_or("default.toml");
let watch_file = config.to_string();
let ip = matches.value_of("ip").map(|x| x.to_string());
let enable_reload = matches.is_present("reload");
println!("[rcproxy-{}] loading config from {}", ASTER_VERSION, config);
let cfg = com::Config::load(&config)?;
assert!(
!cfg.clusters.is_empty(),
"clusters is absent of config file"
);
assert!(
!cfg.log.level.is_empty(),
"log level is absent of config file"
);
assert!(
!cfg.log.directory.is_empty(),
"log directory is absent of config file"
);
assert!(
!cfg.log.file_name.is_empty(),
"log file_name is absent of config file"
);
assert!(
cfg.metrics.port != 0,
"metrics port is absent of config file"
);
crate::proxy::standalone::reload::init(&watch_file, cfg.clone(), enable_reload)?;
let _guard = clia_tracing_config::build()
.filter_level(&cfg.log.level)
.with_ansi(cfg.log.ansi)
.to_stdout(cfg.log.stdout)
.directory(&cfg.log.directory)
.file_name(&cfg.log.file_name)
.rolling("daily")
.init();
let mut ths = Vec::new();
for cluster in cfg.clusters.into_iter() {
if cluster.servers.is_empty() {
warn!(
"fail to running cluster {} in addr {} due filed `servers` is empty",
cluster.name, cluster.listen_addr
);
continue;
}
if cluster.name.is_empty() {
warn!(
"fail to running cluster {} in addr {} due filed `name` is empty",
cluster.name, cluster.listen_addr
);
continue;
}
info!(
"starting aster cluster {} in addr {}",
cluster.name, cluster.listen_addr
);
match cluster.cache_type {
com::CacheType::RedisCluster => {
let jhs = spawn_worker(&cluster, ip.clone(), proxy::cluster::spawn);
ths.extend(jhs);
}
_ => {
let jhs = spawn_worker(&cluster, ip.clone(), proxy::standalone::spawn);
ths.extend(jhs);
}
}
}
{
let port = cfg.metrics.port;
spawn_metrics(port);
}
for th in ths {
th.join().unwrap();
}
Ok(())
}
/// Starts the worker threads for a single cluster and returns their join
/// handles.
///
/// The number of threads is `cc.thread`, falling back to 4 when it is not
/// set. The meta information is loaded once via `load_meta` and a clone of
/// it is handed to every thread. Each thread is named after the cluster and,
/// in order, calls `thread_incr`, `meta_init` with its copy of the meta
/// information, and finally `spawn_fn` with its own copy of the cluster
/// configuration.
///
/// # Panics
///
/// Panics if the operating system fails to spawn a thread.
fn spawn_worker<T>(cc: &ClusterConfig, ip: Option<String>, spawn_fn: T) -> Vec<JoinHandle<()>>
where
    T: Fn(ClusterConfig) + Copy + Send + 'static,
{
    let thread_count = cc.thread.unwrap_or(4);
    let meta = load_meta(cc.clone(), ip);
    info!("setup meta info with {:?}", meta);

    let mut handles = Vec::new();
    for _ in 0..thread_count {
        // Every thread owns its private copies of the config and the meta.
        let cluster = cc.clone();
        let worker_meta = meta.clone();
        let thread_name = cluster.name.clone();

        let handle = Builder::new()
            .name(thread_name)
            .spawn(move || {
                thread_incr();
                meta_init(worker_meta);
                spawn_fn(cluster);
            })
            .expect("fail to spawn worker thread");
        handles.push(handle);
    }
    handles
}
/// Starts the two metrics threads and returns their join handles.
///
/// The calling thread first sleeps for three seconds. Then a thread named
/// `aster-http-srv` running `metrics::init(port)` is spawned, followed by a
/// thread named `measure-service` running `metrics::measure_system()`.
///
/// # Panics
///
/// Panics if either thread cannot be spawned. Inside the spawned threads, an
/// error returned by the respective `metrics` function panics that thread.
fn spawn_metrics(port: usize) -> Vec<thread::JoinHandle<()>> {
    // NOTE(review): presumably gives the workers time to come up before
    // metrics are served; confirm the reason for this delay.
    thread::sleep(Duration::from_secs(3));

    let http_server = thread::Builder::new()
        .name(String::from("aster-http-srv"))
        .spawn(move || metrics::init(port).unwrap())
        .unwrap();

    let system_probe = thread::Builder::new()
        .name(String::from("measure-service"))
        .spawn(move || metrics::measure_system().unwrap())
        .unwrap();

    vec![http_server, system_probe]
}