use std::sync::Arc;
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::ExecutionPlan;
#[derive(Default)]
pub struct GlobalSortSelection {}
impl GlobalSortSelection {
#[allow(missing_docs)]
pub fn new() -> Self {
Self {}
}
}
impl PhysicalOptimizerRule for GlobalSortSelection {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_up(&|plan| {
Ok(plan
.as_any()
.downcast_ref::<SortExec>()
.and_then(|sort_exec| {
if sort_exec.input().output_partitioning().partition_count() > 1
&& sort_exec.fetch().is_some()
&& !sort_exec.preserve_partitioning()
{
let sort = SortExec::new_with_partitioning(
sort_exec.expr().to_vec(),
sort_exec.input().clone(),
true,
sort_exec.fetch(),
);
let global_sort: Arc<dyn ExecutionPlan> =
Arc::new(SortPreservingMergeExec::new(
sort_exec.expr().to_vec(),
Arc::new(sort),
));
Some(global_sort)
} else {
None
}
}))
})
}
fn name(&self) -> &str {
"global_sort_selection"
}
fn schema_check(&self) -> bool {
false
}
}