use std::collections::HashMap;
/// The engine a [`DistributedConfig`]-style configuration targets.
///
/// Parsed case-insensitively from `"datafusion"` / `"ballista"` via
/// [`FromStr`](std::str::FromStr) and rendered back in lowercase via
/// [`Display`](std::fmt::Display), so the two round-trip.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ExecutorType {
    /// DataFusion executor; this is the default.
    #[default]
    DataFusion,
    /// Ballista executor.
    Ballista,
}

impl std::fmt::Display for ExecutorType {
    /// Writes the lowercase name accepted by [`str::parse`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::DataFusion => "datafusion",
            Self::Ballista => "ballista",
        };
        f.write_str(name)
    }
}

impl std::str::FromStr for ExecutorType {
    type Err = String;

    /// Parses an executor name, ignoring case (the input is lowercased before matching).
    ///
    /// # Errors
    ///
    /// Returns `"Unknown executor type: <input>"` (echoing the original, un-lowercased
    /// input) when the name is neither `datafusion` nor `ballista`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "datafusion" => Ok(Self::DataFusion),
            "ballista" => Ok(Self::Ballista),
            _ => Err(format!("Unknown executor type: {}", s)),
        }
    }
}
/// Executor-independent settings: which engine to use, parallelism, memory,
/// validation/optimization toggles, per-rule optimizer flags and free-form options.
///
/// Built fluently through the `with_*` methods; read back through the getters.
#[derive(Debug, Clone)]
pub struct DistributedConfig {
    // Engine to run on; `Default` uses `ExecutorType::default()`.
    executor_type: ExecutorType,
    // Requested parallelism; `Default` uses `num_cpus::get()`.
    concurrency: usize,
    // Memory limit in bytes when set (`with_memory_limit_str` converts KB/MB/GB/TB
    // to bytes). `None` by default — presumably meaning "no limit"; confirm with the
    // code that consumes this config.
    memory_limit: Option<usize>,
    // `false` by default. NOTE(review): what is validated is decided by consumers
    // of this config, not visible here.
    skip_validation: bool,
    // Global optimization switch; `true` by default.
    enable_optimization: bool,
    // Per-rule toggles keyed by rule name. Values are strings: `with_optimizer_rule`
    // writes "true"/"false" and `optimizer_rule` parses them back to `bool`.
    optimizer_rules: HashMap<String, String>,
    // Free-form key/value options. `BallistaConfig::to_distributed_config` stores
    // its `scheduler` and `num_executors` values here.
    options: HashMap<String, String>,
}
impl Default for DistributedConfig {
    /// Baseline configuration: the default [`ExecutorType`], concurrency equal to
    /// `num_cpus::get()`, `memory_limit` unset, `skip_validation` off, optimization
    /// on, no extra options, and every built-in optimizer rule set to `"true"`.
    fn default() -> Self {
        // All rules that ship with the config start out enabled.
        const ENABLED_RULES: [&str; 7] = [
            "filter_pushdown",
            "join_reordering",
            "predicate_pushdown",
            "projection_pushdown",
            "skip_failed_rules",
            "enable_round_robin_repartition",
            "prefer_hash_join",
        ];
        let optimizer_rules: HashMap<String, String> = ENABLED_RULES
            .iter()
            .map(|rule| (String::from(*rule), String::from("true")))
            .collect();

        Self {
            executor_type: ExecutorType::default(),
            concurrency: num_cpus::get(),
            memory_limit: None,
            skip_validation: false,
            enable_optimization: true,
            optimizer_rules,
            options: HashMap::new(),
        }
    }
}
impl DistributedConfig {
    /// Creates a configuration populated with the values from [`Default`].
    pub fn new() -> Self {
        Self::default()
    }

    // ---------------------------------------------------------------- builders

    /// Selects the executor by name, parsed via `ExecutorType`'s `FromStr`.
    ///
    /// Unrecognised names are ignored: the previously configured executor is kept.
    pub fn with_executor(mut self, executor: impl Into<String>) -> Self {
        let requested: String = executor.into();
        self.executor_type = requested.parse().unwrap_or(self.executor_type);
        self
    }

    /// Selects the executor directly.
    pub fn with_executor_type(mut self, executor_type: ExecutorType) -> Self {
        self.executor_type = executor_type;
        self
    }

    /// Sets the requested parallelism.
    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
        self.concurrency = concurrency;
        self
    }

    /// Sets the memory limit in bytes.
    pub fn with_memory_limit(mut self, limit: usize) -> Self {
        self.memory_limit = Some(limit);
        self
    }

    /// Sets the memory limit from a human-readable size such as `"512MB"`.
    ///
    /// Strings that `parse_memory_size` rejects are ignored and the current limit
    /// (possibly unset) is kept.
    pub fn with_memory_limit_str(mut self, limit: impl AsRef<str>) -> Self {
        self.memory_limit = parse_memory_size(limit.as_ref()).or(self.memory_limit);
        self
    }

    /// Inserts (or overwrites) a free-form option.
    pub fn with_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let (key, value) = (key.into(), value.into());
        self.options.insert(key, value);
        self
    }

    /// Sets the `skip_validation` flag.
    pub fn with_skip_validation(mut self, skip: bool) -> Self {
        self.skip_validation = skip;
        self
    }

    /// Turns optimization on or off globally.
    pub fn with_optimization(mut self, enable: bool) -> Self {
        self.enable_optimization = enable;
        self
    }

    /// Enables or disables a single optimizer rule, stored as `"true"` / `"false"`.
    pub fn with_optimizer_rule(mut self, rule: impl Into<String>, enable: bool) -> Self {
        let flag = if enable { "true" } else { "false" };
        self.optimizer_rules.insert(rule.into(), flag.to_string());
        self
    }

    // ----------------------------------------------------------------- getters

    /// The configured executor.
    pub fn executor_type(&self) -> ExecutorType {
        self.executor_type
    }

    /// The requested parallelism.
    pub fn concurrency(&self) -> usize {
        self.concurrency
    }

    /// The memory limit in bytes, if one was set.
    pub fn memory_limit(&self) -> Option<usize> {
        self.memory_limit
    }

    /// Whether `skip_validation` is set.
    pub fn skip_validation(&self) -> bool {
        self.skip_validation
    }

    /// Whether optimization is globally enabled.
    pub fn enable_optimization(&self) -> bool {
        self.enable_optimization
    }

    /// Looks up one free-form option.
    pub fn option(&self, key: &str) -> Option<&String> {
        self.options.get(key)
    }

    /// All free-form options.
    pub fn options(&self) -> &HashMap<String, String> {
        &self.options
    }

    /// The state of one optimizer rule.
    ///
    /// Returns `None` when the rule is unknown or its stored value is not a valid `bool`.
    pub fn optimizer_rule(&self, rule: &str) -> Option<bool> {
        let raw = self.optimizer_rules.get(rule)?;
        raw.parse().ok()
    }

    /// All optimizer rules with their raw string values.
    pub fn optimizer_rules(&self) -> &HashMap<String, String> {
        &self.optimizer_rules
    }
}
/// Ballista-specific configuration layered on top of a [`DistributedConfig`].
///
/// The Ballista-only settings are kept as typed fields. They are folded into the
/// base config's string options by [`BallistaConfig::to_distributed_config`].
#[derive(Debug, Clone)]
pub struct BallistaConfig {
    // Shared settings; `Default` sets its executor type to `ExecutorType::Ballista`.
    base_config: DistributedConfig,
    // Scheduler setting, exported as the "scheduler" option. Presumably a scheduler
    // address/URL — confirm with the consumer.
    scheduler: Option<String>,
    // Requested executor count, exported as the "num_executors" option.
    num_executors: Option<usize>,
}
impl Default for BallistaConfig {
fn default() -> Self {
Self {
base_config: DistributedConfig::default().with_executor_type(ExecutorType::Ballista),
scheduler: None,
num_executors: None,
}
}
}
impl BallistaConfig {
    /// Creates a Ballista configuration with the values from [`Default`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the scheduler; exported as the `"scheduler"` option.
    pub fn with_scheduler(mut self, scheduler: impl Into<String>) -> Self {
        self.scheduler = Some(scheduler.into());
        self
    }

    /// Sets the executor count; exported as the `"num_executors"` option.
    pub fn with_num_executors(mut self, num_executors: usize) -> Self {
        self.num_executors = Some(num_executors);
        self
    }

    // The builders below forward to the wrapped `DistributedConfig`. No executor-type
    // setter is forwarded, so the base config keeps the Ballista executor set by
    // `Default`.

    /// Forwards to [`DistributedConfig::with_concurrency`].
    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
        self.base_config = self.base_config.with_concurrency(concurrency);
        self
    }

    /// Forwards to [`DistributedConfig::with_memory_limit`].
    pub fn with_memory_limit(mut self, limit: usize) -> Self {
        self.base_config = self.base_config.with_memory_limit(limit);
        self
    }

    /// Forwards to [`DistributedConfig::with_memory_limit_str`]; unparseable
    /// strings leave the limit unchanged.
    pub fn with_memory_limit_str(mut self, limit: impl AsRef<str>) -> Self {
        self.base_config = self.base_config.with_memory_limit_str(limit);
        self
    }

    /// Forwards to [`DistributedConfig::with_option`].
    pub fn with_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.base_config = self.base_config.with_option(key, value);
        self
    }

    /// Forwards to [`DistributedConfig::with_skip_validation`].
    pub fn with_skip_validation(mut self, skip: bool) -> Self {
        self.base_config = self.base_config.with_skip_validation(skip);
        self
    }

    /// Forwards to [`DistributedConfig::with_optimization`].
    pub fn with_optimization(mut self, enable: bool) -> Self {
        self.base_config = self.base_config.with_optimization(enable);
        self
    }

    /// Forwards to [`DistributedConfig::with_optimizer_rule`].
    pub fn with_optimizer_rule(mut self, rule: impl Into<String>, enable: bool) -> Self {
        self.base_config = self.base_config.with_optimizer_rule(rule, enable);
        self
    }

    /// The wrapped executor-independent configuration (without the Ballista-only
    /// fields folded in; see [`BallistaConfig::to_distributed_config`]).
    pub fn base_config(&self) -> &DistributedConfig {
        &self.base_config
    }

    /// The scheduler setting, if any.
    pub fn scheduler(&self) -> Option<&String> {
        self.scheduler.as_ref()
    }

    /// The requested executor count, if any.
    pub fn num_executors(&self) -> Option<usize> {
        self.num_executors
    }

    /// Returns a copy of the base config with `"scheduler"` and `"num_executors"`
    /// options added for whichever Ballista fields are set.
    ///
    /// These values overwrite any options with the same keys that were set through
    /// [`BallistaConfig::with_option`].
    pub fn to_distributed_config(&self) -> DistributedConfig {
        let mut config = self.base_config.clone();
        if let Some(scheduler) = &self.scheduler {
            config = config.with_option("scheduler", scheduler);
        }
        if let Some(num_executors) = self.num_executors {
            config = config.with_option("num_executors", num_executors.to_string());
        }
        config
    }
}
impl From<BallistaConfig> for DistributedConfig {
fn from(config: BallistaConfig) -> Self {
config.to_distributed_config()
}
}
/// Parses a human-readable size such as `"512"`, `"64B"`, `"16 kb"` or `"2GB"` into bytes.
///
/// Accepted input, after trimming and ignoring case:
///
/// - one or more ASCII digits, then
/// - optional whitespace, then
/// - an optional unit: `B`, `KB`, `MB`, `GB` or `TB` (binary multiples of 1024).
///
/// A bare number is taken as bytes.
///
/// Returns `None` when the input does not have that shape (signs, decimals and
/// unknown units are rejected). It also returns `None` when the byte count does not
/// fit in `usize`, rather than overflowing.
fn parse_memory_size(s: &str) -> Option<usize> {
    let s = s.trim().to_uppercase();

    // Split into the leading run of ASCII digits and whatever unit text follows.
    // Checking digits explicitly (instead of relying on `parse` alone) rejects "+5".
    let digits_end = s.find(|c: char| !c.is_ascii_digit()).unwrap_or(s.len());
    let (digits, unit) = s.split_at(digits_end);
    if digits.is_empty() {
        return None;
    }
    let num: usize = digits.parse().ok()?;

    // Power of 1024 corresponding to each unit.
    let exponent = match unit.trim_start() {
        "" | "B" => 0,
        "KB" => 1,
        "MB" => 2,
        "GB" => 3,
        "TB" => 4,
        _ => return None,
    };
    // Checked arithmetic: huge inputs (or TB on 32-bit targets) yield None
    // instead of panicking in debug builds or wrapping in release builds.
    num.checked_mul(1024usize.checked_pow(exponent)?)
}