use std::cmp::Ordering;
use std::fs::File;
use std::io::BufReader;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use crate::joins::utils::{JoinFilter, JoinKeyComparator, compare_join_arrays};
use crate::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder,
};
use crate::spill::spill_manager::SpillManager;
use crate::{EmptyRecordBatchStream, RecordBatchStream};
use arrow::array::{Array, ArrayRef, BooleanArray, BooleanBufferBuilder, RecordBatch};
use arrow::compute::{BatchCoalescer, SortOptions, filter_record_batch, not};
use arrow::datatypes::SchemaRef;
use arrow::ipc::reader::StreamReader;
use arrow::util::bit_chunk_iterator::UnalignedBitChunk;
use arrow::util::bit_util::apply_bitwise_binary_op;
use datafusion_common::{
JoinSide, JoinType, NullEquality, Result, ScalarValue, internal_err,
};
use datafusion_execution::SendableRecordBatchStream;
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::MemoryReservation;
use datafusion_physical_expr_common::physical_expr::PhysicalExprRef;
use futures::{Stream, StreamExt, ready};
/// Evaluates each join-key expression in `on` against `batch` and materializes every
/// result as an array with one entry per row of `batch`.
///
/// Expressions are evaluated in order; the first evaluation or conversion error is
/// returned.
fn evaluate_join_keys(
    batch: &RecordBatch,
    on: &[PhysicalExprRef],
) -> Result<Vec<ArrayRef>> {
    let row_count = batch.num_rows();
    let mut key_arrays = Vec::with_capacity(on.len());
    for key_expr in on {
        let value = key_expr.evaluate(batch)?;
        key_arrays.push(value.into_array(row_count)?);
    }
    Ok(key_arrays)
}
/// Returns the exclusive end index of the run of rows whose key equals the key at
/// row `from`, searching rows `from..len` with `cmp`.
///
/// Assumes the rows are sorted so that equal keys are contiguous. It checks the next
/// row and the last row first (common cases), then binary-searches the remaining range.
fn find_key_group_end(cmp: &JoinKeyComparator, from: usize, len: usize) -> usize {
    let same_as_start = |idx: usize| cmp.compare(from, idx) == Ordering::Equal;

    // Fast path: no following row, or the following row already starts a new key.
    let second = from + 1;
    if second >= len {
        return len;
    }
    if !same_as_start(second) {
        return second;
    }

    // Fast path: the group runs through the final row.
    let final_idx = len - 1;
    if same_as_start(final_idx) {
        return len;
    }

    // Invariant: every row before `low` is in the group; row `high` is not.
    let (mut low, mut high) = (second + 1, final_idx);
    while low < high {
        let probe = low + (high - low) / 2;
        if same_as_start(probe) {
            low = probe + 1;
        } else {
            high = probe;
        }
    }
    low
}
/// A key group left open when an outer batch ended while the last outer key was
/// matched against the inner side.
///
/// `saved_keys` holds the join-key values of the last row of the finished outer batch,
/// as one single-row slice per key column (built with `slice_keys`). When the next
/// outer batch arrives, its first row is compared against these keys to decide
/// whether its leading rows continue the same key group.
#[derive(Debug)]
enum PendingBoundary {
/// No join filter: continuing outer rows are marked as matched directly.
NoFilter { saved_keys: Vec<ArrayRef> },
/// With a join filter: continuing outer rows are evaluated against the inner rows of
/// that key group, which stay buffered/spilled until the boundary is resolved.
Filtered { saved_keys: Vec<ArrayRef> },
}
/// Streaming sort-merge join for the semi, anti and mark join types (see the
/// `debug_assert!` in `try_new`). Matches are recorded in a per-outer-batch bitmap
/// instead of materializing joined row pairs.
///
/// Once an outer batch has been compared against the inner input, it is filtered
/// (semi/anti) or extended with a boolean mark column (mark) according to that bitmap
/// and pushed into an output coalescer, so each outer row is emitted at most once.
///
/// NOTE(review): the merge logic assumes both inputs arrive sorted on their join keys
/// consistently with `sort_options` — confirm this is guaranteed by the planner.
pub(crate) struct BitwiseSortMergeJoinStream {
/// Semi/anti/mark variant being executed; selects the output shape in `emit_outer_batch`.
join_type: JoinType,
/// Input whose rows are emitted (the left input for `Left*` joins, the right for `Right*`).
outer: SendableRecordBatchStream,
/// Input that is only probed for matches.
inner: SendableRecordBatchStream,
/// Outer batch currently being processed; `None` between outer batches.
outer_batch: Option<RecordBatch>,
/// Index of the next unprocessed row in `outer_batch`.
outer_offset: usize,
/// Join-key arrays evaluated for `outer_batch`.
outer_key_arrays: Vec<ArrayRef>,
/// Inner batch currently being scanned.
inner_batch: Option<RecordBatch>,
/// Index of the next unprocessed row in `inner_batch`.
inner_offset: usize,
/// Join-key arrays evaluated for `inner_batch`.
inner_key_arrays: Vec<ArrayRef>,
/// One bit per row of `outer_batch`; a bit is set once that row has found a match.
matched: BooleanBufferBuilder,
/// In-memory slices of the inner rows sharing the current key (filtered joins only).
inner_key_buffer: Vec<RecordBatch>,
/// On-disk part of the current inner key group, written when the in-memory buffer
/// could not be kept within `reservation`.
inner_key_spill: Option<RefCountedTempFile>,
/// Set while `buffer_inner_key_group` waits on the inner stream, so the next call
/// resumes buffering instead of clearing and re-buffering the group.
buffering_inner_pending: bool,
/// Key group left open at the end of the previous outer batch.
pending_boundary: Option<PendingBoundary>,
/// Join-key expressions evaluated on outer batches.
on_outer: Vec<PhysicalExprRef>,
/// Join-key expressions evaluated on inner batches.
on_inner: Vec<PhysicalExprRef>,
/// Optional join filter, evaluated per inner row against the outer rows with the same key.
filter: Option<JoinFilter>,
/// Sort options of the join keys, passed to every key comparison.
sort_options: Vec<SortOptions>,
/// Null-key equality semantics passed to every key comparison.
null_equality: NullEquality,
/// `true` when the outer input is the join's left side (`Left*` join types).
outer_is_left: bool,
/// Accumulates emitted rows into output batches.
coalescer: BatchCoalescer,
/// Output schema.
schema: SchemaRef,
/// Time spent inside `poll_join`.
join_time: crate::metrics::Time,
/// Number of batches received from both inputs (empty batches included).
input_batches: Count,
/// Number of rows received from both inputs.
input_rows: Count,
/// Baseline output/poll metrics.
baseline_metrics: BaselineMetrics,
/// Largest memory reservation size observed.
peak_mem_used: Gauge,
/// Memory reservation covering `inner_buffer_size`.
reservation: MemoryReservation,
/// Writes spill files for the inner key-group buffer.
spill_manager: SpillManager,
/// Runtime environment; consulted to check whether disk spilling is enabled.
runtime_env: Arc<datafusion_execution::runtime_env::RuntimeEnv>,
/// Bytes of `inner_key_buffer` accounted against `reservation`.
inner_buffer_size: usize,
/// Cached outer-vs-inner key comparator; reset whenever either current batch changes.
outer_inner_cmp: Option<JoinKeyComparator>,
/// Cached outer-vs-outer key comparator; reset when a new outer batch is loaded.
outer_self_cmp: Option<JoinKeyComparator>,
/// Cached inner-vs-inner key comparator; reset when a new inner batch is loaded.
inner_self_cmp: Option<JoinKeyComparator>,
/// Whether `outer_batch` has already been pushed to the coalescer.
batch_emitted: bool,
}
impl BitwiseSortMergeJoinStream {
/// Creates a stream that joins `outer` against `inner` on `on_outer`/`on_inner`,
/// optionally restricted by `filter`.
///
/// `join_type` must be a semi, anti or mark variant (checked in debug builds only).
/// The `Left*` variants treat `outer` as the join's left input; the `Right*` variants
/// treat it as the right input. Output batches are coalesced toward `batch_size` rows.
///
/// Registers `join_time`, `input_batches`, `input_rows`, the baseline metrics and
/// `peak_mem_used` for `partition` in `metrics`. Currently always returns `Ok`.
#[expect(clippy::too_many_arguments)]
pub fn try_new(
    schema: SchemaRef,
    sort_options: Vec<SortOptions>,
    null_equality: NullEquality,
    outer: SendableRecordBatchStream,
    inner: SendableRecordBatchStream,
    on_outer: Vec<PhysicalExprRef>,
    on_inner: Vec<PhysicalExprRef>,
    filter: Option<JoinFilter>,
    join_type: JoinType,
    batch_size: usize,
    partition: usize,
    metrics: &ExecutionPlanMetricsSet,
    reservation: MemoryReservation,
    spill_manager: SpillManager,
    runtime_env: Arc<datafusion_execution::runtime_env::RuntimeEnv>,
) -> Result<Self> {
    debug_assert!(
        matches!(
            join_type,
            JoinType::LeftSemi
                | JoinType::RightSemi
                | JoinType::LeftAnti
                | JoinType::RightAnti
                | JoinType::LeftMark
                | JoinType::RightMark
        ),
        "BitwiseSortMergeJoinStream does not handle {join_type:?}"
    );

    // `Left*` joins emit rows of the left input, so the left input is the outer side.
    let outer_is_left = matches!(
        join_type,
        JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark
    );

    // Metrics are registered in this order: join_time, input_batches, input_rows,
    // baseline metrics, peak_mem_used.
    let builder = move || MetricBuilder::new(metrics);
    let join_time = builder().subset_time("join_time", partition);
    let input_batches = builder().counter("input_batches", partition);
    let input_rows = builder().counter("input_rows", partition);
    let baseline_metrics = BaselineMetrics::new(metrics, partition);
    let peak_mem_used = builder().gauge("peak_mem_used", partition);

    // Output coalescer, with a biggest-coalesce-batch threshold of half the target size.
    let coalescer = BatchCoalescer::new(Arc::clone(&schema), batch_size)
        .with_biggest_coalesce_batch_size(Some(batch_size / 2));

    Ok(Self {
        join_type,
        outer_is_left,
        outer,
        inner,
        on_outer,
        on_inner,
        filter,
        sort_options,
        null_equality,
        schema,
        coalescer,
        // Cursor state; populated as batches are polled.
        outer_batch: None,
        outer_offset: 0,
        outer_key_arrays: Vec::new(),
        inner_batch: None,
        inner_offset: 0,
        inner_key_arrays: Vec::new(),
        matched: BooleanBufferBuilder::new(0),
        batch_emitted: false,
        // Inner key-group buffering / spilling state.
        inner_key_buffer: Vec::new(),
        inner_key_spill: None,
        inner_buffer_size: 0,
        buffering_inner_pending: false,
        pending_boundary: None,
        // Comparators are built lazily for the current batches.
        outer_inner_cmp: None,
        outer_self_cmp: None,
        inner_self_cmp: None,
        join_time,
        input_batches,
        input_rows,
        baseline_metrics,
        peak_mem_used,
        reservation,
        spill_manager,
        runtime_env,
    })
}
/// Resizes the memory reservation to `inner_buffer_size` and records the resulting
/// reservation size in the `peak_mem_used` gauge.
///
/// If the pool refuses the resize, the error is returned and the gauge is not updated.
fn try_resize_reservation(&mut self) -> Result<()> {
    self.reservation.try_resize(self.inner_buffer_size)?;
    let reserved = self.reservation.size();
    self.peak_mem_used.set_max(reserved);
    Ok(())
}
/// Returns the comparator between the current outer and inner key arrays, building
/// and caching it on first use.
///
/// The cache is cleared whenever a new outer or inner batch is loaded. On a build
/// error nothing is cached.
fn get_outer_inner_cmp(&mut self) -> Result<&JoinKeyComparator> {
    let comparator = match self.outer_inner_cmp.take() {
        Some(cached) => cached,
        None => JoinKeyComparator::new(
            &self.outer_key_arrays,
            &self.inner_key_arrays,
            &self.sort_options,
            self.null_equality,
        )?,
    };
    let stored: &JoinKeyComparator = self.outer_inner_cmp.insert(comparator);
    Ok(stored)
}
/// Returns the comparator of the current outer key arrays against themselves (used to
/// find outer key-group boundaries), building and caching it on first use.
///
/// The cache is cleared whenever a new outer batch is loaded. On a build error
/// nothing is cached.
fn get_outer_self_cmp(&mut self) -> Result<&JoinKeyComparator> {
    let comparator = match self.outer_self_cmp.take() {
        Some(cached) => cached,
        None => JoinKeyComparator::new(
            &self.outer_key_arrays,
            &self.outer_key_arrays,
            &self.sort_options,
            self.null_equality,
        )?,
    };
    let stored: &JoinKeyComparator = self.outer_self_cmp.insert(comparator);
    Ok(stored)
}
/// Returns the comparator of the current inner key arrays against themselves (used to
/// find inner key-group boundaries), building and caching it on first use.
///
/// The cache is cleared whenever a new inner batch is loaded. On a build error
/// nothing is cached.
fn get_inner_self_cmp(&mut self) -> Result<&JoinKeyComparator> {
    let comparator = match self.inner_self_cmp.take() {
        Some(cached) => cached,
        None => JoinKeyComparator::new(
            &self.inner_key_arrays,
            &self.inner_key_arrays,
            &self.sort_options,
            self.null_equality,
        )?,
    };
    let stored: &JoinKeyComparator = self.inner_self_cmp.insert(comparator);
    Ok(stored)
}
fn spill_inner_key_buffer(&mut self) -> Result<()> {
let spill_file = self
.spill_manager
.spill_record_batch_and_finish(
&self.inner_key_buffer,
"semi_anti_smj_inner_key_spill",
)?
.expect("inner_key_buffer is non-empty when spilling");
self.inner_key_buffer.clear();
self.inner_buffer_size = 0;
self.inner_key_spill = Some(spill_file);
self.try_resize_reservation()
}
/// Discards every buffered and spilled inner row of the current key group and zeroes
/// the tracked buffer size.
///
/// The memory reservation itself is not resized here.
fn clear_inner_key_group(&mut self) {
    self.inner_buffer_size = 0;
    self.inner_key_spill = None;
    self.inner_key_buffer.clear();
}
/// Polls `outer` until it yields a non-empty batch, then makes that batch current.
///
/// Making a batch current means:
/// - evaluating its join keys;
/// - resetting `outer_offset`;
/// - invalidating the cached comparators that reference outer keys;
/// - clearing `batch_emitted`;
/// - allocating an all-`false` match bitmap of the batch's length.
///
/// Every received batch (empty ones included, which are skipped) is counted in the
/// input metrics. Returns `Ready(Ok(true))` when a batch was loaded. Returns
/// `Ready(Ok(false))` once `outer` is exhausted; `outer` is then replaced with an empty
/// stream of the same schema.
fn poll_next_outer_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<bool>> {
    let batch = loop {
        let Some(polled) = ready!(self.outer.poll_next_unpin(cx)) else {
            let outer_schema = self.outer.schema();
            self.outer = Box::pin(EmptyRecordBatchStream::new(outer_schema));
            return Poll::Ready(Ok(false));
        };
        let candidate = polled?;
        let rows = candidate.num_rows();
        self.input_batches.add(1);
        self.input_rows.add(rows);
        if rows != 0 {
            break candidate;
        }
    };

    let row_count = batch.num_rows();
    let keys = evaluate_join_keys(&batch, &self.on_outer)?;
    let mut fresh_bitmap = BooleanBufferBuilder::new(row_count);
    fresh_bitmap.append_n(row_count, false);

    self.outer_batch = Some(batch);
    self.outer_offset = 0;
    self.outer_key_arrays = keys;
    // Cached comparators were built from the previous outer key arrays.
    self.outer_inner_cmp = None;
    self.outer_self_cmp = None;
    self.batch_emitted = false;
    self.matched = fresh_bitmap;
    Poll::Ready(Ok(true))
}
/// Polls `inner` until it yields a non-empty batch, then makes it the current inner
/// batch: evaluates its join keys, resets `inner_offset`, and invalidates the cached
/// comparators that reference inner keys.
///
/// Every received batch (empty ones included, which are skipped) is counted in the
/// input metrics. Returns `Ready(Ok(true))` when a batch was loaded. Returns
/// `Ready(Ok(false))` once `inner` is exhausted. In that case `inner` is replaced with
/// an empty stream of the same schema, and the previously loaded `inner_batch` is left
/// untouched.
fn poll_next_inner_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<bool>> {
    let batch = loop {
        let Some(polled) = ready!(self.inner.poll_next_unpin(cx)) else {
            let inner_schema = self.inner.schema();
            self.inner = Box::pin(EmptyRecordBatchStream::new(inner_schema));
            return Poll::Ready(Ok(false));
        };
        let candidate = polled?;
        let rows = candidate.num_rows();
        self.input_batches.add(1);
        self.input_rows.add(rows);
        if rows != 0 {
            break candidate;
        }
    };

    let keys = evaluate_join_keys(&batch, &self.on_inner)?;
    self.inner_batch = Some(batch);
    self.inner_offset = 0;
    self.inner_key_arrays = keys;
    // Cached comparators were built from the previous inner key arrays.
    self.outer_inner_cmp = None;
    self.inner_self_cmp = None;
    Poll::Ready(Ok(true))
}
/// Pushes the output derived from the current outer batch into the coalescer, at most
/// once per loaded batch (guarded by `batch_emitted`).
///
/// The output depends on the join type:
/// - mark joins: every outer row plus a boolean column holding its match bit;
/// - semi joins: only the matched rows;
/// - anti joins: only the unmatched rows.
///
/// Filtered (semi/anti) output with zero rows is not pushed. The match bitmap is
/// consumed (reset) by this call.
///
/// # Panics
/// Panics if no outer batch is loaded, or if `join_type` is not a semi, anti or mark
/// variant.
fn emit_outer_batch(&mut self) -> Result<()> {
    if std::mem::replace(&mut self.batch_emitted, true) {
        return Ok(());
    }
    let batch = self.outer_batch.as_ref().unwrap();
    let matched = BooleanArray::new(self.matched.finish(), None);

    let output = match self.join_type {
        JoinType::LeftMark | JoinType::RightMark => {
            debug_assert_eq!(
                self.schema.fields().len(),
                batch.num_columns() + 1,
                "Mark join output schema should be outer schema + 1 mark column"
            );
            let mark = Arc::new(matched) as ArrayRef;
            let columns: Vec<ArrayRef> = batch
                .columns()
                .iter()
                .cloned()
                .chain(std::iter::once(mark))
                .collect();
            RecordBatch::try_new(Arc::clone(&self.schema), columns)?
        }
        JoinType::LeftSemi
        | JoinType::RightSemi
        | JoinType::LeftAnti
        | JoinType::RightAnti => {
            // Anti joins keep the rows that never matched.
            let keep = if matches!(self.join_type, JoinType::LeftAnti | JoinType::RightAnti)
            {
                not(&matched)?
            } else {
                matched
            };
            let filtered = filter_record_batch(batch, &keep)?;
            if filtered.num_rows() == 0 {
                return Ok(());
            }
            filtered
        }
        _ => unreachable!(),
    };
    self.coalescer.push_batch(output)?;
    Ok(())
}
/// Marks every outer row of the key group starting at `outer_offset` as matched and
/// advances `outer_offset` past that group.
///
/// Used on the no-filter path, where key equality alone decides a match.
///
/// # Panics
/// Panics if no outer batch is loaded.
fn process_key_match_no_filter(&mut self) -> Result<()> {
    let num_outer = self.outer_batch.as_ref().unwrap().num_rows();
    self.get_outer_self_cmp()?;
    let group_start = self.outer_offset;
    let group_end = find_key_group_end(
        self.outer_self_cmp.as_ref().unwrap(),
        group_start,
        num_outer,
    );
    (group_start..group_end).for_each(|row| self.matched.set_bit(row, true));
    self.outer_offset = group_end;
    Ok(())
}
/// Skips the inner key group that starts at `inner_offset`. When the group reaches the
/// end of the current inner batch, it keeps skipping into the following inner batches
/// while their first key equals the group's key.
///
/// Returns `Ready(Ok(false))` once positioned on the first row of a different key:
/// either inside the current batch, or at row 0 of a newly loaded inner batch.
/// Returns `Ready(Ok(true))` when no inner batch is loaded or the inner input is
/// exhausted. In the exhausted case `inner_batch` still holds the last batch and
/// `inner_offset` is not moved.
///
/// State changes only when `inner_offset` moves or a new inner batch is loaded.
/// A call that returns `Pending` can therefore be repeated from the current position.
fn advance_inner_past_key_group(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<bool>> {
loop {
let inner_batch = match &self.inner_batch {
Some(b) => b,
None => return Poll::Ready(Ok(true)),
};
let num_inner = inner_batch.num_rows();
self.get_inner_self_cmp()?;
let group_end = find_key_group_end(
self.inner_self_cmp.as_ref().unwrap(),
self.inner_offset,
num_inner,
);
// The group ends inside this batch: stop on the next group's first row.
if group_end < num_inner {
self.inner_offset = group_end;
return Poll::Ready(Ok(false));
}
// The group reaches the end of this batch. Remember its key (from the last row)
// to compare with the first row of the next batch.
let saved_inner_keys = slice_keys(&self.inner_key_arrays, num_inner - 1);
match ready!(self.poll_next_inner_batch(cx)) {
Err(e) => return Poll::Ready(Err(e)),
Ok(false) => {
return Poll::Ready(Ok(true));
}
Ok(true) => {
// Same key continues into the new batch: keep skipping from its row 0.
if keys_match(
&saved_inner_keys,
&self.inner_key_arrays,
&self.sort_options,
self.null_equality,
)? {
continue;
} else {
return Poll::Ready(Ok(false));
}
}
}
}
}
/// Collects every inner row of the key group starting at `inner_offset` into
/// `inner_key_buffer`, following the group across inner batch boundaries. Under memory
/// pressure the buffer is spilled to disk when spilling is enabled.
///
/// Returns `Ready(Ok(false))` once `inner_offset` is on the first row of the next key
/// group (possibly at row 0 of a newly loaded batch). Returns `Ready(Ok(true))` when
/// the inner input is exhausted or no inner batch is loaded.
///
/// A fresh call first clears any previously buffered group. If the inner stream returns
/// `Pending`, `buffering_inner_pending` stays set. The next call then keeps the buffer,
/// skips re-pushing the current batch's slice (already buffered), and goes straight
/// back to polling.
///
/// # Errors
/// Propagates stream, comparator and spill errors. If the reservation cannot grow and
/// disk spilling is disabled, the resize is retried once. A failed retry is returned as
/// an execution error with ". Disk spilling disabled." appended.
fn buffer_inner_key_group(&mut self, cx: &mut Context<'_>) -> Poll<Result<bool>> {
let mut resume_from_poll = false;
if self.buffering_inner_pending {
self.buffering_inner_pending = false;
resume_from_poll = true;
} else {
self.clear_inner_key_group();
}
loop {
if self.inner_batch.is_none() {
return Poll::Ready(Ok(true));
}
let num_inner = self.inner_batch.as_ref().unwrap().num_rows();
self.get_inner_self_cmp()?;
let group_end = find_key_group_end(
self.inner_self_cmp.as_ref().unwrap(),
self.inner_offset,
num_inner,
);
// Buffer this batch's part of the group, unless it was already buffered before the
// previous `Pending`.
if !resume_from_poll {
let inner_batch = self.inner_batch.as_ref().unwrap();
let slice =
inner_batch.slice(self.inner_offset, group_end - self.inner_offset);
self.inner_buffer_size += slice.get_array_memory_size();
self.inner_key_buffer.push(slice);
// Over budget: spill when disk is available, otherwise retry once and fail
// with context.
if self.try_resize_reservation().is_err() {
if self.runtime_env.disk_manager.tmp_files_enabled() {
self.spill_inner_key_buffer()?;
} else {
self.try_resize_reservation().map_err(|e| {
datafusion_common::DataFusionError::Execution(format!(
"{e}. Disk spilling disabled."
))
})?;
}
}
// The group ends inside this batch: position on the next group.
if group_end < num_inner {
self.inner_offset = group_end;
return Poll::Ready(Ok(false));
}
}
resume_from_poll = false;
// The group reaches the end of this batch; remember its key from the last row.
let saved_inner_keys = slice_keys(&self.inner_key_arrays, num_inner - 1);
// Stays set only if the poll below returns `Pending`.
self.buffering_inner_pending = true;
match ready!(self.poll_next_inner_batch(cx)) {
Err(e) => {
self.buffering_inner_pending = false;
return Poll::Ready(Err(e));
}
Ok(false) => {
self.buffering_inner_pending = false;
return Poll::Ready(Ok(true));
}
Ok(true) => {
self.buffering_inner_pending = false;
// Same key continues into the new batch: buffer from its row 0.
if keys_match(
&saved_inner_keys,
&self.inner_key_arrays,
&self.sort_options,
self.null_equality,
)? {
continue;
} else {
return Poll::Ready(Ok(false));
}
}
}
}
}
/// Evaluates the join filter for the outer key group starting at `outer_offset`
/// against every retained inner row of the same key: spilled rows first, then
/// in-memory rows. The results are OR-ed into `matched`, and `outer_offset` then
/// advances past the group.
///
/// Bits already set for the group are counted first. Evaluation stops as soon as every
/// outer row of the group is matched.
///
/// # Panics
/// Panics if no join filter is configured or no outer batch is loaded.
///
/// NOTE(review): spill files are read with blocking `std::fs` I/O on the polling path.
fn process_key_match_with_filter(&mut self) -> Result<()> {
self.get_outer_self_cmp()?;
let filter = self.filter.as_ref().unwrap();
let outer_batch = self.outer_batch.as_ref().unwrap();
let num_outer = outer_batch.num_rows();
debug_assert!(
!self.inner_key_buffer.is_empty() || self.inner_key_spill.is_some(),
"process_key_match_with_filter called with no inner key data"
);
debug_assert!(
self.outer_offset < num_outer,
"outer_offset must be within the current batch"
);
debug_assert!(
self.matched.len() == num_outer,
"matched vector must be sized for the current outer batch"
);
let outer_group_end = find_key_group_end(
self.outer_self_cmp.as_ref().unwrap(),
self.outer_offset,
num_outer,
);
let outer_group_len = outer_group_end - self.outer_offset;
let outer_slice = outer_batch.slice(self.outer_offset, outer_group_len);
// Number of rows in this outer group that are already marked as matched.
let mut matched_count = UnalignedBitChunk::new(
self.matched.as_slice(),
self.outer_offset,
outer_group_len,
)
.count_ones();
// Spilled inner rows of this key group, streamed back from disk.
if let Some(spill_file) = &self.inner_key_spill {
let file = BufReader::new(File::open(spill_file.path())?);
let reader = StreamReader::try_new(file, None)?;
for batch_result in reader {
let inner_slice = batch_result?;
matched_count = eval_filter_for_inner_slice(
self.outer_is_left,
filter,
&outer_slice,
&inner_slice,
&mut self.matched,
self.outer_offset,
outer_group_len,
matched_count,
)?;
if matched_count == outer_group_len {
break;
}
}
}
// In-memory inner rows; skipped entirely once every outer row is matched.
if matched_count < outer_group_len {
'outer: for inner_slice in &self.inner_key_buffer {
matched_count = eval_filter_for_inner_slice(
self.outer_is_left,
filter,
&outer_slice,
inner_slice,
&mut self.matched,
self.outer_offset,
outer_group_len,
matched_count,
)?;
if matched_count == outer_group_len {
break 'outer;
}
}
}
self.outer_offset = outer_group_end;
Ok(())
}
/// Resolves a key group left open by the previous outer batch (`pending_boundary`)
/// against the newly loaded outer batch. Callers must load `outer_batch` first
/// (checked with `debug_assert!`).
///
/// If the new batch starts with the same key, its leading rows are processed the same
/// way as before. For `NoFilter` they are marked matched directly. For `Filtered` they
/// are evaluated against the retained inner rows.
///
/// If that consumes the entire new batch, the boundary is kept open, the batch is
/// emitted and unloaded, and `Ok(true)` is returned so the caller loads the next outer
/// batch. Otherwise the boundary is consumed and `Ok(false)` is returned; for
/// `Filtered`, the retained inner key group is also released. With no pending
/// boundary, this returns `Ok(false)`.
fn resume_boundary(&mut self) -> Result<bool> {
debug_assert!(
self.outer_batch.is_some(),
"caller must load outer_batch first"
);
match self.pending_boundary.take() {
Some(PendingBoundary::NoFilter { saved_keys }) => {
let same_key = keys_match(
&saved_keys,
&self.outer_key_arrays,
&self.sort_options,
self.null_equality,
)?;
// Same key: the new batch's leading rows continue the open group.
if same_key {
self.process_key_match_no_filter()?;
let num_outer = self.outer_batch.as_ref().unwrap().num_rows();
// The group spans the whole new batch: keep the boundary open.
if self.outer_offset >= num_outer {
self.pending_boundary = Some(PendingBoundary::NoFilter {
saved_keys: slice_keys(&self.outer_key_arrays, num_outer - 1),
});
self.emit_outer_batch()?;
self.outer_batch = None;
return Ok(true);
}
}
}
Some(PendingBoundary::Filtered { saved_keys }) => {
debug_assert!(
!self.inner_key_buffer.is_empty() || self.inner_key_spill.is_some(),
"Filtered pending boundary entered but no inner key data exists"
);
let same_key = keys_match(
&saved_keys,
&self.outer_key_arrays,
&self.sort_options,
self.null_equality,
)?;
// Same key: the new batch's leading rows continue the open group.
if same_key {
self.process_key_match_with_filter()?;
let num_outer = self.outer_batch.as_ref().unwrap().num_rows();
// The group spans the whole new batch: keep the boundary (and inner rows).
if self.outer_offset >= num_outer {
self.pending_boundary = Some(PendingBoundary::Filtered {
saved_keys: slice_keys(&self.outer_key_arrays, num_outer - 1),
});
self.emit_outer_batch()?;
self.outer_batch = None;
return Ok(true);
}
}
// The boundary is resolved; the retained inner rows are no longer needed.
self.clear_inner_key_group();
}
None => {}
}
Ok(false)
}
/// Drives the merge of the outer and inner inputs and returns the next output batch,
/// or `Ready(Ok(None))` when the output is complete.
///
/// Each iteration compares the current outer row's key with the current inner row's key:
/// - `Less`: the outer key group has no match; skip it (its bits stay unset).
/// - `Greater`: skip the inner key group, following it into the next inner batch.
/// - `Equal`: mark the outer key group, directly when there is no filter, or by
///   evaluating the filter against the buffered inner key group. Then skip or consume
///   the inner group.
///
/// When an outer key group runs to the end of its batch, the key is kept in
/// `pending_boundary` so that the next outer batch can continue the same group.
/// Completed coalesced batches are returned as soon as they are available.
fn poll_join(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<RecordBatch>>> {
    let join_time = self.join_time.clone();
    let _timer = join_time.timer();
    loop {
        // Load the next outer batch, first resolving any key group left open by the
        // previous outer batch.
        if self.outer_batch.is_none() {
            match ready!(self.poll_next_outer_batch(cx)) {
                Err(e) => return Poll::Ready(Err(e)),
                Ok(false) => {
                    self.pending_boundary = None;
                    self.coalescer.finish_buffered_batch()?;
                    if let Some(batch) = self.coalescer.next_completed_batch() {
                        return Poll::Ready(Ok(Some(batch)));
                    }
                    return Poll::Ready(Ok(None));
                }
                Ok(true) => {
                    if self.resume_boundary()? {
                        continue;
                    }
                }
            }
        }
        if self.inner_batch.is_none() && self.pending_boundary.is_none() {
            match ready!(self.poll_next_inner_batch(cx)) {
                Err(e) => return Poll::Ready(Err(e)),
                Ok(false) => {
                    // The inner input is exhausted, so no further outer row can match
                    // and the current outer batch is final. Emit it, then return to the
                    // top of the loop. The loop loads each remaining outer batch (and
                    // emits it through this branch, since the inner stream now stays
                    // empty) and finishes when the outer input ends.
                    //
                    // Completed output is returned as soon as it exists. Draining every
                    // remaining outer batch in one call would hold all of that output in
                    // the coalescer at once, outside the memory reservation.
                    self.emit_outer_batch()?;
                    self.outer_batch = None;
                    if let Some(batch) = self.coalescer.next_completed_batch() {
                        return Poll::Ready(Ok(Some(batch)));
                    }
                    continue;
                }
                Ok(true) => {}
            }
        }
        let outer_batch = self.outer_batch.as_ref().unwrap();
        let num_outer = outer_batch.num_rows();
        // The current outer batch is fully processed: emit it and move on.
        if self.outer_offset >= num_outer {
            self.emit_outer_batch()?;
            self.outer_batch = None;
            if let Some(batch) = self.coalescer.next_completed_batch() {
                return Poll::Ready(Ok(Some(batch)));
            }
            continue;
        }
        let inner_batch = match &self.inner_batch {
            Some(b) => b,
            None => {
                self.emit_outer_batch()?;
                self.outer_batch = None;
                continue;
            }
        };
        let num_inner = inner_batch.num_rows();
        if self.inner_offset >= num_inner {
            match ready!(self.poll_next_inner_batch(cx)) {
                Err(e) => return Poll::Ready(Err(e)),
                Ok(false) => {
                    self.inner_batch = None;
                    continue;
                }
                Ok(true) => continue,
            }
        }
        self.get_outer_inner_cmp()?;
        let cmp = self
            .outer_inner_cmp
            .as_ref()
            .unwrap()
            .compare(self.outer_offset, self.inner_offset);
        match cmp {
            // Outer key is smaller: no inner row can match this outer key group.
            Ordering::Less => {
                self.get_outer_self_cmp()?;
                let group_end = find_key_group_end(
                    self.outer_self_cmp.as_ref().unwrap(),
                    self.outer_offset,
                    num_outer,
                );
                self.outer_offset = group_end;
            }
            // Inner key is smaller: skip the inner key group, possibly across batches.
            Ordering::Greater => {
                self.get_inner_self_cmp()?;
                let group_end = find_key_group_end(
                    self.inner_self_cmp.as_ref().unwrap(),
                    self.inner_offset,
                    num_inner,
                );
                if group_end >= num_inner {
                    let saved_keys = slice_keys(&self.inner_key_arrays, num_inner - 1);
                    match ready!(self.poll_next_inner_batch(cx)) {
                        Err(e) => return Poll::Ready(Err(e)),
                        Ok(false) => {
                            self.inner_batch = None;
                            continue;
                        }
                        Ok(true) => {
                            if keys_match(
                                &saved_keys,
                                &self.inner_key_arrays,
                                &self.sort_options,
                                self.null_equality,
                            )? {
                                match ready!(self.advance_inner_past_key_group(cx)) {
                                    Err(e) => return Poll::Ready(Err(e)),
                                    Ok(_) => continue,
                                }
                            }
                            continue;
                        }
                    }
                } else {
                    self.inner_offset = group_end;
                }
            }
            // Keys match: mark the outer key group (and its continuation in later
            // outer batches).
            Ordering::Equal => {
                if self.filter.is_some() {
                    match ready!(self.buffer_inner_key_group(cx)) {
                        Err(e) => return Poll::Ready(Err(e)),
                        Ok(_inner_exhausted) => {}
                    }
                    loop {
                        self.process_key_match_with_filter()?;
                        let outer_batch = self.outer_batch.as_ref().unwrap();
                        if self.outer_offset >= outer_batch.num_rows() {
                            let saved_keys = slice_keys(
                                &self.outer_key_arrays,
                                outer_batch.num_rows() - 1,
                            );
                            self.emit_outer_batch()?;
                            debug_assert!(
                                !self.inner_key_buffer.is_empty()
                                    || self.inner_key_spill.is_some(),
                                "Filtered pending boundary requires inner key data in buffer or spill"
                            );
                            // If the poll below is `Pending`, the next call resumes
                            // through `resume_boundary` with the inner rows retained.
                            self.pending_boundary =
                                Some(PendingBoundary::Filtered { saved_keys });
                            match ready!(self.poll_next_outer_batch(cx)) {
                                Err(e) => return Poll::Ready(Err(e)),
                                Ok(false) => {
                                    self.pending_boundary = None;
                                    self.outer_batch = None;
                                    break;
                                }
                                Ok(true) => {
                                    let Some(PendingBoundary::Filtered { saved_keys }) =
                                        self.pending_boundary.take()
                                    else {
                                        unreachable!()
                                    };
                                    let same = keys_match(
                                        &saved_keys,
                                        &self.outer_key_arrays,
                                        &self.sort_options,
                                        self.null_equality,
                                    )?;
                                    if same {
                                        continue;
                                    }
                                    break;
                                }
                            }
                        } else {
                            break;
                        }
                    }
                    self.clear_inner_key_group();
                } else {
                    match ready!(self.advance_inner_past_key_group(cx)) {
                        Err(e) => return Poll::Ready(Err(e)),
                        Ok(_inner_exhausted) => {}
                    }
                    loop {
                        self.process_key_match_no_filter()?;
                        let num_outer = self.outer_batch.as_ref().unwrap().num_rows();
                        if self.outer_offset >= num_outer {
                            let saved_keys =
                                slice_keys(&self.outer_key_arrays, num_outer - 1);
                            self.emit_outer_batch()?;
                            // If the poll below is `Pending`, the next call resumes
                            // through `resume_boundary`.
                            self.pending_boundary =
                                Some(PendingBoundary::NoFilter { saved_keys });
                            match ready!(self.poll_next_outer_batch(cx)) {
                                Err(e) => return Poll::Ready(Err(e)),
                                Ok(false) => {
                                    self.pending_boundary = None;
                                    self.outer_batch = None;
                                    break;
                                }
                                Ok(true) => {
                                    let Some(PendingBoundary::NoFilter { saved_keys }) =
                                        self.pending_boundary.take()
                                    else {
                                        unreachable!()
                                    };
                                    let same_key = keys_match(
                                        &saved_keys,
                                        &self.outer_key_arrays,
                                        &self.sort_options,
                                        self.null_equality,
                                    )?;
                                    if same_key {
                                        continue;
                                    }
                                    break;
                                }
                            }
                        } else {
                            break;
                        }
                    }
                }
            }
        }
        if let Some(batch) = self.coalescer.next_completed_batch() {
            return Poll::Ready(Ok(Some(batch)));
        }
    }
}
}
/// ORs the join-filter result of each row of `inner_slice` into the `matched` bits
/// `[outer_offset, outer_offset + outer_group_len)`.
///
/// `outer_slice` is expected to hold exactly the outer rows of that range.
/// `matched_count` must equal the number of bits already set in the range (checked in
/// debug builds). Returns the updated count. Stops early once every row in the range
/// is matched.
#[expect(clippy::too_many_arguments)]
fn eval_filter_for_inner_slice(
    outer_is_left: bool,
    filter: &JoinFilter,
    outer_slice: &RecordBatch,
    inner_slice: &RecordBatch,
    matched: &mut BooleanBufferBuilder,
    outer_offset: usize,
    outer_group_len: usize,
    mut matched_count: usize,
) -> Result<usize> {
    /// Number of set bits in `bits[offset..offset + len]`.
    fn count_set_bits(bits: &BooleanBufferBuilder, offset: usize, len: usize) -> usize {
        UnalignedBitChunk::new(bits.as_slice(), offset, len).count_ones()
    }

    debug_assert_eq!(
        matched_count,
        count_set_bits(matched, outer_offset, outer_group_len)
    );
    let inner_rows = inner_slice.num_rows();
    let mut inner_row = 0;
    while inner_row < inner_rows && matched_count != outer_group_len {
        let filter_result = evaluate_filter_for_inner_row(
            outer_is_left,
            filter,
            outer_slice,
            inner_slice,
            inner_row,
        )?;
        let passed = filter_result.values();
        apply_bitwise_binary_op(
            matched.as_slice_mut(),
            outer_offset,
            passed.inner().as_slice(),
            passed.offset(),
            outer_group_len,
            |lhs, rhs| lhs | rhs,
        );
        matched_count = count_set_bits(matched, outer_offset, outer_group_len);
        inner_row += 1;
    }
    Ok(matched_count)
}
/// Returns a single-row slice of every key array, taken at row `idx`.
///
/// The result is used as a saved key for comparisons across batch boundaries.
fn slice_keys(keys: &[ArrayRef], idx: usize) -> Vec<ArrayRef> {
    let mut single_rows = Vec::with_capacity(keys.len());
    for key in keys {
        single_rows.push(key.slice(idx, 1));
    }
    single_rows
}
fn keys_match(
left_arrays: &[ArrayRef],
right_arrays: &[ArrayRef],
sort_options: &[SortOptions],
null_equality: NullEquality,
) -> Result<bool> {
debug_assert!(left_arrays.iter().all(|a| a.len() == 1));
let cmp = compare_join_arrays(
left_arrays,
0,
right_arrays,
0,
sort_options,
null_equality,
)?;
Ok(cmp == Ordering::Equal)
}
/// Evaluates the join filter for a single inner row against every row of
/// `outer_slice`.
///
/// The filter's input batch is assembled column by column:
/// - outer-side columns are taken from `outer_slice` unchanged;
/// - inner-side columns repeat the value at `inner_batch[inner_idx]` once per outer row.
///
/// `outer_is_left` maps the filter's `Left`/`Right` sides onto outer/inner.
///
/// Returns one boolean per outer row. Null results are passed through
/// `prep_null_mask_filter` so they do not count as matches.
///
/// # Errors
/// Returns an internal error for a `JoinSide::None` column or a non-boolean filter
/// result. Also propagates scalar-conversion and expression-evaluation errors.
fn evaluate_filter_for_inner_row(
    outer_is_left: bool,
    filter: &JoinFilter,
    outer_slice: &RecordBatch,
    inner_batch: &RecordBatch,
    inner_idx: usize,
) -> Result<BooleanArray> {
    let row_count = outer_slice.num_rows();
    let indices = filter.column_indices();
    let mut filter_columns: Vec<ArrayRef> = Vec::with_capacity(indices.len());
    for column_index in indices {
        let from_outer = match column_index.side {
            JoinSide::Left => outer_is_left,
            JoinSide::Right => !outer_is_left,
            JoinSide::None => {
                return internal_err!("Unexpected JoinSide::None in filter");
            }
        };
        let column = if from_outer {
            Arc::clone(outer_slice.column(column_index.index))
        } else {
            let value = ScalarValue::try_from_array(
                inner_batch.column(column_index.index).as_ref(),
                inner_idx,
            )?;
            value.to_array_of_size(row_count)?
        };
        filter_columns.push(column);
    }

    let filter_input = RecordBatch::try_new(Arc::clone(filter.schema()), filter_columns)?;
    let evaluated = filter
        .expression()
        .evaluate(&filter_input)?
        .into_array(row_count)?;
    let Some(booleans) = evaluated.as_any().downcast_ref::<BooleanArray>() else {
        return Err(datafusion_common::DataFusionError::Internal(
            "Filter expression did not return BooleanArray".to_string(),
        ));
    };
    Ok(if booleans.null_count() == 0 {
        booleans.clone()
    } else {
        arrow::compute::prep_null_mask_filter(booleans)
    })
}
impl Stream for BitwiseSortMergeJoinStream {
    type Item = Result<RecordBatch>;

    /// Advances the join state machine and records the outcome in the baseline
    /// metrics. `Ok(None)` from the join maps to end-of-stream.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let item = match self.poll_join(cx) {
            Poll::Ready(result) => Poll::Ready(result.transpose()),
            Poll::Pending => Poll::Pending,
        };
        self.baseline_metrics.record_poll(item)
    }
}
impl RecordBatchStream for BitwiseSortMergeJoinStream {
    /// Returns a shared handle to the join's output schema.
    fn schema(&self) -> SchemaRef {
        let output_schema = &self.schema;
        Arc::clone(output_schema)
    }
}