Skip to main content

libaster/
lib.rs

1// #![deny(warnings)]
2#[macro_use]
3extern crate log;
4#[macro_use]
5extern crate lazy_static;
6#[macro_use]
7extern crate serde_derive;
8#[macro_use]
9extern crate failure;
10#[macro_use]
11extern crate clap;
12
13#[cfg(feature = "metrics")]
14#[macro_use]
15extern crate prometheus;
16#[cfg(feature = "metrics")]
17pub mod metrics;
18
19use clap::App;
20
21pub const ASTER_VERSION: &str = env!("CARGO_PKG_VERSION");
22
23pub mod com;
24pub mod protocol;
25pub mod proxy;
26pub(crate) mod utils;
27
28use failure::Error;
29
30pub fn run() -> Result<(), Error> {
31    env_logger::init();
32    let yaml = load_yaml!("cli.yml");
33    let matches = App::from_yaml(yaml).get_matches();
34    let config = matches.value_of("config").unwrap_or("default.toml");
35    let watch_file = config.clone();
36    let ip = matches.value_of("ip").map(|x| x.to_string());
37    let enable_reload = matches.is_present("reload");
38    info!("[aster] loading config from {}", config);
39    let cfg = com::Config::load(&config)?;
40    debug!("use config : {:?}", cfg);
41    assert!(
42        !cfg.clusters.is_empty(),
43        "clusters is absent of config file"
44    );
45    crate::proxy::standalone::reload::init(&watch_file, cfg.clone(), enable_reload)?;
46
47    let mut ths = Vec::new();
48    for cluster in cfg.clusters.clone().into_iter() {
49        if cluster.servers.is_empty() {
50            warn!(
51                "fail to running cluster {} in addr {} due filed `servers` is empty",
52                cluster.name, cluster.listen_addr
53            );
54            continue;
55        }
56
57        if cluster.name.is_empty() {
58            warn!(
59                "fail to running cluster {} in addr {} due filed `name` is empty",
60                cluster.name, cluster.listen_addr
61            );
62            continue;
63        }
64
65        info!(
66            "starting aster cluster {} in addr {}",
67            cluster.name, cluster.listen_addr
68        );
69
70        match cluster.cache_type {
71            com::CacheType::RedisCluster => {
72                let jhs = proxy::cluster::run(cluster, ip.clone());
73                ths.extend(jhs);
74            }
75            _ => {
76                let jhs = proxy::standalone::run(cluster, ip.clone());
77                ths.extend(jhs);
78            }
79        }
80    }
81
82    #[cfg(feature = "metrics")]
83    {
84        let port_str = matches.value_of("metrics").unwrap_or("2110");
85        let port = port_str.parse::<usize>().unwrap_or(2110);
86        spawn_metrics(port);
87    }
88
89    for th in ths {
90        th.join().unwrap();
91    }
92    Ok(())
93}
94
95#[cfg(feature = "metrics")]
96use std::thread;
#[cfg(feature = "metrics")]
/// Spawns the two background threads backing the `metrics` feature and
/// returns their join handles.
///
/// * `aster-http-srv` runs `metrics::init(port)`.
/// * `measure-service` runs `metrics::measure_system()`.
///
/// # Panics
///
/// Panics if either thread cannot be spawned. Each thread panics on its own
/// if the `metrics` call it runs returns an error.
fn spawn_metrics(port: usize) -> Vec<thread::JoinHandle<()>> {
    let http_server = thread::Builder::new()
        .name(String::from("aster-http-srv"))
        .spawn(move || metrics::init(port).unwrap())
        .unwrap();

    let system_measure = thread::Builder::new()
        .name(String::from("measure-service"))
        .spawn(move || metrics::measure_system().unwrap())
        .unwrap();

    vec![http_server, system_measure]
}
109}