pub mod logger;
use kompact::{
net::buffers::BufferConfig,
prelude::{DeadletterBox, KompactConfig, NetworkConfig},
};
use logger::{file_logger, term_logger, ArconLogger, LoggerType};
use std::path::PathBuf;
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
pub enum ExecutionMode {
Local,
Distributed(DistributedConf),
}
#[allow(dead_code)]
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
pub struct DistributedConf {
peers: Vec<String>, }
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
pub struct ApplicationConf {
#[cfg_attr(feature = "serde", serde(default = "execution_mode_default"))]
pub execution_mode: ExecutionMode,
#[cfg_attr(feature = "serde", serde(default = "base_dir_default"))]
pub base_dir: PathBuf,
#[cfg_attr(feature = "serde", serde(default))]
pub arcon_logger_type: LoggerType,
#[cfg_attr(feature = "serde", serde(default))]
pub kompact_logger_type: LoggerType,
#[cfg_attr(feature = "serde", serde(default = "epoch_interval_default"))]
pub epoch_interval: u64,
#[cfg_attr(feature = "serde", serde(default = "watermark_interval_default"))]
pub watermark_interval: u64,
#[cfg_attr(feature = "serde", serde(default = "max_key_default"))]
pub max_key: u64,
#[cfg_attr(feature = "serde", serde(default = "node_metrics_interval_default"))]
pub node_metrics_interval: u64,
#[cfg_attr(feature = "serde", serde(default = "buffer_pool_size_default"))]
pub buffer_pool_size: usize,
#[cfg_attr(feature = "serde", serde(default = "buffer_pool_limit_default"))]
pub buffer_pool_limit: usize,
#[cfg_attr(feature = "serde", serde(default = "channel_batch_size_default"))]
pub channel_batch_size: usize,
#[cfg_attr(feature = "serde", serde(default = "allocator_capacity_default"))]
pub allocator_capacity: usize,
#[cfg_attr(feature = "serde", serde(default = "ctrl_system_host_default"))]
pub ctrl_system_host: Option<String>,
#[cfg_attr(feature = "serde", serde(default = "kompact_threads_default"))]
pub kompact_threads: usize,
#[cfg_attr(feature = "serde", serde(default = "kompact_throughput_default"))]
pub kompact_throughput: usize,
#[cfg_attr(feature = "serde", serde(default = "kompact_msg_priority_default"))]
pub kompact_msg_priority: f32,
#[cfg_attr(feature = "serde", serde(default = "kompact_network_host_default"))]
pub kompact_network_host: Option<String>,
#[cfg_attr(feature = "serde", serde(default = "kompact_chunk_size_default"))]
pub kompact_chunk_size: usize,
#[cfg_attr(
feature = "serde",
serde(default = "kompact_initial_chunk_count_default")
)]
pub kompact_initial_chunk_count: usize,
#[cfg_attr(feature = "serde", serde(default = "kompact_max_chunk_count_default"))]
pub kompact_max_chunk_count: usize,
#[cfg_attr(
feature = "serde",
serde(default = "kompact_encode_buf_min_free_space_default")
)]
pub kompact_encode_buf_min_free_space: usize,
}
impl Default for ApplicationConf {
fn default() -> Self {
ApplicationConf {
execution_mode: execution_mode_default(),
base_dir: base_dir_default(),
arcon_logger_type: Default::default(),
kompact_logger_type: Default::default(),
watermark_interval: watermark_interval_default(),
epoch_interval: epoch_interval_default(),
max_key: max_key_default(),
node_metrics_interval: node_metrics_interval_default(),
buffer_pool_size: buffer_pool_size_default(),
buffer_pool_limit: buffer_pool_limit_default(),
channel_batch_size: channel_batch_size_default(),
allocator_capacity: allocator_capacity_default(),
ctrl_system_host: ctrl_system_host_default(),
kompact_threads: kompact_threads_default(),
kompact_throughput: kompact_throughput_default(),
kompact_msg_priority: kompact_msg_priority_default(),
kompact_network_host: kompact_network_host_default(),
kompact_chunk_size: kompact_chunk_size_default(),
kompact_max_chunk_count: kompact_max_chunk_count_default(),
kompact_initial_chunk_count: kompact_initial_chunk_count_default(),
kompact_encode_buf_min_free_space: kompact_encode_buf_min_free_space_default(),
}
}
}
impl ApplicationConf {
pub fn state_dir(&self) -> PathBuf {
let mut buf = self.base_dir.clone();
buf.push("live_states");
buf
}
pub fn checkpoints_dir(&self) -> PathBuf {
let mut buf = self.base_dir.clone();
buf.push("checkpoints");
buf
}
pub fn arcon_logger(&self) -> ArconLogger {
match self.arcon_logger_type {
LoggerType::File => {
let base_dir = self.base_dir.clone();
let path = format!(
"{}/{}",
base_dir.as_path().to_string_lossy(),
logger::ARCON_LOG_NAME
);
file_logger(&path)
}
LoggerType::Terminal => term_logger(),
}
}
fn kompact_logger(&self) -> Option<kompact::KompactLogger> {
match self.kompact_logger_type {
LoggerType::File => {
let base_dir = self.base_dir.clone();
let path = format!(
"{}/{}",
base_dir.as_path().to_string_lossy(),
logger::KOMPACT_LOG_NAME,
);
Some(file_logger(&path))
}
LoggerType::Terminal => None,
}
}
pub(crate) fn ctrl_system_conf(&self) -> KompactConfig {
let mut cfg = KompactConfig::default();
cfg.set_config_value(
&kompact::config_keys::system::LABEL,
"ctrl_system".to_string(),
);
let component_cfg = format!(
"{{ checkpoint_dir = {:?}, node_metrics_interval = {} }}",
self.checkpoints_dir(),
self.node_metrics_interval
);
if let Some(kompact_logger) = self.kompact_logger() {
cfg.logger(kompact_logger);
}
cfg.load_config_str(component_cfg);
if let Some(host) = &self.ctrl_system_host {
let sock_addr = host.parse().unwrap();
cfg.system_components(DeadletterBox::new, NetworkConfig::new(sock_addr).build());
}
cfg
}
pub fn data_system_conf(&self) -> KompactConfig {
let mut cfg = KompactConfig::default();
cfg.set_config_value(
&kompact::config_keys::system::LABEL,
"data_system".to_string(),
);
let component_cfg = format!(
"{{ checkpoint_dir = {:?}, node_metrics_interval = {} }}",
self.checkpoints_dir(),
self.node_metrics_interval
);
if let Some(kompact_logger) = self.kompact_logger() {
cfg.logger(kompact_logger);
}
cfg.load_config_str(component_cfg);
cfg.set_config_value(&kompact::config_keys::system::THREADS, self.kompact_threads);
cfg.set_config_value(
&kompact::config_keys::system::THROUGHPUT,
self.kompact_throughput,
);
cfg.set_config_value(
&kompact::config_keys::system::MESSAGE_PRIORITY,
self.kompact_msg_priority,
);
if let Some(host) = &self.kompact_network_host {
let mut buffer_config = BufferConfig::default();
buffer_config.chunk_size(self.kompact_chunk_size);
buffer_config.max_chunk_count(self.kompact_max_chunk_count);
buffer_config.initial_chunk_count(self.kompact_initial_chunk_count);
buffer_config.encode_buf_min_free_space(self.kompact_encode_buf_min_free_space);
let sock_addr = host.parse().unwrap();
cfg.system_components(
DeadletterBox::new,
NetworkConfig::with_buffer_config(sock_addr, buffer_config).build(),
);
}
cfg
}
#[cfg(all(feature = "serde", feature = "hocon"))]
pub fn from_file(path: impl AsRef<std::path::Path>) -> ApplicationConf {
use hocon::HoconLoader;
let data = std::fs::read_to_string(path).unwrap();
let loader: HoconLoader = HoconLoader::new().load_str(&data).unwrap();
loader.resolve().unwrap()
}
}
fn execution_mode_default() -> ExecutionMode {
ExecutionMode::Local
}
fn base_dir_default() -> PathBuf {
#[cfg(test)]
let mut res = tempfile::tempdir().unwrap().into_path();
#[cfg(not(test))]
let mut res = std::env::temp_dir();
res.push("arcon");
res
}
fn epoch_interval_default() -> u64 {
25000
}
fn watermark_interval_default() -> u64 {
250
}
fn max_key_default() -> u64 {
1024
}
fn node_metrics_interval_default() -> u64 {
250
}
fn buffer_pool_size_default() -> usize {
1024
}
fn buffer_pool_limit_default() -> usize {
buffer_pool_size_default() * 2
}
fn channel_batch_size_default() -> usize {
248
}
fn allocator_capacity_default() -> usize {
5368709120
}
fn kompact_threads_default() -> usize {
std::cmp::max(1, num_cpus::get())
}
fn ctrl_system_host_default() -> Option<String> {
None
}
fn kompact_throughput_default() -> usize {
25
}
fn kompact_msg_priority_default() -> f32 {
0.5
}
fn kompact_network_host_default() -> Option<String> {
None
}
fn kompact_chunk_size_default() -> usize {
128000
}
fn kompact_max_chunk_count_default() -> usize {
128
}
fn kompact_initial_chunk_count_default() -> usize {
2
}
fn kompact_encode_buf_min_free_space_default() -> usize {
64
}
#[cfg(test)]
mod tests {
#[test]
#[cfg(all(feature = "serde", feature = "hocon"))]
fn conf_from_file_test() {
use super::*;
use std::io::prelude::*;
use tempfile::NamedTempFile;
let mut file = NamedTempFile::new().unwrap();
let file_path = file.path().to_string_lossy().into_owned();
let config_str = r#"{base_dir: /dev/null, watermark_interval: 1000}"#;
file.write_all(config_str.as_bytes()).unwrap();
let conf: ApplicationConf = ApplicationConf::from_file(&file_path);
assert_eq!(conf.base_dir, PathBuf::from("/dev/null"));
assert_eq!(conf.watermark_interval, 1000);
assert_eq!(conf.node_metrics_interval, node_metrics_interval_default());
assert_eq!(conf.channel_batch_size, channel_batch_size_default());
assert_eq!(conf.buffer_pool_size, buffer_pool_size_default());
assert_eq!(conf.allocator_capacity, allocator_capacity_default());
assert_eq!(conf.kompact_threads, kompact_threads_default());
assert_eq!(conf.kompact_throughput, kompact_throughput_default());
assert_eq!(conf.kompact_network_host, kompact_network_host_default());
}
}