extern crate clap;
extern crate fibers_rpc;
extern crate frugalos;
extern crate frugalos_config;
extern crate frugalos_segment;
extern crate hostname;
extern crate jemallocator;
extern crate libfrugalos;
#[macro_use]
extern crate slog;
extern crate sloggers;
#[macro_use]
extern crate trackable;
use clap::{App, Arg, ArgMatches, SubCommand};
use libfrugalos::entity::server::Server;
use libfrugalos::time::Seconds;
use sloggers::Build;
use std::env;
use std::string::ToString;
use std::time::Duration;
use trackable::error::{ErrorKindExt, Failure};
use frugalos::command::rpc_addr;
use frugalos::command::set_repair_config::SetRepairConfigCommand;
use frugalos::command::truncate_bucket::TruncateBucketCommand;
use frugalos::command::FrugalosSubcommand;
use frugalos::FrugalosConfig;
use frugalos::{Error, ErrorKind, Result};
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
#[allow(
renamed_and_removed_lints,
clippy::cyclomatic_complexity,
clippy::cognitive_complexity
)]
fn main() {
let rpc_server_bind_addr = default_rpc_server_bind_addr();
let long_version = track_try_unwrap!(make_long_version());
let set_repair_config_command = SetRepairConfigCommand;
let truncate_bucket_command = TruncateBucketCommand;
let matches = App::new("frugalos")
.version(env!("CARGO_PKG_VERSION"))
.long_version(long_version.as_str())
.subcommand(
SubCommand::with_name("create")
.arg(server_id_arg())
.arg(server_addr_arg(&rpc_server_bind_addr))
.arg(data_dir_arg()),
)
.subcommand(
SubCommand::with_name("join")
.arg(server_id_arg())
.arg(server_addr_arg(&rpc_server_bind_addr))
.arg(contact_server_addr_arg())
.arg(data_dir_arg()),
)
.subcommand(
SubCommand::with_name("leave")
.arg(contact_server_addr_arg())
.arg(data_dir_arg()),
)
.subcommand(
SubCommand::with_name("repair-local-dat")
.arg(server_id_arg())
.arg(server_addr_arg(&rpc_server_bind_addr).required(true))
.arg(server_seqno_arg().required(true))
.arg(data_dir_arg()),
)
.subcommand(
SubCommand::with_name("start")
.arg(
Arg::with_name("SAMPLING_RATE")
.long("sampling-rate")
.takes_value(true),
)
.arg(
Arg::with_name("EXECUTOR_THREADS")
.long("threads")
.takes_value(true),
)
.arg(
Arg::with_name("HTTP_SERVER_BIND_ADDR")
.long("http-server-bind-addr")
.takes_value(true),
)
.arg(
Arg::with_name("RPC_CONNECT_TIMEOUT_MILLIS")
.long("rpc-connect-timeout-millis")
.takes_value(true),
)
.arg(
Arg::with_name("RPC_WRITE_TIMEOUT_MILLIS")
.long("rpc-write-timeout-millis")
.takes_value(true),
)
.arg(data_dir_arg())
.arg(put_content_timeout_arg()),
)
.subcommand(SubCommand::with_name("stop").arg(rpc_addr::get_arg()))
.subcommand(SubCommand::with_name("take-snapshot").arg(rpc_addr::get_arg()))
.subcommand(set_repair_config_command.get_subcommand())
.subcommand(truncate_bucket_command.get_subcommand())
.arg(
Arg::with_name("LOGLEVEL")
.short("l")
.long("loglevel")
.takes_value(true)
.possible_values(&["debug", "info", "warning", "error", "critical"]),
)
.arg(
Arg::with_name("MAX_CONCURRENT_LOGS")
.long("max_concurrent_logs")
.takes_value(true),
)
.arg(
Arg::with_name("CONFIG_FILE")
.long("config-file")
.takes_value(true),
)
.get_matches();
let (mut config, unknown_fields): (FrugalosConfig, Vec<String>) =
track_try_unwrap!(track_any_err!(get_frugalos_config(&matches)));
config.loglevel = matches
.value_of("LOGLEVEL")
.map(|v| match v {
"debug" => sloggers::types::Severity::Debug,
"info" => sloggers::types::Severity::Info,
"warning" => sloggers::types::Severity::Warning,
"error" => sloggers::types::Severity::Error,
"critical" => sloggers::types::Severity::Critical,
_ => unreachable!(),
})
.unwrap_or(config.loglevel);
if let Some(v) = matches.value_of("MAX_CONCURRENT_LOGS") {
config.max_concurrent_logs = track_try_unwrap!(v.parse().map_err(Error::from));
}
let logger_builder;
{
let maybe_log_file = config.log_file.as_ref().and_then(|p| p.to_str());
logger_builder = if let Some(filepath) = matches.value_of("LOGFILE").or(maybe_log_file) {
let mut builder = sloggers::file::FileLoggerBuilder::new(filepath);
builder.level(config.loglevel);
builder.channel_size(config.max_concurrent_logs);
sloggers::LoggerBuilder::File(builder)
} else {
let mut builder = sloggers::terminal::TerminalLoggerBuilder::new();
builder.level(config.loglevel);
builder.channel_size(config.max_concurrent_logs);
sloggers::LoggerBuilder::Terminal(builder)
};
}
if let Some(matches) = matches.subcommand_matches("create") {
let server_id = matches
.value_of("SERVER_ID")
.map(ToString::to_string)
.or_else(hostname::get_hostname)
.unwrap();
let server_addr = matches.value_of("SERVER_ADDR").unwrap();
set_data_dir(&matches, &mut config);
let mut logger = track_try_unwrap!(logger_builder.build());
warn_if_there_are_unknown_fields(&mut logger, &unknown_fields);
let logger = logger.new(o!("server" => format!("{}@{}", server_id, server_addr)));
let server = Server::new(
server_id,
track_try_unwrap!(server_addr.parse().map_err(Failure::from_error)),
);
debug!(logger, "config: {:?}", config);
track_try_unwrap!(frugalos_config::cluster::create(
&logger,
server,
config.data_dir
));
} else if let Some(matches) = matches.subcommand_matches("join") {
let server_id = matches
.value_of("SERVER_ID")
.map(ToString::to_string)
.or_else(hostname::get_hostname)
.unwrap();
let server_addr = matches.value_of("SERVER_ADDR").unwrap();
let contact_server_addr = matches.value_of("CONTACT_SERVER_ADDR").unwrap();
set_data_dir(&matches, &mut config);
let mut logger = track_try_unwrap!(logger_builder.build());
warn_if_there_are_unknown_fields(&mut logger, &unknown_fields);
let logger = logger.new(o!("server" => format!("{}@{}", server_id, server_addr)));
let server = Server::new(
server_id,
track_try_unwrap!(server_addr.parse().map_err(Failure::from_error)),
);
let contact_server =
track_try_unwrap!(contact_server_addr.parse().map_err(Failure::from_error));
debug!(logger, "config: {:?}", config);
track_try_unwrap!(frugalos_config::cluster::join(
&logger,
&server,
config.data_dir,
contact_server,
));
} else if let Some(matches) = matches.subcommand_matches("leave") {
let contact_server_addr = matches.value_of("CONTACT_SERVER_ADDR").unwrap();
set_data_dir(&matches, &mut config);
let contact_server =
track_try_unwrap!(contact_server_addr.parse().map_err(Failure::from_error));
let mut logger = track_try_unwrap!(logger_builder.build());
warn_if_there_are_unknown_fields(&mut logger, &unknown_fields);
debug!(logger, "config: {:?}", config);
track_try_unwrap!(frugalos_config::cluster::leave(
&logger,
config.data_dir,
contact_server,
));
} else if let Some(matches) = matches.subcommand_matches("repair-local-dat") {
let server_id = matches
.value_of("SERVER_ID")
.map(ToString::to_string)
.or_else(hostname::get_hostname)
.unwrap();
let server_addr = matches.value_of("SERVER_ADDR").unwrap();
set_data_dir(&matches, &mut config);
let mut logger = track_try_unwrap!(logger_builder.build());
warn_if_there_are_unknown_fields(&mut logger, &unknown_fields);
let logger = logger.new(o!("server" => format!("{}@{}", server_id, server_addr)));
let mut server = Server::new(
server_id,
track_try_unwrap!(server_addr.parse().map_err(Failure::from_error)),
);
server.seqno = track_try_unwrap!(get_server_seqno(matches));
debug!(logger, "config: {:?}", config);
track_try_unwrap!(frugalos_config::cluster::save_local_server_info(
config.data_dir,
server,
));
} else if let Some(matches) = matches.subcommand_matches("start") {
let mut logger = track_try_unwrap!(logger_builder.build());
warn_if_there_are_unknown_fields(&mut logger, &unknown_fields);
set_data_dir(&matches, &mut config);
track_try_unwrap!(track_any_err!(set_daemon_config(
&matches,
&mut config.daemon
)));
track_try_unwrap!(track_any_err!(set_http_server_config(
&matches,
&mut config.http_server
)));
track_try_unwrap!(track_any_err!(set_rpc_client_config(
&matches,
&mut config.rpc_client
)));
track_try_unwrap!(track_any_err!(set_segment_config(
&matches,
&mut config.segment
)));
let daemon = track_try_unwrap!(frugalos::daemon::FrugalosDaemon::new(
&logger,
config.clone()
));
track_try_unwrap!(daemon.run());
std::thread::sleep(std::time::Duration::from_millis(100));
debug!(logger, "config: {:?}", config);
} else if let Some(matches) = matches.subcommand_matches("stop") {
let mut logger = track_try_unwrap!(logger_builder.build());
warn_if_there_are_unknown_fields(&mut logger, &unknown_fields);
let rpc_addr = rpc_addr::from_matches(&matches);
let logger = logger.new(o!("rpc_addr" => rpc_addr.to_string()));
track_try_unwrap!(frugalos::daemon::stop(&logger, rpc_addr));
std::thread::sleep(std::time::Duration::from_millis(100));
debug!(logger, "config: {:?}", config);
} else if let Some(matches) = matches.subcommand_matches("take-snapshot") {
let mut logger = track_try_unwrap!(logger_builder.build());
warn_if_there_are_unknown_fields(&mut logger, &unknown_fields);
let rpc_addr = rpc_addr::from_matches(&matches);
let logger = logger.new(o!("rpc_addr" => rpc_addr.to_string()));
track_try_unwrap!(frugalos::daemon::take_snapshot(&logger, rpc_addr));
std::thread::sleep(std::time::Duration::from_millis(100));
debug!(logger, "config: {:?}", config);
} else if let Some(matches) = set_repair_config_command.check_matches(&matches) {
set_repair_config_command.handle_matches(logger_builder, matches, &unknown_fields);
} else if let Some(matches) = truncate_bucket_command.check_matches(&matches) {
truncate_bucket_command.handle_matches(logger_builder, matches, &unknown_fields);
} else {
println!("Usage: {}", matches.usage());
std::process::exit(1);
}
}
fn server_id_arg<'a, 'b>() -> Arg<'a, 'b> {
Arg::with_name("SERVER_ID")
.help("Sets the identifier of this server (the default is the hostname of this machine)")
.long("id")
.takes_value(true)
}
fn server_addr_arg<'a, 'b>(default_value: &'a str) -> Arg<'a, 'b> {
Arg::with_name("SERVER_ADDR")
.long("addr")
.alias("rpc-addr")
.takes_value(true)
.default_value(default_value)
}
fn server_seqno_arg<'a, 'b>() -> Arg<'a, 'b> {
Arg::with_name("SERVER_SEQNO")
.help("seqno of this server")
.long("seqno")
.takes_value(true)
}
fn contact_server_addr_arg<'a, 'b>() -> Arg<'a, 'b> {
Arg::with_name("CONTACT_SERVER_ADDR")
.long("contact-server")
.takes_value(true)
.required(true)
}
fn data_dir_arg<'a, 'b>() -> Arg<'a, 'b> {
Arg::with_name("DATA_DIR")
.help(
"Sets the data directory of this server \
(the default is the value of FRUGALOS_DATA_DIR environment variable)",
)
.long("data-dir")
.takes_value(true)
}
fn put_content_timeout_arg<'a, 'b>() -> Arg<'a, 'b> {
Arg::with_name("PUT_CONTENT_TIMEOUT")
.help("Sets timeout in seconds on putting a content.")
.long("put-content-timeout")
.takes_value(true)
}
fn default_rpc_server_bind_addr() -> String {
frugalos::command::rpc_addr::default_rpc_server_bind_addr().to_owned()
}
fn get_server_seqno(matches: &ArgMatches) -> Result<u32> {
matches
.value_of("SERVER_SEQNO")
.map(|v| v.parse::<u32>().map_err(|e| track!(Error::from(e))))
.unwrap_or_else(|| {
Err(Error::from(
ErrorKind::InvalidInput.cause("server seqno must be specified"),
))
})
}
fn set_data_dir(matches: &ArgMatches, config: &mut FrugalosConfig) {
if let Some(value) = matches
.value_of("DATA_DIR")
.map(ToString::to_string)
.or_else(|| env::var("FRUGALOS_DATA_DIR").ok())
{
config.data_dir = value;
}
if config.data_dir.is_empty() {
println!(
"[ERROR] Must set one of the `data-dir` argument, the `FRUGALOS_DATA_DIR` environment variable or the `frugalos.data_dir` key of a configuration file"
);
std::process::exit(1);
}
}
fn get_frugalos_config(matches: &ArgMatches) -> Result<(FrugalosConfig, Vec<String>)> {
matches.value_of("CONFIG_FILE").map_or_else(
|| Ok((FrugalosConfig::default(), Vec::new())),
|v| FrugalosConfig::from_yaml(v).map_err(|e| track!(e)),
)
}
fn set_daemon_config(
matches: &ArgMatches,
config: &mut frugalos::FrugalosDaemonConfig,
) -> Result<()> {
if let Some(threads) = matches.value_of("EXECUTOR_THREADS") {
config.executor_threads = threads.parse().map_err(|e| track!(Error::from(e)))?;
}
if let Some(v) = matches.value_of("SAMPLING_RATE") {
config.sampling_rate = v.parse().map_err(|e| track!(Error::from(e)))?;
}
Ok(())
}
fn set_http_server_config(
matches: &ArgMatches,
config: &mut frugalos::FrugalosHttpServerConfig,
) -> Result<()> {
if let Some(v) = matches.value_of("HTTP_SERVER_BIND_ADDR") {
config.bind_addr = v.parse().map_err(|e| track!(Error::from(e)))?;
}
Ok(())
}
fn set_rpc_client_config(
matches: &ArgMatches,
config: &mut frugalos::FrugalosRpcClientConfig,
) -> Result<()> {
if let Some(v) = matches.value_of("RPC_CONNECT_TIMEOUT_MILLIS") {
config.tcp_connect_timeout = v
.parse::<u64>()
.map(Duration::from_millis)
.map_err(|e| track!(Error::from(e)))?;
}
if let Some(v) = matches.value_of("RPC_WRITE_TIMEOUT_MILLIS") {
config.tcp_write_timeout = v
.parse::<u64>()
.map(Duration::from_millis)
.map_err(|e| track!(Error::from(e)))?;
}
Ok(())
}
fn set_segment_config(
matches: &ArgMatches,
config: &mut frugalos_segment::FrugalosSegmentConfig,
) -> Result<()> {
if let Some(v) = matches.value_of("PUT_CONTENT_TIMEOUT") {
config.mds_client.put_content_timeout = v
.parse::<u64>()
.map(Seconds)
.map_err(|e| track!(Error::from(e)))?;
}
Ok(())
}
fn warn_if_there_are_unknown_fields(logger: &mut slog::Logger, unknown_fields: &[String]) {
frugalos::command::warn_if_there_are_unknown_fields(logger, unknown_fields);
}
fn make_long_version() -> Result<String> {
use frugalos::build_information::*;
use std::fmt::Write;
let mut s = String::new();
track!(writeln!(&mut s, env!("CARGO_PKG_VERSION")).map_err(Error::from))?;
track!(writeln!(&mut s, "build-mode: {}", BUILD_PROFILE).map_err(Error::from))?;
track!(writeln!(&mut s, "rustc-version: {}", BUILD_VERSION).map_err(Error::from))?;
Ok(s.trim().to_owned())
}