#[cfg(test)]
use super::parquet::register_parquet_tables;
use crate::NetworkBoundaryExt;
use crate::common::serialize_uuid;
use crate::coordinator::DistributedExec;
use crate::stage::Stage;
use crate::worker::generated::worker::TaskKey;
#[cfg(test)]
use crate::{
DistributedConfig, DistributedExt, SessionStateBuilderExt, TaskEstimation, TaskEstimator,
display_plan_ascii, test_utils::in_memory_channel_resolver::InMemoryWorkerResolver,
};
#[cfg(test)]
use datafusion::{
common::Result,
config::ConfigOptions,
execution::{SessionState, context::SessionContext, session_state::SessionStateBuilder},
physical_plan::displayable,
prelude::SessionConfig,
};
use datafusion::{
common::{HashMap, HashSet},
physical_plan::ExecutionPlan,
};
use std::sync::Arc;
/// Counts the nodes reachable from `plan` without descending past a network boundary.
///
/// The root is always counted. A node for which `is_network_boundary()` returns `true`
/// is counted as well, but none of its children are visited.
pub fn count_plan_nodes_up_to_network_boundary(plan: &Arc<dyn ExecutionPlan>) -> usize {
    let mut visited = 0;
    // Visiting order does not affect the total, so a plain stack is enough.
    let mut pending = vec![plan];
    while let Some(node) = pending.pop() {
        visited += 1;
        if !node.as_ref().is_network_boundary() {
            pending.extend(node.children());
        }
    }
    visited
}
/// Collects every stage reachable from `stage` together with the keys of all their tasks.
///
/// Returns a map from stage number to the stage, and the set of [`TaskKey`]s built from
/// each stage's query id, stage number and every task index in `0..task_count()`.
///
/// # Panics
///
/// Panics if `local_plan()` of any visited stage cannot be unwrapped.
pub fn get_stages_and_task_keys(
    stage: &DistributedExec,
) -> (HashMap<usize, &Stage>, HashSet<TaskKey>) {
    let mut stages_by_num = HashMap::new();
    let mut task_keys = HashSet::new();
    // The worklist grows while it is being walked, so it is traversed with a cursor
    // instead of an iterator.
    let mut worklist = find_input_stages(stage);
    let mut cursor = 0;
    while let Some(&current) = worklist.get(cursor) {
        cursor += 1;
        stages_by_num.insert(current.num(), current);
        task_keys.extend((0..current.task_count()).map(|task_number| TaskKey {
            query_id: serialize_uuid(&current.query_id()),
            stage_id: current.num() as u64,
            task_number: task_number as u64,
        }));
        // Stages feeding the current one are appended and visited later.
        worklist.extend(find_input_stages(current.local_plan().unwrap().as_ref()));
    }
    (stages_by_num, task_keys)
}
/// Returns the input stages of the nearest network boundaries below `plan`.
///
/// Children are walked left to right. A child that is a network boundary contributes
/// its `input_stage()` and is not descended into; any other child is searched
/// recursively.
fn find_input_stages(plan: &dyn ExecutionPlan) -> Vec<&Stage> {
    plan.children()
        .into_iter()
        .flat_map(|child| match child.as_network_boundary() {
            Some(boundary) => vec![boundary.input_stage()],
            None => find_input_stages(child.as_ref()),
        })
        .collect()
}
#[cfg(test)]
#[derive(Clone)]
/// Test-only wrapper around a [`SessionContext`] that turns SQL text into physical plans.
pub(crate) struct TestPlan {
/// Session context the queries are planned against.
ctx: SessionContext,
}
#[cfg(test)]
impl TestPlan {
    /// Plans `query` against the wrapped context and returns the physical plan.
    ///
    /// # Panics
    ///
    /// Panics if the SQL cannot be turned into a dataframe or if physical planning fails.
    pub async fn physical_plan(&self, query: &str) -> Arc<dyn ExecutionPlan> {
        self.ctx
            .sql(query)
            .await
            .unwrap()
            .create_physical_plan()
            .await
            .unwrap()
    }

    /// Plans `query` and renders the physical plan with `displayable(..).indent(true)`.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`TestPlan::physical_plan`].
    pub async fn physical_plan_as_string(&self, query: &str) -> String {
        displayable(self.physical_plan(query).await.as_ref())
            .indent(true)
            .to_string()
    }

    /// Borrows the underlying session context.
    pub fn get_ctx(&self) -> &SessionContext {
        &self.ctx
    }
}
#[cfg(test)]
/// Test-only builder that assembles a session and produces a [`TestPlan`].
///
/// `Option` fields left as `None` are not applied, so the corresponding setting keeps
/// whatever value `DistributedConfig::default()` / `SessionConfig::new()` provides.
pub(crate) struct TestPlanBuilder {
/// When set, passed to `SessionConfig::with_target_partitions`.
target_partitions: Option<usize>,
/// When set, an `InMemoryWorkerResolver::new(n)` is installed as the worker resolver.
num_workers: Option<usize>,
/// When `true`, `with_distributed_planner()` is called on the session state builder.
distributed_planner: bool,
/// When set, overrides `DistributedConfig::cardinality_task_count_factor`.
distributed_cardinality_effect_task_scale_factor: Option<f64>,
/// When set, overrides `DistributedConfig::file_scan_config_bytes_per_partition`.
distributed_file_scan_config_bytes_per_partition: Option<usize>,
/// When set, passed to `SessionConfig::with_information_schema`.
information_schema: Option<bool>,
/// Always copied into `DistributedConfig::broadcast_joins`.
broadcast_joins: bool,
/// When set, registered through `with_distributed_task_estimator`.
distributed_task_estimator: Option<Arc<dyn TaskEstimator + Send + Sync + 'static>>,
/// When set, overrides `DistributedConfig::partial_reduce`.
distributed_partial_reduce: Option<bool>,
/// When set, overrides `DistributedConfig::children_isolator_unions`.
distributed_children_isolator_unions: Option<bool>,
/// When set, overrides `DistributedConfig::max_tasks_per_stage`.
distributed_max_tasks_per_stage: Option<usize>,
}
#[cfg(test)]
impl TestPlanBuilder {
    /// Creates a builder with the distributed planner and broadcast joins disabled and
    /// nothing else configured, except a file-scan bytes-per-partition override of `1`.
    ///
    /// This differs from the `Default` implementation, which also sets partitions,
    /// workers, the distributed planner and the information schema flag.
    pub fn new() -> Self {
        Self {
            distributed_file_scan_config_bytes_per_partition: Some(1),
            distributed_planner: false,
            broadcast_joins: false,
            target_partitions: None,
            num_workers: None,
            information_schema: None,
            distributed_cardinality_effect_task_scale_factor: None,
            distributed_task_estimator: None,
            distributed_partial_reduce: None,
            distributed_children_isolator_unions: None,
            distributed_max_tasks_per_stage: None,
        }
    }

    /// Sets the number of target partitions of the session.
    pub fn target_partitions(self, target_partitions: usize) -> Self {
        Self {
            target_partitions: Some(target_partitions),
            ..self
        }
    }

    /// Sets how many workers the in-memory worker resolver is created with.
    pub fn num_workers(self, num_workers: usize) -> Self {
        Self {
            num_workers: Some(num_workers),
            ..self
        }
    }

    /// Chooses whether the distributed planner is installed in the session state.
    pub fn distributed_planner(self, enabled: bool) -> Self {
        Self {
            distributed_planner: enabled,
            ..self
        }
    }

    /// Overrides `DistributedConfig::cardinality_task_count_factor`.
    pub fn distributed_cardinality_effect_task_scale_factor(self, factor: f64) -> Self {
        Self {
            distributed_cardinality_effect_task_scale_factor: Some(factor),
            ..self
        }
    }

    /// Overrides `DistributedConfig::file_scan_config_bytes_per_partition`.
    pub fn distributed_file_scan_config_bytes_per_partition(
        self,
        bytes_per_partition: usize,
    ) -> Self {
        Self {
            distributed_file_scan_config_bytes_per_partition: Some(bytes_per_partition),
            ..self
        }
    }

    /// Sets the value passed to `SessionConfig::with_information_schema`.
    pub fn information_schema(self, enabled: bool) -> Self {
        Self {
            information_schema: Some(enabled),
            ..self
        }
    }

    /// Sets `DistributedConfig::broadcast_joins`.
    pub fn broadcast_joins(self, enabled: bool) -> Self {
        Self {
            broadcast_joins: enabled,
            ..self
        }
    }

    /// Registers the task estimator handed to the session state builder.
    pub fn distributed_task_estimator(
        self,
        task_estimator: impl TaskEstimator + Send + Sync + 'static,
    ) -> Self {
        let estimator: Arc<dyn TaskEstimator + Send + Sync + 'static> = Arc::new(task_estimator);
        Self {
            distributed_task_estimator: Some(estimator),
            ..self
        }
    }

    /// Overrides `DistributedConfig::partial_reduce`.
    pub fn distributed_partial_reduce(self, enabled: bool) -> Self {
        Self {
            distributed_partial_reduce: Some(enabled),
            ..self
        }
    }

    /// Overrides `DistributedConfig::children_isolator_unions`.
    pub fn distributed_children_isolator_unions(self, enabled: bool) -> Self {
        Self {
            distributed_children_isolator_unions: Some(enabled),
            ..self
        }
    }

    /// Overrides `DistributedConfig::max_tasks_per_stage`.
    pub fn distributed_max_tasks_per_stage(self, n: usize) -> Self {
        Self {
            distributed_max_tasks_per_stage: Some(n),
            ..self
        }
    }

    /// Builds the `SessionConfig`, carrying a `DistributedConfig` in which every unset
    /// option keeps its default value.
    fn build_config(&self) -> SessionConfig {
        let defaults = DistributedConfig::default();
        let d_cfg = DistributedConfig {
            broadcast_joins: self.broadcast_joins,
            children_isolator_unions: self
                .distributed_children_isolator_unions
                .unwrap_or(defaults.children_isolator_unions),
            partial_reduce: self
                .distributed_partial_reduce
                .unwrap_or(defaults.partial_reduce),
            file_scan_config_bytes_per_partition: self
                .distributed_file_scan_config_bytes_per_partition
                .unwrap_or(defaults.file_scan_config_bytes_per_partition),
            cardinality_task_count_factor: self
                .distributed_cardinality_effect_task_scale_factor
                .unwrap_or(defaults.cardinality_task_count_factor),
            max_tasks_per_stage: self
                .distributed_max_tasks_per_stage
                .unwrap_or(defaults.max_tasks_per_stage),
            ..defaults
        };

        let mut config = SessionConfig::new();
        config.set_distributed_option_extension(d_cfg);
        let config = match self.target_partitions {
            Some(partitions) => config.with_target_partitions(partitions),
            None => config,
        };
        match self.information_schema {
            Some(enabled) => config.with_information_schema(enabled),
            None => config,
        }
    }

    /// Builds the `SessionState` from `config`, applying the worker resolver, the
    /// distributed planner and the task estimator, in that order, when configured.
    fn build_state(&self, config: SessionConfig) -> SessionState {
        let builder = SessionStateBuilder::new()
            .with_default_features()
            .with_config(config);
        let builder = match self.num_workers {
            Some(workers) => {
                builder.with_distributed_worker_resolver(InMemoryWorkerResolver::new(workers))
            }
            None => builder,
        };
        let builder = if self.distributed_planner {
            builder.with_distributed_planner()
        } else {
            builder
        };
        let builder = match &self.distributed_task_estimator {
            Some(estimator) => builder.with_distributed_task_estimator(Arc::clone(estimator)),
            None => builder,
        };
        builder.build()
    }

    /// Creates the session context, registers the parquet test tables in it and wraps
    /// it in a [`TestPlan`].
    ///
    /// # Panics
    ///
    /// Panics if registering the parquet tables fails.
    pub async fn build(&self) -> TestPlan {
        let state = self.build_state(self.build_config());
        let ctx = SessionContext::new_with_state(state);
        register_parquet_tables(&ctx).await.unwrap();
        TestPlan { ctx }
    }

    /// Builds a fresh [`TestPlan`] and plans `query` with it.
    ///
    /// # Panics
    ///
    /// Panics if building the test plan or planning the query fails.
    pub async fn physical_plan(&self, query: &str) -> Arc<dyn ExecutionPlan> {
        let test_plan = self.build().await;
        test_plan.physical_plan(query).await
    }

    /// Plans `query` and renders the plan with `displayable(..).indent(true)`.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`TestPlanBuilder::physical_plan`].
    pub async fn physical_plan_as_string(&self, query: &str) -> String {
        displayable(self.physical_plan(query).await.as_ref())
            .indent(true)
            .to_string()
    }

    /// Plans `query` and renders the plan with `display_plan_ascii`, forwarding
    /// `show_metrics` unchanged.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`TestPlanBuilder::physical_plan`].
    pub async fn physical_plan_as_ascii(&self, query: &str, show_metrics: bool) -> String {
        let plan = self.physical_plan(query).await;
        display_plan_ascii(plan.as_ref(), show_metrics)
    }
}
#[cfg(test)]
impl Default for TestPlanBuilder {
    /// Returns a builder with 4 target partitions, 3 workers, the distributed planner
    /// enabled and the information schema disabled.
    ///
    /// All remaining fields take the values produced by [`TestPlanBuilder::new`].
    fn default() -> Self {
        Self {
            target_partitions: Some(4),
            num_workers: Some(3),
            distributed_planner: true,
            information_schema: Some(false),
            ..Self::new()
        }
    }
}
#[cfg(test)]
#[derive(Debug)]
/// Test-only [`TaskEstimator`] that returns `TaskEstimation::maximum(1)` for leaf plans
/// whose schema has a `MinTemp` column but no `MaxTemp` column, and no estimation
/// otherwise.
pub(crate) struct BuildSideOneTaskEstimator;
#[cfg(test)]
impl TaskEstimator for BuildSideOneTaskEstimator {
    /// Returns `TaskEstimation::maximum(1)` for a childless plan whose schema contains a
    /// `MinTemp` field and no `MaxTemp` field; returns `None` for every other plan.
    fn task_estimation(
        &self,
        plan: &Arc<dyn ExecutionPlan>,
        _: &ConfigOptions,
    ) -> Option<TaskEstimation> {
        // Only leaf nodes are estimated.
        if !plan.children().is_empty() {
            return None;
        }
        let schema = plan.schema();
        let has_column = |name: &str| schema.fields().iter().any(|field| field.name() == name);
        match (has_column("MinTemp"), has_column("MaxTemp")) {
            (true, false) => Some(TaskEstimation::maximum(1)),
            _ => None,
        }
    }

    /// Never rewrites a leaf node: always returns `Ok(None)`.
    fn scale_up_leaf_node(
        &self,
        _: &Arc<dyn ExecutionPlan>,
        _: usize,
        _: &ConfigOptions,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        Ok(None)
    }
}