libaster/
lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// #![deny(warnings)]
#[macro_use]
extern crate log;
#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate serde_derive;
#[macro_use]
extern crate failure;
#[macro_use]
extern crate clap;

#[macro_use]
extern crate prometheus;

pub mod metrics;

use clap::App;

pub const ASTER_VERSION: &str = env!("CARGO_PKG_VERSION");

pub mod com;
pub mod protocol;
pub mod proxy;
pub(crate) mod utils;

use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use std::io::Write;

use failure::Error;
use chrono::Local;

use com::meta::{load_meta, meta_init};
use com::ClusterConfig;
use metrics::thread_incr;

pub fn run() -> Result<(), Error> {
    env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info");
    env_logger::builder()
        .format(|buf, r| {
            writeln!(
                buf,
                "{} {} [{}:{}] {}",
                Local::now().format("%Y-%m-%d %H:%M:%S"),
                r.level(),
                r.module_path().unwrap_or("<unnamed>"),
                r.line().unwrap_or(0),
                r.args(),
            )
        })
        .init();

    let yaml = load_yaml!("cli.yml");
    let matches = App::from_yaml(yaml).version(ASTER_VERSION).get_matches();
    let config = matches.value_of("config").unwrap_or("default.toml");
    let watch_file = config.to_string();
    let ip = matches.value_of("ip").map(|x| x.to_string());
    let enable_reload = matches.is_present("reload");
    info!("[aster-{}] loading config from {}", ASTER_VERSION, config);
    let cfg = com::Config::load(&config)?;
    debug!("use config : {:?}", cfg);
    assert!(
        !cfg.clusters.is_empty(),
        "clusters is absent of config file"
    );
    crate::proxy::standalone::reload::init(&watch_file, cfg.clone(), enable_reload)?;

    let mut ths = Vec::new();
    for cluster in cfg.clusters.into_iter() {
        if cluster.servers.is_empty() {
            warn!(
                "fail to running cluster {} in addr {} due filed `servers` is empty",
                cluster.name, cluster.listen_addr
            );
            continue;
        }

        if cluster.name.is_empty() {
            warn!(
                "fail to running cluster {} in addr {} due filed `name` is empty",
                cluster.name, cluster.listen_addr
            );
            continue;
        }

        info!(
            "starting aster cluster {} in addr {}",
            cluster.name, cluster.listen_addr
        );

        match cluster.cache_type {
            com::CacheType::RedisCluster => {
                let jhs = spawn_worker(&cluster, ip.clone(), proxy::cluster::spawn);
                ths.extend(jhs);
            }
            _ => {
                let jhs = spawn_worker(&cluster, ip.clone(), proxy::standalone::spawn);
                ths.extend(jhs);
            }
        }
    }

    {
        let port_str = matches.value_of("metrics").unwrap_or("2110");
        let port = port_str.parse::<usize>().unwrap_or(2110);
        spawn_metrics(port);
    }

    for th in ths {
        th.join().unwrap();
    }
    Ok(())
}

fn spawn_worker<T>(cc: &ClusterConfig, ip: Option<String>, spawn_fn: T) -> Vec<JoinHandle<()>>
where
    T: Fn(ClusterConfig) + Copy + Send + 'static,
{
    let worker = cc.thread.unwrap_or(4);
    let meta = load_meta(cc.clone(), ip);
    info!("setup meta info with {:?}", meta);
    (0..worker)
        .map(|_index| {
            let cc = cc.clone();
            let meta = meta.clone();
            Builder::new()
                .name(cc.name.clone())
                .spawn(move || {
                    thread_incr();
                    meta_init(meta);
                    spawn_fn(cc);
                })
                .expect("fail to spawn worker thread")
        })
        .collect()
}

fn spawn_metrics(port: usize) -> Vec<thread::JoinHandle<()>> {
    // wait for worker thread to be ready
    thread::sleep(Duration::from_secs(3));
    vec![
        thread::Builder::new()
            .name("aster-http-srv".to_string())
            .spawn(move || metrics::init(port).unwrap())
            .unwrap(),
        thread::Builder::new()
            .name("measure-service".to_string())
            .spawn(move || metrics::measure_system().unwrap())
            .unwrap(),
    ]
}