use std::fmt;
use std::fmt::Formatter;
use std::sync::Arc;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{displayable, ExecutionPlan};
use datafusion_physical_expr::{LexRequirementRef, PhysicalSortRequirement};
/// A node in a tree of execution plans: it pairs a plan with an index and
/// the sub-trees hanging below it.
#[derive(Debug, Clone)]
pub(crate) struct ExecTree {
/// The execution plan held by this node.
pub plan: Arc<dyn ExecutionPlan>,
/// Index of this node. `get_children_exectrees` uses it as the slot
/// position in its per-child vector, so it is presumably the position of
/// `plan` among its parent's children (confirm against callers).
pub idx: usize,
/// Sub-trees attached below this node.
pub children: Vec<ExecTree>,
}
impl fmt::Display for ExecTree {
    /// Writes the node's index, the rendered lines of its plan, and then each
    /// child tree recursively, finishing with a trailing newline.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Render the plan before emitting anything so the output order is
        // idx -> plan -> children.
        let plan_lines = get_plan_string(&self.plan);
        write!(f, "\nidx: {:?}", self.idx)?;
        write!(f, "\nplan: {:?}", plan_lines)?;
        // Stops at the first formatting error, like `?` inside a loop would.
        self.children
            .iter()
            .try_for_each(|subtree| write!(f, "\nexec_tree:{}", subtree))?;
        writeln!(f)
    }
}
impl ExecTree {
    /// Builds a tree node from the given plan, index and child sub-trees.
    pub fn new(
        plan: Arc<dyn ExecutionPlan>,
        idx: usize,
        children: Vec<ExecTree>,
    ) -> Self {
        Self { plan, idx, children }
    }
}
/// Spreads the children of `onward` over a vector with `n_children` slots.
///
/// Each child of the tree (if any) is cloned into the slot given by its own
/// `idx`; every other slot stays `None`. When `onward` is `None`, the result
/// is `n_children` `None`s.
///
/// # Panics
///
/// Panics if a child's `idx` is not smaller than `n_children`.
pub(crate) fn get_children_exectrees(
    n_children: usize,
    onward: &Option<ExecTree>,
) -> Vec<Option<ExecTree>> {
    let mut slots: Vec<Option<ExecTree>> = vec![None; n_children];
    // `Option::iter` yields zero or one tree, so a missing tree contributes
    // no children at all.
    let subtrees = onward.iter().flat_map(|tree| tree.children.iter());
    for subtree in subtrees {
        slots[subtree.idx] = Some(subtree.clone());
    }
    slots
}
/// Replaces `node` with a [`SortExec`] on top of it, unless the ordering of
/// `node` already satisfies `sort_requirement`.
///
/// * `node` - plan to wrap; overwritten in place when a sort is added.
/// * `sort_requirement` - ordering the resulting plan must provide.
/// * `fetch` - forwarded to [`SortExec::with_fetch`].
///
/// If `node` reports more than one output partition, the new sort is built
/// with `with_preserve_partitioning(true)`.
pub fn add_sort_above(
    node: &mut Arc<dyn ExecutionPlan>,
    sort_requirement: LexRequirementRef,
    fetch: Option<usize>,
) {
    let already_ordered = node
        .equivalence_properties()
        .ordering_satisfy_requirement(sort_requirement);
    if already_ordered {
        // Nothing to do: the existing ordering is good enough.
        return;
    }

    let multi_partition = node.output_partitioning().partition_count() > 1;
    let ordering = PhysicalSortRequirement::to_sort_exprs(sort_requirement.to_vec());
    let sort = SortExec::new(ordering, node.clone()).with_fetch(fetch);
    let sort = if multi_partition {
        sort.with_preserve_partitioning(true)
    } else {
        sort
    };

    let wrapped: Arc<dyn ExecutionPlan> = Arc::new(sort);
    *node = wrapped;
}
/// Returns `true` if the concrete type of `plan` is [`GlobalLimitExec`] or
/// [`LocalLimitExec`].
pub fn is_limit(plan: &Arc<dyn ExecutionPlan>) -> bool {
    let node = plan.as_any();
    node.is::<GlobalLimitExec>() || node.is::<LocalLimitExec>()
}
/// Returns `true` if the concrete type of `plan` is [`WindowAggExec`] or
/// [`BoundedWindowAggExec`].
pub fn is_window(plan: &Arc<dyn ExecutionPlan>) -> bool {
    let node = plan.as_any();
    node.is::<WindowAggExec>() || node.is::<BoundedWindowAggExec>()
}
/// Returns `true` if the concrete type of `plan` is [`SortExec`].
pub fn is_sort(plan: &Arc<dyn ExecutionPlan>) -> bool {
    let node = plan.as_any();
    node.is::<SortExec>()
}
/// Returns `true` if the concrete type of `plan` is
/// [`SortPreservingMergeExec`].
pub fn is_sort_preserving_merge(plan: &Arc<dyn ExecutionPlan>) -> bool {
    let node = plan.as_any();
    node.is::<SortPreservingMergeExec>()
}
/// Returns `true` if the concrete type of `plan` is
/// [`CoalescePartitionsExec`].
pub fn is_coalesce_partitions(plan: &Arc<dyn ExecutionPlan>) -> bool {
    let node = plan.as_any();
    node.is::<CoalescePartitionsExec>()
}
/// Returns `true` if the concrete type of `plan` is [`UnionExec`].
pub fn is_union(plan: &Arc<dyn ExecutionPlan>) -> bool {
    let node = plan.as_any();
    node.is::<UnionExec>()
}
/// Returns `true` if the concrete type of `plan` is [`RepartitionExec`].
pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
    let node = plan.as_any();
    node.is::<RepartitionExec>()
}
/// Renders `plan` with the indented `displayable` formatter and returns the
/// result as one `String` per line.
///
/// Leading and trailing whitespace of the whole rendering is trimmed before
/// it is split into lines.
pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
    let rendered = displayable(plan.as_ref()).indent(true).to_string();
    // Map the borrowed lines straight into owned strings; there is no need
    // to materialize an intermediate `Vec<&str>` first.
    rendered.trim().lines().map(str::to_string).collect()
}