use arrow::array::Array;
use arrow::{
array::{ArrayRef, BooleanBufferBuilder, RecordBatch},
compute::concat_batches,
util::bit_util,
};
use arrow_schema::{SchemaRef, SortOptions};
use datafusion_common::not_impl_err;
use datafusion_common::{JoinSide, Result, internal_err};
use datafusion_execution::{
SendableRecordBatchStream,
memory_pool::{MemoryConsumer, MemoryReservation},
};
use datafusion_expr::{JoinType, Operator};
use datafusion_physical_expr::equivalence::join_equivalence_properties;
use datafusion_physical_expr::{
Distribution, LexOrdering, OrderingRequirements, PhysicalExpr, PhysicalExprRef,
PhysicalSortExpr,
};
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use futures::TryStreamExt;
use parking_lot::Mutex;
use std::fmt::Formatter;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use crate::execution_plan::{EmissionType, boundedness_from_children};
use crate::joins::piecewise_merge_join::classic_join::{
ClassicPWMJStream, PiecewiseMergeJoinStreamState,
};
use crate::joins::piecewise_merge_join::utils::{
build_visited_indices_map, is_existence_join, is_right_existence_join,
};
use crate::joins::utils::asymmetric_join_output_partitioning;
use crate::metrics::MetricsSet;
use crate::{
DisplayAs, DisplayFormatType, ExecutionPlanProperties, check_if_same_properties,
};
use crate::{
ExecutionPlan, PlanProperties,
joins::{
SharedBitmapBuilder,
utils::{BuildProbeJoinMetrics, OnceAsync, OnceFut, build_join_schema},
},
metrics::ExecutionPlanMetricsSet,
spill::get_record_batch_memory_size,
};
/// Physical plan node for a piecewise merge join over a single range
/// predicate (`<`, `<=`, `>`, `>=`) between one expression from each input.
///
/// The left ("buffered") input is fully collected into one batch and shared
/// across output partitions, while the right ("streamed") input is processed
/// batch by batch against it.
#[derive(Debug)]
pub struct PiecewiseMergeJoinExec {
    /// Left input; fully materialized once at execution time and required to
    /// arrive as a single, sorted partition (see `required_input_distribution`
    /// / `required_input_ordering`).
    pub buffered: Arc<dyn ExecutionPlan>,
    /// Right input; consumed incrementally, one partition per output partition.
    pub streamed: Arc<dyn ExecutionPlan>,
    /// Join key expressions as `(buffered-side expr, streamed-side expr)`.
    pub on: (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>),
    /// Range comparison operator relating the two key expressions; `try_new`
    /// rejects anything other than `Lt`/`LtEq`/`Gt`/`GtEq`.
    pub operator: Operator,
    /// The type of join being performed (existence joins are rejected for now).
    pub join_type: JoinType,
    // Output schema built from both child schemas per the join type.
    schema: SchemaRef,
    // Shared one-shot future so the buffered side is collected only once even
    // when `execute` is called for multiple partitions.
    buffered_fut: OnceAsync<BufferedSideData>,
    // Per-node execution metrics.
    metrics: ExecutionPlanMetricsSet,
    // Sort order required of the left child's output (enforced by the planner
    // via `required_input_ordering`).
    left_child_plan_required_order: LexOrdering,
    // Sort order for right-side batches; used by the right-existence-join
    // path, which is currently unimplemented.
    right_batch_required_orders: LexOrdering,
    // Sort direction/null placement derived from `operator` in `try_new`.
    sort_options: SortOptions,
    // Cached plan properties (equivalences, partitioning, boundedness).
    cache: Arc<PlanProperties>,
    // Number of output partitions; forwarded to the buffered-side data so it
    // can track how many partitions still reference it.
    num_partitions: usize,
}
impl PiecewiseMergeJoinExec {
    /// Attempts to create a new piecewise merge join over the single range
    /// predicate `on.0 <operator> on.1`.
    ///
    /// The `buffered` (left) input will be fully collected at execution time;
    /// the `streamed` (right) input is processed incrementally.
    ///
    /// # Errors
    /// * `NotImplemented` if `join_type` is an existence join
    ///   (semi/anti/mark) — these are not supported yet.
    /// * `Internal` if `operator` is not one of `<`, `<=`, `>`, `>=`, or if
    ///   valid sort expressions cannot be formed for either side.
    pub fn try_new(
        buffered: Arc<dyn ExecutionPlan>,
        streamed: Arc<dyn ExecutionPlan>,
        on: (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>),
        operator: Operator,
        join_type: JoinType,
        num_partitions: usize,
    ) -> Result<Self> {
        if is_existence_join(join_type) {
            return not_impl_err!(
                "Existence Joins are currently not supported for PiecewiseMergeJoin"
            );
        }

        // Derive the required sort direction from the comparison operator; the
        // flags are `SortOptions::new(descending, nulls_first)`. The
        // `is_right_existence_join` branches are currently unreachable (such
        // join types were rejected above) but are kept for when right
        // existence joins gain support.
        let sort_options = match operator {
            Operator::Lt | Operator::LtEq => {
                if is_right_existence_join(join_type) {
                    SortOptions::new(false, true)
                } else {
                    SortOptions::new(true, true)
                }
            }
            Operator::Gt | Operator::GtEq => {
                if is_right_existence_join(join_type) {
                    SortOptions::new(true, true)
                } else {
                    SortOptions::new(false, true)
                }
            }
            _ => {
                return internal_err!(
                    "Cannot contain non-range operator in PiecewiseMergeJoinExec"
                );
            }
        };

        // Both sides sort on their join key with the same options.
        let left_child_plan_required_order =
            vec![PhysicalSortExpr::new(Arc::clone(&on.0), sort_options)];
        let right_batch_required_orders =
            vec![PhysicalSortExpr::new(Arc::clone(&on.1), sort_options)];

        let Some(left_child_plan_required_order) =
            LexOrdering::new(left_child_plan_required_order)
        else {
            return internal_err!(
                "PiecewiseMergeJoinExec requires valid sort expressions for its left side"
            );
        };
        let Some(right_batch_required_orders) =
            LexOrdering::new(right_batch_required_orders)
        else {
            return internal_err!(
                "PiecewiseMergeJoinExec requires valid sort expressions for its right side"
            );
        };

        let buffered_schema = buffered.schema();
        let streamed_schema = streamed.schema();
        // Output schema: buffered (left) columns followed by streamed (right)
        // columns, filtered per the join type.
        let schema =
            Arc::new(build_join_schema(&buffered_schema, &streamed_schema, &join_type).0);
        let cache = Self::compute_properties(
            &buffered,
            &streamed,
            Arc::clone(&schema),
            join_type,
            &on,
        )?;
        Ok(Self {
            streamed,
            buffered,
            on,
            operator,
            join_type,
            schema,
            buffered_fut: Default::default(),
            metrics: ExecutionPlanMetricsSet::new(),
            left_child_plan_required_order,
            right_batch_required_orders,
            sort_options,
            cache: Arc::new(cache),
            num_partitions,
        })
    }

    /// Returns the buffered (left) input plan.
    pub fn buffered(&self) -> &Arc<dyn ExecutionPlan> {
        &self.buffered
    }

    /// Returns the streamed (right) input plan.
    pub fn streamed(&self) -> &Arc<dyn ExecutionPlan> {
        &self.streamed
    }

    /// Returns the join type.
    pub fn join_type(&self) -> JoinType {
        self.join_type
    }

    /// Returns the sort options derived from the range operator.
    pub fn sort_options(&self) -> &SortOptions {
        &self.sort_options
    }

    /// Returns which side acts as the probe input for the given join type:
    /// right for inner/full/right-variant joins, left for left-variant joins.
    pub fn probe_side(join_type: &JoinType) -> JoinSide {
        match join_type {
            JoinType::Right
            | JoinType::Inner
            | JoinType::Full
            | JoinType::RightSemi
            | JoinType::RightAnti
            | JoinType::RightMark => JoinSide::Right,
            JoinType::Left
            | JoinType::LeftAnti
            | JoinType::LeftSemi
            | JoinType::LeftMark => JoinSide::Left,
        }
    }

    /// Computes the cached plan properties: join equivalence properties,
    /// asymmetric output partitioning (driven by the streamed side),
    /// incremental emission, and boundedness derived from both children.
    pub fn compute_properties(
        buffered: &Arc<dyn ExecutionPlan>,
        streamed: &Arc<dyn ExecutionPlan>,
        schema: SchemaRef,
        join_type: JoinType,
        join_on: &(PhysicalExprRef, PhysicalExprRef),
    ) -> Result<PlanProperties> {
        let eq_properties = join_equivalence_properties(
            buffered.equivalence_properties().clone(),
            streamed.equivalence_properties().clone(),
            &join_type,
            schema,
            &Self::maintains_input_order(join_type),
            Some(Self::probe_side(&join_type)),
            std::slice::from_ref(join_on),
        )?;
        let output_partitioning =
            asymmetric_join_output_partitioning(buffered, streamed, &join_type)?;
        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            EmissionType::Incremental,
            boundedness_from_children([buffered, streamed]),
        ))
    }

    /// Reports whether each child's input ordering is preserved in the output.
    ///
    /// This join never preserves either child's ordering, so the answer is
    /// `[false, false]` for every join type. (A previous version matched on
    /// `join_type`, but every arm returned the same value.)
    fn maintains_input_order(_join_type: JoinType) -> Vec<bool> {
        vec![false, false]
    }

    /// Swapping the buffered/streamed inputs is not implemented yet.
    ///
    /// # Panics
    /// Always panics with `todo!` if called.
    pub fn swap_inputs(&self) -> Result<Arc<dyn ExecutionPlan>> {
        todo!()
    }

    /// Rebuilds this node with the given children while reusing the cached
    /// plan properties (used by `reset_state`, where the children are the
    /// same plans). Metrics and the buffered-side future are reset so the new
    /// node can be executed from scratch.
    fn with_new_children_and_same_properties(
        &self,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Self {
        // Two `swap_remove(0)` calls pop the children in order: the first
        // yields element 0 and moves element 1 to the front.
        let buffered = children.swap_remove(0);
        let streamed = children.swap_remove(0);
        Self {
            buffered,
            streamed,
            on: self.on.clone(),
            operator: self.operator,
            join_type: self.join_type,
            schema: Arc::clone(&self.schema),
            left_child_plan_required_order: self.left_child_plan_required_order.clone(),
            right_batch_required_orders: self.right_batch_required_orders.clone(),
            sort_options: self.sort_options,
            cache: Arc::clone(&self.cache),
            num_partitions: self.num_partitions,
            metrics: ExecutionPlanMetricsSet::new(),
            buffered_fut: Default::default(),
        }
    }
}
impl ExecutionPlan for PiecewiseMergeJoinExec {
    fn name(&self) -> &str {
        "PiecewiseMergeJoinExec"
    }
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }
    /// Children are `[buffered, streamed]`, in that order.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.buffered, &self.streamed]
    }
    /// The buffered child must be a single partition (it is collected once and
    /// shared by all output partitions in `execute`); the streamed child may
    /// keep whatever partitioning it has.
    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![
            Distribution::SinglePartition,
            Distribution::UnspecifiedDistribution,
        ]
    }
    /// Requires the left child to be sorted by the join key (order derived
    /// from the range operator in `try_new`); the streamed side needs no
    /// pre-imposed ordering. The right-existence-join path is unimplemented
    /// and cannot be reached today because `try_new` rejects existence joins.
    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
        if is_right_existence_join(self.join_type) {
            unimplemented!()
        } else {
            vec![
                Some(OrderingRequirements::from(
                    self.left_child_plan_required_order.clone(),
                )),
                None,
            ]
        }
    }
    /// Rebuilds the node from scratch via `try_new` (recomputing schema and
    /// properties) with exactly two children.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        check_if_same_properties!(self, children);
        match &children[..] {
            [left, right] => Ok(Arc::new(PiecewiseMergeJoinExec::try_new(
                Arc::clone(left),
                Arc::clone(right),
                self.on.clone(),
                self.operator,
                self.join_type,
                self.num_partitions,
            )?)),
            _ => internal_err!(
                "PiecewiseMergeJoin should have 2 children, found {}",
                children.len()
            ),
        }
    }
    /// Returns a copy of this node with fresh metrics and a fresh
    /// buffered-side future, reusing the same children and cached properties,
    /// so it can be executed again.
    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(self.with_new_children_and_same_properties(vec![
            Arc::clone(&self.buffered),
            Arc::clone(&self.streamed),
        ])))
    }
    /// Starts execution of output partition `partition`.
    ///
    /// The buffered side is executed lazily and at most once across all
    /// partitions (via `OnceAsync::try_once`), always from its partition 0 —
    /// `required_input_distribution` guarantees it has exactly one partition.
    /// Each output partition then streams its own partition of the right side
    /// through `ClassicPWMJStream`.
    fn execute(
        &self,
        partition: usize,
        context: Arc<datafusion_execution::TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let on_buffered = Arc::clone(&self.on.0);
        let on_streamed = Arc::clone(&self.on.1);
        let metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
        let buffered_fut = self.buffered_fut.try_once(|| {
            // Register a memory consumer so the collected build side is
            // accounted against the query's memory pool.
            let reservation = MemoryConsumer::new("PiecewiseMergeJoinInput")
                .register(context.memory_pool());
            let buffered_stream = self.buffered.execute(0, Arc::clone(&context))?;
            Ok(build_buffered_data(
                buffered_stream,
                Arc::clone(&on_buffered),
                metrics.clone(),
                reservation,
                build_visited_indices_map(self.join_type),
                self.num_partitions,
            ))
        })?;
        let streamed = self.streamed.execute(partition, Arc::clone(&context))?;
        let batch_size = context.session_config().batch_size();
        if is_existence_join(self.join_type()) {
            // `try_new` rejects existence joins, so this arm cannot be hit.
            unreachable!()
        } else {
            Ok(Box::pin(ClassicPWMJStream::try_new(
                Arc::clone(&self.schema),
                on_streamed,
                self.join_type,
                self.operator,
                streamed,
                BufferedSide::Initial(BufferedSideInitialState { buffered_fut }),
                PiecewiseMergeJoinStreamState::WaitBufferedSide,
                self.sort_options,
                metrics,
                batch_size,
            )))
        }
    }
    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
impl DisplayAs for PiecewiseMergeJoinExec {
    /// Renders this node for `EXPLAIN` output: a one-line summary for the
    /// default/verbose formats, or per-field lines for tree rendering (where
    /// the join type is shown only when it is not `Inner`).
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        let predicate = format!(
            "({} {} {})",
            fmt_sql(self.on.0.as_ref()),
            self.operator,
            fmt_sql(self.on.1.as_ref())
        );
        if matches!(t, DisplayFormatType::TreeRender) {
            writeln!(f, "operator={:?}", self.operator)?;
            if self.join_type != JoinType::Inner {
                writeln!(f, "join_type={:?}", self.join_type)?;
            }
            writeln!(f, "on={predicate}")
        } else {
            // Default and Verbose share the same one-line form.
            write!(
                f,
                "PiecewiseMergeJoin: operator={:?}, join_type={:?}, on={}",
                self.operator, self.join_type, predicate
            )
        }
    }
}
/// Drains the buffered-side stream into one concatenated `RecordBatch`,
/// evaluates the buffered join-key expression over it, and (when `build_map`
/// is set) allocates the shared "visited" bitmap used to emit unmatched
/// buffered rows. All memory used is grown against `reservation` and
/// mirrored into the build metrics.
async fn build_buffered_data(
    buffered: SendableRecordBatchStream,
    on_buffered: PhysicalExprRef,
    metrics: BuildProbeJoinMetrics,
    reservation: MemoryReservation,
    build_map: bool,
    remaining_partitions: usize,
) -> Result<BufferedSideData> {
    let schema = buffered.schema();
    // Accumulator tuple: (collected batches, total row count, metrics,
    // memory reservation) — threaded through `try_fold` so the reservation
    // grows before each batch is kept.
    let initial = (Vec::new(), 0, metrics, reservation);
    let (batches, num_rows, metrics, reservation) = buffered
        .try_fold(initial, |mut acc, batch| async {
            let batch_size = get_record_batch_memory_size(&batch);
            // Reserve first so an over-limit batch fails before being stored.
            acc.3.try_grow(batch_size)?;
            acc.2.build_mem_used.add(batch_size);
            acc.2.build_input_batches.add(1);
            acc.2.build_input_rows.add(batch.num_rows());
            acc.1 += batch.num_rows();
            acc.0.push(batch);
            Ok(acc)
        })
        .await?;
    // Merge everything into a single batch so the join can index rows
    // globally.
    let single_batch = concat_batches(&schema, batches.iter())?;
    // Evaluate the buffered-side join key once over the merged batch.
    let buffered_values = on_buffered
        .evaluate(&single_batch)?
        .into_array(single_batch.num_rows())?;
    let size_estimation = get_record_batch_memory_size(&single_batch)
        + buffered_values.get_array_memory_size();
    reservation.try_grow(size_estimation)?;
    metrics.build_mem_used.add(size_estimation);
    let visited_indices_bitmap = if build_map {
        // One bit per buffered row, rounded up to whole bytes for accounting.
        let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8);
        reservation.try_grow(bitmap_size)?;
        metrics.build_mem_used.add(bitmap_size);
        let mut bitmap_buffer = BooleanBufferBuilder::new(single_batch.num_rows());
        // `num_rows` equals `single_batch.num_rows()` after the concat above;
        // all rows start as "not visited".
        bitmap_buffer.append_n(num_rows, false);
        bitmap_buffer
    } else {
        // No bitmap needed for this join type; keep an empty placeholder.
        BooleanBufferBuilder::new(0)
    };
    let buffered_data = BufferedSideData::new(
        single_batch,
        buffered_values,
        Mutex::new(visited_indices_bitmap),
        remaining_partitions,
        reservation,
    );
    Ok(buffered_data)
}
/// Fully-materialized buffered (left) side of the join, shared by all output
/// partitions.
pub(super) struct BufferedSideData {
    /// All buffered input concatenated into a single batch.
    pub(super) batch: RecordBatch,
    // Join-key expression evaluated over `batch`, one value per row.
    values: ArrayRef,
    /// Shared bitmap marking which buffered rows have matched (used to emit
    /// unmatched rows for the relevant join types).
    pub(super) visited_indices_bitmap: SharedBitmapBuilder,
    /// Count of output partitions that have not yet finished with this data.
    pub(super) remaining_partitions: AtomicUsize,
    // Held only to keep the memory-pool accounting alive for the lifetime of
    // this struct; never read.
    _reservation: MemoryReservation,
}
impl BufferedSideData {
    /// Bundles the materialized buffered side with its evaluated join-key
    /// values, the shared visited bitmap, and the memory reservation that
    /// keeps pool accounting alive while this data exists.
    pub(super) fn new(
        batch: RecordBatch,
        values: ArrayRef,
        visited_indices_bitmap: SharedBitmapBuilder,
        remaining_partitions: usize,
        reservation: MemoryReservation,
    ) -> Self {
        // Wrap the plain count in an atomic so partitions can decrement it
        // concurrently.
        let remaining_partitions = AtomicUsize::new(remaining_partitions);
        Self {
            batch,
            values,
            visited_indices_bitmap,
            remaining_partitions,
            _reservation: reservation,
        }
    }

    /// The concatenated buffered-side batch.
    pub(super) fn batch(&self) -> &RecordBatch {
        &self.batch
    }

    /// The join-key column evaluated over the buffered batch.
    pub(super) fn values(&self) -> &ArrayRef {
        &self.values
    }
}
/// State of the buffered side as seen by the join stream: first waiting on
/// the shared collection future, then holding the collected data.
pub(super) enum BufferedSide {
    /// Buffered data is still being collected (future pending).
    Initial(BufferedSideInitialState),
    /// Buffered data has been collected and is ready for probing.
    Ready(BufferedSideReadyState),
}
impl BufferedSide {
    /// Mutable access to the initial (collecting) state; errors if the side
    /// has already transitioned to `Ready`.
    pub(super) fn try_as_initial_mut(&mut self) -> Result<&mut BufferedSideInitialState> {
        if let BufferedSide::Initial(state) = self {
            Ok(state)
        } else {
            internal_err!("Expected build side in initial state")
        }
    }

    /// Shared access to the ready state; errors if still `Initial`.
    pub(super) fn try_as_ready(&self) -> Result<&BufferedSideReadyState> {
        if let BufferedSide::Ready(state) = self {
            Ok(state)
        } else {
            internal_err!("Expected build side in ready state")
        }
    }

    /// Mutable access to the ready state; errors if still `Initial`.
    pub(super) fn try_as_ready_mut(&mut self) -> Result<&mut BufferedSideReadyState> {
        if let BufferedSide::Ready(state) = self {
            Ok(state)
        } else {
            internal_err!("Expected build side in ready state")
        }
    }
}
/// Initial buffered-side state: holds the shared one-shot future that
/// resolves to the collected [`BufferedSideData`].
pub(super) struct BufferedSideInitialState {
    /// Future producing the buffered data; shared across output partitions.
    pub(crate) buffered_fut: OnceFut<BufferedSideData>,
}
/// Ready buffered-side state: the collected data, shared via `Arc` so every
/// output partition probes the same copy.
pub(super) struct BufferedSideReadyState {
    /// The fully collected buffered side.
    pub(super) buffered_data: Arc<BufferedSideData>,
}