1use std::any::Any;
21use std::fmt::Formatter;
22use std::ops::{BitOr, ControlFlow};
23use std::sync::Arc;
24use std::sync::atomic::{AtomicUsize, Ordering};
25use std::task::Poll;
26
27use super::utils::{
28 asymmetric_join_output_partitioning, need_produce_result_in_final,
29 reorder_output_after_swap, swap_join_projection,
30};
31use crate::common::can_project;
32use crate::execution_plan::{EmissionType, boundedness_from_children};
33use crate::joins::SharedBitmapBuilder;
34use crate::joins::utils::{
35 BuildProbeJoinMetrics, ColumnIndex, JoinFilter, OnceAsync, OnceFut,
36 build_join_schema, check_join_is_valid, estimate_join_statistics,
37 need_produce_right_in_final,
38};
39use crate::metrics::{
40 Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType, MetricsSet, RatioMetrics,
41};
42use crate::projection::{
43 EmbeddedProjection, JoinData, ProjectionExec, try_embed_projection,
44 try_pushdown_through_join,
45};
46use crate::{
47 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
48 PlanProperties, RecordBatchStream, SendableRecordBatchStream,
49};
50
51use arrow::array::{
52 Array, BooleanArray, BooleanBufferBuilder, RecordBatchOptions, UInt32Array,
53 UInt64Array, new_null_array,
54};
55use arrow::buffer::BooleanBuffer;
56use arrow::compute::{
57 BatchCoalescer, concat_batches, filter, filter_record_batch, not, take,
58};
59use arrow::datatypes::{Schema, SchemaRef};
60use arrow::record_batch::RecordBatch;
61use arrow_schema::DataType;
62use datafusion_common::cast::as_boolean_array;
63use datafusion_common::{
64 JoinSide, Result, ScalarValue, Statistics, arrow_err, assert_eq_or_internal_err,
65 internal_datafusion_err, internal_err, project_schema, unwrap_or_internal_err,
66};
67use datafusion_execution::TaskContext;
68use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
69use datafusion_expr::JoinType;
70use datafusion_physical_expr::equivalence::{
71 ProjectionMapping, join_equivalence_properties,
72};
73
74use futures::{Stream, StreamExt, TryStreamExt};
75use log::debug;
76use parking_lot::Mutex;
77
#[expect(rustdoc::private_intra_doc_links)]
#[derive(Debug)]
/// Physical plan node that joins every row of the left (build) input with every
/// row of the right (probe) input, keeping the pairs accepted by an optional
/// [`JoinFilter`].
///
/// The left input is executed once (partition 0) and collected into a single
/// [`JoinLeftData`] that is shared by all output partitions; each output
/// partition then streams the matching partition of the right input against it.
pub struct NestedLoopJoinExec {
    /// Build-side input; must have a single output partition.
    pub(crate) left: Arc<dyn ExecutionPlan>,
    /// Probe-side input; one output partition is produced per right partition.
    pub(crate) right: Arc<dyn ExecutionPlan>,
    /// Optional predicate evaluated on each (left row, right row) pair. When
    /// `None`, every pair is treated as matching.
    pub(crate) filter: Option<JoinFilter>,
    /// How matched / unmatched rows are emitted.
    pub(crate) join_type: JoinType,
    /// Schema of the join output before `projection` is applied.
    join_schema: SchemaRef,
    /// Build-side data, computed once on the first `execute` call and then
    /// shared by every partition.
    build_side_data: OnceAsync<JoinLeftData>,
    /// For each column of `join_schema`, which input side and column it comes from.
    column_indices: Vec<ColumnIndex>,
    /// Optional indices into `join_schema` selecting the output columns.
    projection: Option<Vec<usize>>,

    /// Execution metrics for this operator.
    metrics: ExecutionPlanMetricsSet,
    /// Cached plan properties (equivalences, partitioning, emission, boundedness).
    cache: PlanProperties,
}
202
203impl NestedLoopJoinExec {
204 pub fn try_new(
206 left: Arc<dyn ExecutionPlan>,
207 right: Arc<dyn ExecutionPlan>,
208 filter: Option<JoinFilter>,
209 join_type: &JoinType,
210 projection: Option<Vec<usize>>,
211 ) -> Result<Self> {
212 let left_schema = left.schema();
213 let right_schema = right.schema();
214 check_join_is_valid(&left_schema, &right_schema, &[])?;
215 let (join_schema, column_indices) =
216 build_join_schema(&left_schema, &right_schema, join_type);
217 let join_schema = Arc::new(join_schema);
218 let cache = Self::compute_properties(
219 &left,
220 &right,
221 &join_schema,
222 *join_type,
223 projection.as_ref(),
224 )?;
225
226 Ok(NestedLoopJoinExec {
227 left,
228 right,
229 filter,
230 join_type: *join_type,
231 join_schema,
232 build_side_data: Default::default(),
233 column_indices,
234 projection,
235 metrics: Default::default(),
236 cache,
237 })
238 }
239
240 pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
242 &self.left
243 }
244
245 pub fn right(&self) -> &Arc<dyn ExecutionPlan> {
247 &self.right
248 }
249
250 pub fn filter(&self) -> Option<&JoinFilter> {
252 self.filter.as_ref()
253 }
254
255 pub fn join_type(&self) -> &JoinType {
257 &self.join_type
258 }
259
260 pub fn projection(&self) -> Option<&Vec<usize>> {
261 self.projection.as_ref()
262 }
263
264 fn compute_properties(
266 left: &Arc<dyn ExecutionPlan>,
267 right: &Arc<dyn ExecutionPlan>,
268 schema: &SchemaRef,
269 join_type: JoinType,
270 projection: Option<&Vec<usize>>,
271 ) -> Result<PlanProperties> {
272 let mut eq_properties = join_equivalence_properties(
274 left.equivalence_properties().clone(),
275 right.equivalence_properties().clone(),
276 &join_type,
277 Arc::clone(schema),
278 &Self::maintains_input_order(join_type),
279 None,
280 &[],
282 )?;
283
284 let mut output_partitioning =
285 asymmetric_join_output_partitioning(left, right, &join_type)?;
286
287 let emission_type = if left.boundedness().is_unbounded() {
288 EmissionType::Final
289 } else if right.pipeline_behavior() == EmissionType::Incremental {
290 match join_type {
291 JoinType::Inner
294 | JoinType::LeftSemi
295 | JoinType::RightSemi
296 | JoinType::Right
297 | JoinType::RightAnti
298 | JoinType::RightMark => EmissionType::Incremental,
299 JoinType::Left
302 | JoinType::LeftAnti
303 | JoinType::LeftMark
304 | JoinType::Full => EmissionType::Both,
305 }
306 } else {
307 right.pipeline_behavior()
308 };
309
310 if let Some(projection) = projection {
311 let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
313 let out_schema = project_schema(schema, Some(projection))?;
314 output_partitioning =
315 output_partitioning.project(&projection_mapping, &eq_properties);
316 eq_properties = eq_properties.project(&projection_mapping, out_schema);
317 }
318
319 Ok(PlanProperties::new(
320 eq_properties,
321 output_partitioning,
322 emission_type,
323 boundedness_from_children([left, right]),
324 ))
325 }
326
327 fn maintains_input_order(_join_type: JoinType) -> Vec<bool> {
329 vec![false, false]
330 }
331
332 pub fn contains_projection(&self) -> bool {
333 self.projection.is_some()
334 }
335
336 pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
337 can_project(&self.schema(), projection.as_ref())?;
339 let projection = match projection {
340 Some(projection) => match &self.projection {
341 Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
342 None => Some(projection),
343 },
344 None => None,
345 };
346 Self::try_new(
347 Arc::clone(&self.left),
348 Arc::clone(&self.right),
349 self.filter.clone(),
350 &self.join_type,
351 projection,
352 )
353 }
354
355 pub fn swap_inputs(&self) -> Result<Arc<dyn ExecutionPlan>> {
364 let left = self.left();
365 let right = self.right();
366 let new_join = NestedLoopJoinExec::try_new(
367 Arc::clone(right),
368 Arc::clone(left),
369 self.filter().map(JoinFilter::swap),
370 &self.join_type().swap(),
371 swap_join_projection(
372 left.schema().fields().len(),
373 right.schema().fields().len(),
374 self.projection.as_ref(),
375 self.join_type(),
376 ),
377 )?;
378
379 let plan: Arc<dyn ExecutionPlan> = if matches!(
382 self.join_type(),
383 JoinType::LeftSemi
384 | JoinType::RightSemi
385 | JoinType::LeftAnti
386 | JoinType::RightAnti
387 | JoinType::LeftMark
388 | JoinType::RightMark
389 ) || self.projection.is_some()
390 {
391 Arc::new(new_join)
392 } else {
393 reorder_output_after_swap(
394 Arc::new(new_join),
395 &self.left().schema(),
396 &self.right().schema(),
397 )?
398 };
399
400 Ok(plan)
401 }
402}
403
404impl DisplayAs for NestedLoopJoinExec {
405 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
406 match t {
407 DisplayFormatType::Default | DisplayFormatType::Verbose => {
408 let display_filter = self.filter.as_ref().map_or_else(
409 || "".to_string(),
410 |f| format!(", filter={}", f.expression()),
411 );
412 let display_projections = if self.contains_projection() {
413 format!(
414 ", projection=[{}]",
415 self.projection
416 .as_ref()
417 .unwrap()
418 .iter()
419 .map(|index| format!(
420 "{}@{}",
421 self.join_schema.fields().get(*index).unwrap().name(),
422 index
423 ))
424 .collect::<Vec<_>>()
425 .join(", ")
426 )
427 } else {
428 "".to_string()
429 };
430 write!(
431 f,
432 "NestedLoopJoinExec: join_type={:?}{}{}",
433 self.join_type, display_filter, display_projections
434 )
435 }
436 DisplayFormatType::TreeRender => {
437 if *self.join_type() != JoinType::Inner {
438 writeln!(f, "join_type={:?}", self.join_type)
439 } else {
440 Ok(())
441 }
442 }
443 }
444 }
445}
446
447impl ExecutionPlan for NestedLoopJoinExec {
448 fn name(&self) -> &'static str {
449 "NestedLoopJoinExec"
450 }
451
452 fn as_any(&self) -> &dyn Any {
453 self
454 }
455
456 fn properties(&self) -> &PlanProperties {
457 &self.cache
458 }
459
460 fn required_input_distribution(&self) -> Vec<Distribution> {
461 vec![
462 Distribution::SinglePartition,
463 Distribution::UnspecifiedDistribution,
464 ]
465 }
466
467 fn maintains_input_order(&self) -> Vec<bool> {
468 Self::maintains_input_order(self.join_type)
469 }
470
471 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
472 vec![&self.left, &self.right]
473 }
474
475 fn with_new_children(
476 self: Arc<Self>,
477 children: Vec<Arc<dyn ExecutionPlan>>,
478 ) -> Result<Arc<dyn ExecutionPlan>> {
479 Ok(Arc::new(NestedLoopJoinExec::try_new(
480 Arc::clone(&children[0]),
481 Arc::clone(&children[1]),
482 self.filter.clone(),
483 &self.join_type,
484 self.projection.clone(),
485 )?))
486 }
487
488 fn execute(
489 &self,
490 partition: usize,
491 context: Arc<TaskContext>,
492 ) -> Result<SendableRecordBatchStream> {
493 assert_eq_or_internal_err!(
494 self.left.output_partitioning().partition_count(),
495 1,
496 "Invalid NestedLoopJoinExec, the output partition count of the left child must be 1,\
497 consider using CoalescePartitionsExec or the EnforceDistribution rule"
498 );
499
500 let metrics = NestedLoopJoinMetrics::new(&self.metrics, partition);
501
502 let load_reservation =
504 MemoryConsumer::new(format!("NestedLoopJoinLoad[{partition}]"))
505 .register(context.memory_pool());
506
507 let build_side_data = self.build_side_data.try_once(|| {
508 let stream = self.left.execute(0, Arc::clone(&context))?;
509
510 Ok(collect_left_input(
511 stream,
512 metrics.join_metrics.clone(),
513 load_reservation,
514 need_produce_result_in_final(self.join_type),
515 self.right().output_partitioning().partition_count(),
516 ))
517 })?;
518
519 let batch_size = context.session_config().batch_size();
520
521 let probe_side_data = self.right.execute(partition, context)?;
522
523 let column_indices_after_projection = match &self.projection {
525 Some(projection) => projection
526 .iter()
527 .map(|i| self.column_indices[*i].clone())
528 .collect(),
529 None => self.column_indices.clone(),
530 };
531
532 Ok(Box::pin(NestedLoopJoinStream::new(
533 self.schema(),
534 self.filter.clone(),
535 self.join_type,
536 probe_side_data,
537 build_side_data,
538 column_indices_after_projection,
539 metrics,
540 batch_size,
541 )))
542 }
543
544 fn metrics(&self) -> Option<MetricsSet> {
545 Some(self.metrics.clone_inner())
546 }
547
548 fn statistics(&self) -> Result<Statistics> {
549 self.partition_statistics(None)
550 }
551
552 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
553 let join_columns = Vec::new();
561
562 let left_stats = self.left.partition_statistics(None)?;
567 let right_stats = match partition {
568 Some(partition) => self.right.partition_statistics(Some(partition))?,
569 None => self.right.partition_statistics(None)?,
570 };
571
572 let stats = estimate_join_statistics(
573 left_stats,
574 right_stats,
575 &join_columns,
576 &self.join_type,
577 &self.join_schema,
578 )?;
579
580 Ok(stats.project(self.projection.as_ref()))
581 }
582
583 fn try_swapping_with_projection(
587 &self,
588 projection: &ProjectionExec,
589 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
590 if self.contains_projection() {
592 return Ok(None);
593 }
594
595 let schema = self.schema();
596 if let Some(JoinData {
597 projected_left_child,
598 projected_right_child,
599 join_filter,
600 ..
601 }) = try_pushdown_through_join(
602 projection,
603 self.left(),
604 self.right(),
605 &[],
606 &schema,
607 self.filter(),
608 )? {
609 Ok(Some(Arc::new(NestedLoopJoinExec::try_new(
610 Arc::new(projected_left_child),
611 Arc::new(projected_right_child),
612 join_filter,
613 self.join_type(),
614 None,
616 )?)))
617 } else {
618 try_embed_projection(projection, self)
619 }
620 }
621}
622
impl EmbeddedProjection for NestedLoopJoinExec {
    /// Embeds `projection` into this join by delegating to the inherent
    /// `NestedLoopJoinExec::with_projection`, which composes it with any
    /// projection the join already has.
    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
        // Method-call resolution prefers the inherent method over this trait
        // method, so this delegates rather than recursing.
        self.with_projection(projection)
    }
}
628
/// Build-side state shared by every output partition of a nested loop join.
pub(crate) struct JoinLeftData {
    /// All left-input batches concatenated into a single batch.
    batch: RecordBatch,
    /// Per-left-row "matched" flags, behind a mutex because every probe partition
    /// updates it. Empty when the join type does not need unmatched left rows.
    bitmap: SharedBitmapBuilder,
    /// Number of probe partitions that have not yet finished probing.
    probe_threads_counter: AtomicUsize,
    /// Memory reservation covering `batch` and `bitmap`. It is never read (hence
    /// `dead_code`); it is kept alongside the data, presumably so the reserved
    /// memory is released only when this struct is dropped.
    #[expect(dead_code)]
    reservation: MemoryReservation,
}
643
644impl JoinLeftData {
645 pub(crate) fn new(
646 batch: RecordBatch,
647 bitmap: SharedBitmapBuilder,
648 probe_threads_counter: AtomicUsize,
649 reservation: MemoryReservation,
650 ) -> Self {
651 Self {
652 batch,
653 bitmap,
654 probe_threads_counter,
655 reservation,
656 }
657 }
658
659 pub(crate) fn batch(&self) -> &RecordBatch {
660 &self.batch
661 }
662
663 pub(crate) fn bitmap(&self) -> &SharedBitmapBuilder {
664 &self.bitmap
665 }
666
667 pub(crate) fn report_probe_completed(&self) -> bool {
670 self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1
671 }
672}
673
674async fn collect_left_input(
676 stream: SendableRecordBatchStream,
677 join_metrics: BuildProbeJoinMetrics,
678 reservation: MemoryReservation,
679 with_visited_left_side: bool,
680 probe_threads_count: usize,
681) -> Result<JoinLeftData> {
682 let schema = stream.schema();
683
684 let (batches, metrics, mut reservation) = stream
686 .try_fold(
687 (Vec::new(), join_metrics, reservation),
688 |(mut batches, metrics, mut reservation), batch| async {
689 let batch_size = batch.get_array_memory_size();
690 reservation.try_grow(batch_size)?;
692 metrics.build_mem_used.add(batch_size);
694 metrics.build_input_batches.add(1);
695 metrics.build_input_rows.add(batch.num_rows());
696 batches.push(batch);
698 Ok((batches, metrics, reservation))
699 },
700 )
701 .await?;
702
703 let merged_batch = concat_batches(&schema, &batches)?;
704
705 let visited_left_side = if with_visited_left_side {
707 let n_rows = merged_batch.num_rows();
708 let buffer_size = n_rows.div_ceil(8);
709 reservation.try_grow(buffer_size)?;
710 metrics.build_mem_used.add(buffer_size);
711
712 let mut buffer = BooleanBufferBuilder::new(n_rows);
713 buffer.append_n(n_rows, false);
714 buffer
715 } else {
716 BooleanBufferBuilder::new(0)
717 };
718
719 Ok(JoinLeftData::new(
720 merged_batch,
721 Mutex::new(visited_left_side),
722 AtomicUsize::new(probe_threads_count),
723 reservation,
724 ))
725}
726
/// States of the [`NestedLoopJoinStream`] state machine driven by `poll_next`.
#[derive(Debug, Clone, Copy)]
enum NLJState {
    /// Waiting for the shared build (left) side to be collected; then `FetchingRight`.
    BufferingLeft,
    /// Polling the next probe (right) batch; then `ProbeRight`, or
    /// `EmitLeftUnmatched` once the right stream is exhausted.
    FetchingRight,
    /// Joining the current right batch against the left rows; then
    /// `EmitRightUnmatched` if unmatched right rows must be emitted, otherwise
    /// `FetchingRight`.
    ProbeRight,
    /// Emitting unmatched rows of the current right batch; then `FetchingRight`.
    EmitRightUnmatched,
    /// Emitting unmatched left rows, which only the last probe partition to finish
    /// does; then `Done`.
    EmitLeftUnmatched,
    /// Flushing the remaining buffered output and ending the stream.
    Done,
}
/// Stream producing one output partition of a nested loop join.
pub(crate) struct NestedLoopJoinStream {
    /// Schema of the batches produced by this stream.
    pub(crate) output_schema: Arc<Schema>,
    /// Optional predicate evaluated on each (left row, right row) pair.
    pub(crate) join_filter: Option<JoinFilter>,
    /// Join type controlling which rows are emitted.
    pub(crate) join_type: JoinType,
    /// Probe-side (right) input for this partition.
    pub(crate) right_data: SendableRecordBatchStream,
    /// Future resolving to the build-side data shared by all partitions.
    pub(crate) left_data: OnceFut<JoinLeftData>,
    /// Source side/column of each output column (the projection, if any, is
    /// already applied by `execute`).
    pub(crate) column_indices: Vec<ColumnIndex>,
    /// Metrics for this partition.
    pub(crate) metrics: NestedLoopJoinMetrics,

    /// Target output batch size. Also caps how many left rows are emitted per
    /// step and sizes the multi-left-row probing chunks.
    batch_size: usize,

    /// Whether the join type requires emitting unmatched right rows.
    should_track_unmatched_right: bool,

    /// Current state of the state machine.
    state: NLJState,
    /// Coalesces produced rows into batches of `batch_size`.
    output_buffer: Box<BatchCoalescer>,
    /// Set once the single empty batch for a no-output partition has been returned.
    handled_empty_output: bool,

    /// Build-side data once `left_data` has resolved.
    buffered_left_data: Option<Arc<JoinLeftData>>,
    /// Next left row to probe against the current right batch.
    left_probe_idx: usize,
    /// Next left row to examine when emitting unmatched left rows.
    left_emit_idx: usize,
    /// Set to `true` once the whole build side has been buffered.
    left_exhausted: bool,
    /// Always initialized to `true` and never read (hence `dead_code`).
    #[expect(dead_code)]
    left_buffered_in_one_pass: bool,

    /// Right batch currently being probed.
    current_right_batch: Option<RecordBatch>,
    /// Per-row matched flags for `current_right_batch`; only present when
    /// `should_track_unmatched_right` is set.
    current_right_batch_matched: Option<BooleanArray>,
}
816
/// Metrics recorded by a nested loop join stream.
pub(crate) struct NestedLoopJoinMetrics {
    /// Standard build/probe join metrics (timers, input counts, build memory, baseline).
    pub(crate) join_metrics: BuildProbeJoinMetrics,
    /// Ratio metric named "selectivity"; its total grows by
    /// `left_rows * right_rows` for every fully probed right batch.
    pub(crate) selectivity: RatioMetrics,
}
823
824impl NestedLoopJoinMetrics {
825 pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
826 Self {
827 join_metrics: BuildProbeJoinMetrics::new(partition, metrics),
828 selectivity: MetricBuilder::new(metrics)
829 .with_type(MetricType::SUMMARY)
830 .ratio_metrics("selectivity", partition),
831 }
832 }
833}
834
impl Stream for NestedLoopJoinStream {
    type Item = Result<RecordBatch>;

    /// Drives the join state machine until a state handler yields a poll result.
    ///
    /// Each state delegates to its `handle_*` method, which either asks to keep
    /// looping (`ControlFlow::Continue`) or returns a result to the caller
    /// (`ControlFlow::Break`). Buffering the left side is timed as build time; all
    /// other states are timed as join time.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        loop {
            match self.state {
                NLJState::BufferingLeft => {
                    debug!("[NLJState] Entering: {:?}", self.state);
                    // Clone the timer metric so the guard borrows a local rather
                    // than `self`, which the handler below borrows mutably.
                    let build_metric = self.metrics.join_metrics.build_time.clone();
                    let _build_timer = build_metric.timer();

                    match self.handle_buffering_left(cx) {
                        ControlFlow::Continue(()) => continue,
                        ControlFlow::Break(poll) => return poll,
                    }
                }

                NLJState::FetchingRight => {
                    debug!("[NLJState] Entering: {:?}", self.state);
                    let join_metric = self.metrics.join_metrics.join_time.clone();
                    let _join_timer = join_metric.timer();

                    // This state only breaks with `Pending` or an error, never a
                    // batch, so the poll is returned without `record_poll`.
                    match self.handle_fetching_right(cx) {
                        ControlFlow::Continue(()) => continue,
                        ControlFlow::Break(poll) => return poll,
                    }
                }

                NLJState::ProbeRight => {
                    debug!("[NLJState] Entering: {:?}", self.state);

                    let join_metric = self.metrics.join_metrics.join_time.clone();
                    let _join_timer = join_metric.timer();

                    // Output-producing states pass their result through
                    // `baseline.record_poll` (presumably updating output metrics
                    // such as the `output_rows` count read in `handle_done`).
                    match self.handle_probe_right() {
                        ControlFlow::Continue(()) => continue,
                        ControlFlow::Break(poll) => {
                            return self.metrics.join_metrics.baseline.record_poll(poll);
                        }
                    }
                }

                NLJState::EmitRightUnmatched => {
                    debug!("[NLJState] Entering: {:?}", self.state);

                    let join_metric = self.metrics.join_metrics.join_time.clone();
                    let _join_timer = join_metric.timer();

                    match self.handle_emit_right_unmatched() {
                        ControlFlow::Continue(()) => continue,
                        ControlFlow::Break(poll) => {
                            return self.metrics.join_metrics.baseline.record_poll(poll);
                        }
                    }
                }

                NLJState::EmitLeftUnmatched => {
                    debug!("[NLJState] Entering: {:?}", self.state);

                    let join_metric = self.metrics.join_metrics.join_time.clone();
                    let _join_timer = join_metric.timer();

                    match self.handle_emit_left_unmatched() {
                        ControlFlow::Continue(()) => continue,
                        ControlFlow::Break(poll) => {
                            return self.metrics.join_metrics.baseline.record_poll(poll);
                        }
                    }
                }

                NLJState::Done => {
                    debug!("[NLJState] Entering: {:?}", self.state);

                    let join_metric = self.metrics.join_metrics.join_time.clone();
                    let _join_timer = join_metric.timer();
                    // Terminal state: always returns to the caller.
                    let poll = self.handle_done();
                    return self.metrics.join_metrics.baseline.record_poll(poll);
                }
            }
        }
    }
}
1025
1026impl RecordBatchStream for NestedLoopJoinStream {
1027 fn schema(&self) -> SchemaRef {
1028 Arc::clone(&self.output_schema)
1029 }
1030}
1031
1032impl NestedLoopJoinStream {
1033 #[expect(clippy::too_many_arguments)]
1034 pub(crate) fn new(
1035 schema: Arc<Schema>,
1036 filter: Option<JoinFilter>,
1037 join_type: JoinType,
1038 right_data: SendableRecordBatchStream,
1039 left_data: OnceFut<JoinLeftData>,
1040 column_indices: Vec<ColumnIndex>,
1041 metrics: NestedLoopJoinMetrics,
1042 batch_size: usize,
1043 ) -> Self {
1044 Self {
1045 output_schema: Arc::clone(&schema),
1046 join_filter: filter,
1047 join_type,
1048 right_data,
1049 column_indices,
1050 left_data,
1051 metrics,
1052 buffered_left_data: None,
1053 output_buffer: Box::new(BatchCoalescer::new(schema, batch_size)),
1054 batch_size,
1055 current_right_batch: None,
1056 current_right_batch_matched: None,
1057 state: NLJState::BufferingLeft,
1058 left_probe_idx: 0,
1059 left_emit_idx: 0,
1060 left_exhausted: false,
1061 left_buffered_in_one_pass: true,
1062 handled_empty_output: false,
1063 should_track_unmatched_right: need_produce_right_in_final(join_type),
1064 }
1065 }
1066
1067 fn handle_buffering_left(
1071 &mut self,
1072 cx: &mut std::task::Context<'_>,
1073 ) -> ControlFlow<Poll<Option<Result<RecordBatch>>>> {
1074 match self.left_data.get_shared(cx) {
1075 Poll::Ready(Ok(left_data)) => {
1076 self.buffered_left_data = Some(left_data);
1077 self.left_exhausted = true;
1079 self.state = NLJState::FetchingRight;
1080 ControlFlow::Continue(())
1082 }
1083 Poll::Ready(Err(e)) => ControlFlow::Break(Poll::Ready(Some(Err(e)))),
1084 Poll::Pending => ControlFlow::Break(Poll::Pending),
1085 }
1086 }
1087
1088 fn handle_fetching_right(
1090 &mut self,
1091 cx: &mut std::task::Context<'_>,
1092 ) -> ControlFlow<Poll<Option<Result<RecordBatch>>>> {
1093 match self.right_data.poll_next_unpin(cx) {
1094 Poll::Ready(result) => match result {
1095 Some(Ok(right_batch)) => {
1096 let right_batch_size = right_batch.num_rows();
1098 self.metrics.join_metrics.input_rows.add(right_batch_size);
1099 self.metrics.join_metrics.input_batches.add(1);
1100
1101 if right_batch_size == 0 {
1103 return ControlFlow::Continue(());
1104 }
1105
1106 self.current_right_batch = Some(right_batch);
1107
1108 if self.should_track_unmatched_right {
1110 let zeroed_buf = BooleanBuffer::new_unset(right_batch_size);
1111 self.current_right_batch_matched =
1112 Some(BooleanArray::new(zeroed_buf, None));
1113 }
1114
1115 self.left_probe_idx = 0;
1116 self.state = NLJState::ProbeRight;
1117 ControlFlow::Continue(())
1118 }
1119 Some(Err(e)) => ControlFlow::Break(Poll::Ready(Some(Err(e)))),
1120 None => {
1121 self.state = NLJState::EmitLeftUnmatched;
1123 ControlFlow::Continue(())
1124 }
1125 },
1126 Poll::Pending => ControlFlow::Break(Poll::Pending),
1127 }
1128 }
1129
1130 fn handle_probe_right(&mut self) -> ControlFlow<Poll<Option<Result<RecordBatch>>>> {
1132 if let Some(poll) = self.maybe_flush_ready_batch() {
1134 return ControlFlow::Break(poll);
1135 }
1136
1137 match self.process_probe_batch() {
1139 Ok(true) => ControlFlow::Continue(()),
1143 Ok(false) => {
1147 self.left_probe_idx = 0;
1149
1150 if let (Ok(left_data), Some(right_batch)) =
1153 (self.get_left_data(), self.current_right_batch.as_ref())
1154 {
1155 let left_rows = left_data.batch().num_rows();
1156 let right_rows = right_batch.num_rows();
1157 self.metrics.selectivity.add_total(left_rows * right_rows);
1158 }
1159
1160 if self.should_track_unmatched_right {
1161 debug_assert!(
1162 self.current_right_batch_matched.is_some(),
1163 "If it's required to track matched rows in the right input, the right bitmap must be present"
1164 );
1165 self.state = NLJState::EmitRightUnmatched;
1166 } else {
1167 self.current_right_batch = None;
1168 self.state = NLJState::FetchingRight;
1169 }
1170 ControlFlow::Continue(())
1171 }
1172 Err(e) => ControlFlow::Break(Poll::Ready(Some(Err(e)))),
1173 }
1174 }
1175
1176 fn handle_emit_right_unmatched(
1178 &mut self,
1179 ) -> ControlFlow<Poll<Option<Result<RecordBatch>>>> {
1180 if let Some(poll) = self.maybe_flush_ready_batch() {
1182 return ControlFlow::Break(poll);
1183 }
1184
1185 debug_assert!(
1186 self.current_right_batch_matched.is_some()
1187 && self.current_right_batch.is_some(),
1188 "This state is yielding output for unmatched rows in the current right batch, so both the right batch and the bitmap must be present"
1189 );
1190 match self.process_right_unmatched() {
1192 Ok(Some(batch)) => {
1193 match self.output_buffer.push_batch(batch) {
1194 Ok(()) => {
1195 debug_assert!(self.current_right_batch.is_none());
1198 self.state = NLJState::FetchingRight;
1199 ControlFlow::Continue(())
1200 }
1201 Err(e) => ControlFlow::Break(Poll::Ready(Some(arrow_err!(e)))),
1202 }
1203 }
1204 Ok(None) => {
1205 debug_assert!(self.current_right_batch.is_none());
1208 self.state = NLJState::FetchingRight;
1209 ControlFlow::Continue(())
1210 }
1211 Err(e) => ControlFlow::Break(Poll::Ready(Some(Err(e)))),
1212 }
1213 }
1214
1215 fn handle_emit_left_unmatched(
1217 &mut self,
1218 ) -> ControlFlow<Poll<Option<Result<RecordBatch>>>> {
1219 if let Some(poll) = self.maybe_flush_ready_batch() {
1221 return ControlFlow::Break(poll);
1222 }
1223
1224 match self.process_left_unmatched() {
1226 Ok(true) => ControlFlow::Continue(()),
1229 Ok(false) => match self.output_buffer.finish_buffered_batch() {
1232 Ok(()) => {
1233 self.state = NLJState::Done;
1234 ControlFlow::Continue(())
1235 }
1236 Err(e) => ControlFlow::Break(Poll::Ready(Some(arrow_err!(e)))),
1237 },
1238 Err(e) => ControlFlow::Break(Poll::Ready(Some(Err(e)))),
1239 }
1240 }
1241
1242 fn handle_done(&mut self) -> Poll<Option<Result<RecordBatch>>> {
1244 if let Some(poll) = self.maybe_flush_ready_batch() {
1246 return poll;
1247 }
1248
1249 if !self.handled_empty_output {
1255 let zero_count = Count::new();
1256 if *self.metrics.join_metrics.baseline.output_rows() == zero_count {
1257 let empty_batch = RecordBatch::new_empty(Arc::clone(&self.output_schema));
1258 self.handled_empty_output = true;
1259 return Poll::Ready(Some(Ok(empty_batch)));
1260 }
1261 }
1262
1263 Poll::Ready(None)
1264 }
1265
1266 fn process_probe_batch(&mut self) -> Result<bool> {
1273 let left_data = Arc::clone(self.get_left_data()?);
1274 let right_batch = self
1275 .current_right_batch
1276 .as_ref()
1277 .ok_or_else(|| internal_datafusion_err!("Right batch should be available"))?
1278 .clone();
1279
1280 if self.left_probe_idx >= left_data.batch().num_rows() {
1282 return Ok(false);
1283 }
1284
1285 debug_assert_ne!(
1298 right_batch.num_rows(),
1299 0,
1300 "When fetching the right batch, empty batches will be skipped"
1301 );
1302
1303 let l_row_cnt_ratio = self.batch_size / right_batch.num_rows();
1304 if l_row_cnt_ratio > 10 {
1305 let l_row_count = std::cmp::min(
1309 l_row_cnt_ratio,
1310 left_data.batch().num_rows() - self.left_probe_idx,
1311 );
1312
1313 debug_assert!(
1314 l_row_count != 0,
1315 "This function should only be entered when there are remaining left rows to process"
1316 );
1317 let joined_batch = self.process_left_range_join(
1318 &left_data,
1319 &right_batch,
1320 self.left_probe_idx,
1321 l_row_count,
1322 )?;
1323
1324 if let Some(batch) = joined_batch {
1325 self.output_buffer.push_batch(batch)?;
1326 }
1327
1328 self.left_probe_idx += l_row_count;
1329
1330 return Ok(true);
1331 }
1332
1333 let l_idx = self.left_probe_idx;
1334 let joined_batch =
1335 self.process_single_left_row_join(&left_data, &right_batch, l_idx)?;
1336
1337 if let Some(batch) = joined_batch {
1338 self.output_buffer.push_batch(batch)?;
1339 }
1340
1341 self.left_probe_idx += 1;
1345
1346 Ok(true)
1348 }
1349
1350 fn process_left_range_join(
1356 &mut self,
1357 left_data: &JoinLeftData,
1358 right_batch: &RecordBatch,
1359 l_start_index: usize,
1360 l_row_count: usize,
1361 ) -> Result<Option<RecordBatch>> {
1362 let right_rows = right_batch.num_rows();
1368 let total_rows = l_row_count * right_rows;
1369
1370 let left_indices: UInt32Array =
1372 UInt32Array::from_iter_values((0..l_row_count).flat_map(|i| {
1373 std::iter::repeat_n((l_start_index + i) as u32, right_rows)
1374 }));
1375 let right_indices: UInt32Array = UInt32Array::from_iter_values(
1376 (0..l_row_count).flat_map(|_| 0..right_rows as u32),
1377 );
1378
1379 debug_assert!(
1380 left_indices.len() == right_indices.len()
1381 && right_indices.len() == total_rows,
1382 "The length or cartesian product should be (left_size * right_size)",
1383 );
1384
1385 let bitmap_combined = if let Some(filter) = &self.join_filter {
1388 let intermediate_batch = if filter.schema.fields().is_empty() {
1390 create_record_batch_with_empty_schema(
1392 Arc::new((*filter.schema).clone()),
1393 total_rows,
1394 )?
1395 } else {
1396 let mut filter_columns: Vec<Arc<dyn Array>> =
1397 Vec::with_capacity(filter.column_indices().len());
1398 for column_index in filter.column_indices() {
1399 let array = if column_index.side == JoinSide::Left {
1400 let col = left_data.batch().column(column_index.index);
1401 take(col.as_ref(), &left_indices, None)?
1402 } else {
1403 let col = right_batch.column(column_index.index);
1404 take(col.as_ref(), &right_indices, None)?
1405 };
1406 filter_columns.push(array);
1407 }
1408
1409 RecordBatch::try_new(Arc::new((*filter.schema).clone()), filter_columns)?
1410 };
1411
1412 let filter_result = filter
1413 .expression()
1414 .evaluate(&intermediate_batch)?
1415 .into_array(intermediate_batch.num_rows())?;
1416 let filter_arr = as_boolean_array(&filter_result)?;
1417
1418 boolean_mask_from_filter(filter_arr)
1420 } else {
1421 BooleanArray::from(vec![true; total_rows])
1423 };
1424
1425 let mut left_bitmap = if need_produce_result_in_final(self.join_type) {
1430 Some(left_data.bitmap().lock())
1431 } else {
1432 None
1433 };
1434
1435 let mut local_right_bitmap = if self.should_track_unmatched_right {
1439 let mut current_right_batch_bitmap = BooleanBufferBuilder::new(right_rows);
1440 current_right_batch_bitmap.append_n(right_rows, false);
1442 Some(current_right_batch_bitmap)
1443 } else {
1444 None
1445 };
1446
1447 for (i, is_matched) in bitmap_combined.iter().enumerate() {
1449 let is_matched = is_matched.ok_or_else(|| {
1450 internal_datafusion_err!("Must be Some after the previous combining step")
1451 })?;
1452
1453 let l_index = l_start_index + i / right_rows;
1454 let r_index = i % right_rows;
1455
1456 if let Some(bitmap) = left_bitmap.as_mut()
1457 && is_matched
1458 {
1459 bitmap.set_bit(l_index, true);
1461 }
1462
1463 if let Some(bitmap) = local_right_bitmap.as_mut()
1464 && is_matched
1465 {
1466 bitmap.set_bit(r_index, true);
1467 }
1468 }
1469
1470 if self.should_track_unmatched_right {
1472 let global_right_bitmap =
1474 std::mem::take(&mut self.current_right_batch_matched).ok_or_else(
1475 || internal_datafusion_err!("right batch's bitmap should be present"),
1476 )?;
1477 let (buf, nulls) = global_right_bitmap.into_parts();
1478 debug_assert!(nulls.is_none());
1479
1480 let current_right_bitmap = local_right_bitmap
1481 .ok_or_else(|| {
1482 internal_datafusion_err!(
1483 "Should be Some if the current join type requires right bitmap"
1484 )
1485 })?
1486 .finish();
1487 let updated_global_right_bitmap = buf.bitor(¤t_right_bitmap);
1488
1489 self.current_right_batch_matched =
1490 Some(BooleanArray::new(updated_global_right_bitmap, None));
1491 }
1492
1493 if matches!(
1495 self.join_type,
1496 JoinType::LeftAnti
1497 | JoinType::LeftSemi
1498 | JoinType::LeftMark
1499 | JoinType::RightAnti
1500 | JoinType::RightMark
1501 | JoinType::RightSemi
1502 ) {
1503 return Ok(None);
1504 }
1505
1506 if self.output_schema.fields().is_empty() {
1509 let row_count = bitmap_combined.true_count();
1511 return Ok(Some(create_record_batch_with_empty_schema(
1512 Arc::clone(&self.output_schema),
1513 row_count,
1514 )?));
1515 }
1516
1517 let mut out_columns: Vec<Arc<dyn Array>> =
1518 Vec::with_capacity(self.output_schema.fields().len());
1519 for column_index in &self.column_indices {
1520 let array = if column_index.side == JoinSide::Left {
1521 let col = left_data.batch().column(column_index.index);
1522 take(col.as_ref(), &left_indices, None)?
1523 } else {
1524 let col = right_batch.column(column_index.index);
1525 take(col.as_ref(), &right_indices, None)?
1526 };
1527 out_columns.push(array);
1528 }
1529 let pre_filtered =
1530 RecordBatch::try_new(Arc::clone(&self.output_schema), out_columns)?;
1531 let filtered = filter_record_batch(&pre_filtered, &bitmap_combined)?;
1532 Ok(Some(filtered))
1533 }
1534
1535 fn process_single_left_row_join(
1541 &mut self,
1542 left_data: &JoinLeftData,
1543 right_batch: &RecordBatch,
1544 l_index: usize,
1545 ) -> Result<Option<RecordBatch>> {
1546 let right_row_count = right_batch.num_rows();
1547 if right_row_count == 0 {
1548 return Ok(None);
1549 }
1550
1551 let cur_right_bitmap = if let Some(filter) = &self.join_filter {
1552 apply_filter_to_row_join_batch(
1553 left_data.batch(),
1554 l_index,
1555 right_batch,
1556 filter,
1557 )?
1558 } else {
1559 BooleanArray::from(vec![true; right_row_count])
1560 };
1561
1562 self.update_matched_bitmap(l_index, &cur_right_bitmap)?;
1563
1564 if matches!(
1567 self.join_type,
1568 JoinType::LeftAnti
1569 | JoinType::LeftSemi
1570 | JoinType::LeftMark
1571 | JoinType::RightAnti
1572 | JoinType::RightMark
1573 | JoinType::RightSemi
1574 ) {
1575 return Ok(None);
1576 }
1577
1578 if cur_right_bitmap.true_count() == 0 {
1579 Ok(None)
1581 } else {
1582 let join_batch = build_row_join_batch(
1584 &self.output_schema,
1585 left_data.batch(),
1586 l_index,
1587 right_batch,
1588 Some(cur_right_bitmap),
1589 &self.column_indices,
1590 JoinSide::Left,
1591 )?;
1592 Ok(join_batch)
1593 }
1594 }
1595
1596 fn process_left_unmatched(&mut self) -> Result<bool> {
1600 let left_data = self.get_left_data()?;
1601 let left_batch = left_data.batch();
1602
1603 let join_type_no_produce_left = !need_produce_result_in_final(self.join_type);
1609 let handled_by_other_partition =
1611 self.left_emit_idx == 0 && !left_data.report_probe_completed();
1612 let finished = self.left_emit_idx >= left_batch.num_rows();
1614
1615 if join_type_no_produce_left || handled_by_other_partition || finished {
1616 return Ok(false);
1617 }
1618
1619 let start_idx = self.left_emit_idx;
1624 let end_idx = std::cmp::min(start_idx + self.batch_size, left_batch.num_rows());
1625
1626 if let Some(batch) =
1627 self.process_left_unmatched_range(left_data, start_idx, end_idx)?
1628 {
1629 self.output_buffer.push_batch(batch)?;
1630 }
1631
1632 self.left_emit_idx = end_idx;
1634
1635 Ok(true)
1637 }
1638
1639 fn process_left_unmatched_range(
1652 &self,
1653 left_data: &JoinLeftData,
1654 start_idx: usize,
1655 end_idx: usize,
1656 ) -> Result<Option<RecordBatch>> {
1657 if start_idx == end_idx {
1658 return Ok(None);
1659 }
1660
1661 let left_batch = left_data.batch();
1664 let left_batch_sliced = left_batch.slice(start_idx, end_idx - start_idx);
1665
1666 let mut bitmap_sliced = BooleanBufferBuilder::new(end_idx - start_idx);
1668 bitmap_sliced.append_n(end_idx - start_idx, false);
1669 let bitmap = left_data.bitmap().lock();
1670 for i in start_idx..end_idx {
1671 assert!(
1672 i - start_idx < bitmap_sliced.capacity(),
1673 "DBG: {start_idx}, {end_idx}"
1674 );
1675 bitmap_sliced.set_bit(i - start_idx, bitmap.get_bit(i));
1676 }
1677 let bitmap_sliced = BooleanArray::new(bitmap_sliced.finish(), None);
1678
1679 let right_schema = self.right_data.schema();
1680 build_unmatched_batch(
1681 &self.output_schema,
1682 &left_batch_sliced,
1683 bitmap_sliced,
1684 &right_schema,
1685 &self.column_indices,
1686 self.join_type,
1687 JoinSide::Left,
1688 )
1689 }
1690
1691 fn process_right_unmatched(&mut self) -> Result<Option<RecordBatch>> {
1694 let right_batch_bitmap: BooleanArray =
1696 std::mem::take(&mut self.current_right_batch_matched).ok_or_else(|| {
1697 internal_datafusion_err!("right bitmap should be available")
1698 })?;
1699
1700 let right_batch = self.current_right_batch.take();
1701 let cur_right_batch = unwrap_or_internal_err!(right_batch);
1702
1703 let left_data = self.get_left_data()?;
1704 let left_schema = left_data.batch().schema();
1705
1706 let res = build_unmatched_batch(
1707 &self.output_schema,
1708 &cur_right_batch,
1709 right_batch_bitmap,
1710 &left_schema,
1711 &self.column_indices,
1712 self.join_type,
1713 JoinSide::Right,
1714 );
1715
1716 self.current_right_batch_matched = None;
1718
1719 res
1720 }
1721
1722 fn get_left_data(&self) -> Result<&Arc<JoinLeftData>> {
1726 self.buffered_left_data
1727 .as_ref()
1728 .ok_or_else(|| internal_datafusion_err!("LeftData should be available"))
1729 }
1730
1731 fn maybe_flush_ready_batch(&mut self) -> Option<Poll<Option<Result<RecordBatch>>>> {
1734 if self.output_buffer.has_completed_batch()
1735 && let Some(batch) = self.output_buffer.next_completed_batch()
1736 {
1737 let output_rows = batch.num_rows();
1739 self.metrics.selectivity.add_part(output_rows);
1740
1741 return Some(Poll::Ready(Some(Ok(batch))));
1742 }
1743
1744 None
1745 }
1746
1747 fn update_matched_bitmap(
1763 &mut self,
1764 l_index: usize,
1765 r_matched_bitmap: &BooleanArray,
1766 ) -> Result<()> {
1767 let left_data = self.get_left_data()?;
1768
1769 let joined_len = r_matched_bitmap.true_count();
1771
1772 if need_produce_result_in_final(self.join_type) && (joined_len > 0) {
1774 let mut bitmap = left_data.bitmap().lock();
1775 bitmap.set_bit(l_index, true);
1776 }
1777
1778 if self.should_track_unmatched_right {
1780 debug_assert!(self.current_right_batch_matched.is_some());
1781 let right_bitmap = std::mem::take(&mut self.current_right_batch_matched)
1783 .ok_or_else(|| {
1784 internal_datafusion_err!("right batch's bitmap should be present")
1785 })?;
1786 let (buf, nulls) = right_bitmap.into_parts();
1787 debug_assert!(nulls.is_none());
1788 let updated_right_bitmap = buf.bitor(r_matched_bitmap.values());
1789
1790 self.current_right_batch_matched =
1791 Some(BooleanArray::new(updated_right_bitmap, None));
1792 }
1793
1794 Ok(())
1795 }
1796}
1797
1798fn apply_filter_to_row_join_batch(
1804 left_batch: &RecordBatch,
1805 l_index: usize,
1806 right_batch: &RecordBatch,
1807 filter: &JoinFilter,
1808) -> Result<BooleanArray> {
1809 debug_assert!(left_batch.num_rows() != 0 && right_batch.num_rows() != 0);
1810
1811 let intermediate_batch = if filter.schema.fields().is_empty() {
1812 create_record_batch_with_empty_schema(
1815 Arc::new((*filter.schema).clone()),
1816 right_batch.num_rows(),
1817 )?
1818 } else {
1819 build_row_join_batch(
1820 &filter.schema,
1821 left_batch,
1822 l_index,
1823 right_batch,
1824 None,
1825 &filter.column_indices,
1826 JoinSide::Left,
1827 )?
1828 .ok_or_else(|| internal_datafusion_err!("This function assume input batch is not empty, so the intermediate batch can't be empty too"))?
1829 };
1830
1831 let filter_result = filter
1832 .expression()
1833 .evaluate(&intermediate_batch)?
1834 .into_array(intermediate_batch.num_rows())?;
1835 let filter_arr = as_boolean_array(&filter_result)?;
1836
1837 let bitmap_combined = boolean_mask_from_filter(filter_arr);
1839
1840 Ok(bitmap_combined)
1841}
1842
1843#[inline]
1849fn boolean_mask_from_filter(filter_arr: &BooleanArray) -> BooleanArray {
1850 let (values, nulls) = filter_arr.clone().into_parts();
1851 match nulls {
1852 Some(nulls) => BooleanArray::new(nulls.inner() & &values, None),
1853 None => BooleanArray::new(values, None),
1854 }
1855}
1856
1857fn build_row_join_batch(
1905 output_schema: &Schema,
1906 build_side_batch: &RecordBatch,
1907 build_side_index: usize,
1908 probe_side_batch: &RecordBatch,
1909 probe_side_filter: Option<BooleanArray>,
1910 col_indices: &[ColumnIndex],
1912 build_side: JoinSide,
1915) -> Result<Option<RecordBatch>> {
1916 debug_assert!(build_side != JoinSide::None);
1917
1918 let filtered_probe_batch = if let Some(filter) = probe_side_filter {
1921 &filter_record_batch(probe_side_batch, &filter)?
1922 } else {
1923 probe_side_batch
1924 };
1925
1926 if filtered_probe_batch.num_rows() == 0 {
1927 return Ok(None);
1928 }
1929
1930 if output_schema.fields.is_empty() {
1938 return Ok(Some(create_record_batch_with_empty_schema(
1939 Arc::new(output_schema.clone()),
1940 filtered_probe_batch.num_rows(),
1941 )?));
1942 }
1943
1944 let mut columns: Vec<Arc<dyn Array>> =
1945 Vec::with_capacity(output_schema.fields().len());
1946
1947 for column_index in col_indices {
1948 let array = if column_index.side == build_side {
1949 let original_left_array = build_side_batch.column(column_index.index);
1952 match original_left_array.data_type() {
1958 DataType::List(field) | DataType::LargeList(field)
1959 if field.data_type() == &DataType::Utf8View =>
1960 {
1961 let indices_iter = std::iter::repeat_n(
1962 build_side_index as u64,
1963 filtered_probe_batch.num_rows(),
1964 );
1965 let indices_array = UInt64Array::from_iter_values(indices_iter);
1966 take(original_left_array.as_ref(), &indices_array, None)?
1967 }
1968 _ => {
1969 let scalar_value = ScalarValue::try_from_array(
1970 original_left_array.as_ref(),
1971 build_side_index,
1972 )?;
1973 scalar_value.to_array_of_size(filtered_probe_batch.num_rows())?
1974 }
1975 }
1976 } else {
1977 Arc::clone(filtered_probe_batch.column(column_index.index))
1979 };
1980
1981 columns.push(array);
1982 }
1983
1984 Ok(Some(RecordBatch::try_new(
1985 Arc::new(output_schema.clone()),
1986 columns,
1987 )?))
1988}
1989
1990fn build_unmatched_batch_empty_schema(
1997 output_schema: &SchemaRef,
1998 batch_bitmap: &BooleanArray,
1999 join_type: JoinType,
2001) -> Result<Option<RecordBatch>> {
2002 let result_size = match join_type {
2003 JoinType::Left
2004 | JoinType::Right
2005 | JoinType::Full
2006 | JoinType::LeftAnti
2007 | JoinType::RightAnti => batch_bitmap.false_count(),
2008 JoinType::LeftSemi | JoinType::RightSemi => batch_bitmap.true_count(),
2009 JoinType::LeftMark | JoinType::RightMark => batch_bitmap.len(),
2010 _ => unreachable!(),
2011 };
2012
2013 if output_schema.fields().is_empty() {
2014 Ok(Some(create_record_batch_with_empty_schema(
2015 Arc::clone(output_schema),
2016 result_size,
2017 )?))
2018 } else {
2019 Ok(None)
2020 }
2021}
2022
2023fn create_record_batch_with_empty_schema(
2027 schema: SchemaRef,
2028 row_count: usize,
2029) -> Result<RecordBatch> {
2030 let options = RecordBatchOptions::new()
2031 .with_match_field_names(true)
2032 .with_row_count(Some(row_count));
2033
2034 RecordBatch::try_new_with_options(schema, vec![], &options).map_err(|e| {
2035 internal_datafusion_err!("Failed to create empty record batch: {}", e)
2036 })
2037}
2038
2039fn build_unmatched_batch(
2075 output_schema: &SchemaRef,
2076 batch: &RecordBatch,
2077 batch_bitmap: BooleanArray,
2078 another_side_schema: &SchemaRef,
2080 col_indices: &[ColumnIndex],
2081 join_type: JoinType,
2082 batch_side: JoinSide,
2083) -> Result<Option<RecordBatch>> {
2084 debug_assert_ne!(join_type, JoinType::Inner);
2086 debug_assert_ne!(batch_side, JoinSide::None);
2087
2088 if let Some(batch) =
2090 build_unmatched_batch_empty_schema(output_schema, &batch_bitmap, join_type)?
2091 {
2092 return Ok(Some(batch));
2093 }
2094
2095 match join_type {
2096 JoinType::Full | JoinType::Right | JoinType::Left => {
2097 if join_type == JoinType::Right {
2098 debug_assert_eq!(batch_side, JoinSide::Right);
2099 }
2100 if join_type == JoinType::Left {
2101 debug_assert_eq!(batch_side, JoinSide::Left);
2102 }
2103
2104 let flipped_bitmap = not(&batch_bitmap)?;
2107
2108 let left_null_columns: Vec<Arc<dyn Array>> = another_side_schema
2110 .fields()
2111 .iter()
2112 .map(|field| new_null_array(field.data_type(), 1))
2113 .collect();
2114
2115 let nullable_left_schema = Arc::new(Schema::new(
2119 another_side_schema
2120 .fields()
2121 .iter()
2122 .map(|field| (**field).clone().with_nullable(true))
2123 .collect::<Vec<_>>(),
2124 ));
2125 let left_null_batch = if nullable_left_schema.fields.is_empty() {
2126 create_record_batch_with_empty_schema(nullable_left_schema, 0)?
2129 } else {
2130 RecordBatch::try_new(nullable_left_schema, left_null_columns)?
2131 };
2132
2133 debug_assert_ne!(batch_side, JoinSide::None);
2134 let opposite_side = batch_side.negate();
2135
2136 build_row_join_batch(
2137 output_schema,
2138 &left_null_batch,
2139 0,
2140 batch,
2141 Some(flipped_bitmap),
2142 col_indices,
2143 opposite_side,
2144 )
2145 }
2146 JoinType::RightSemi
2147 | JoinType::RightAnti
2148 | JoinType::LeftSemi
2149 | JoinType::LeftAnti => {
2150 if matches!(join_type, JoinType::RightSemi | JoinType::RightAnti) {
2151 debug_assert_eq!(batch_side, JoinSide::Right);
2152 }
2153 if matches!(join_type, JoinType::LeftSemi | JoinType::LeftAnti) {
2154 debug_assert_eq!(batch_side, JoinSide::Left);
2155 }
2156
2157 let bitmap = if matches!(join_type, JoinType::LeftSemi | JoinType::RightSemi)
2158 {
2159 batch_bitmap.clone()
2160 } else {
2161 not(&batch_bitmap)?
2162 };
2163
2164 if bitmap.true_count() == 0 {
2165 return Ok(None);
2166 }
2167
2168 let mut columns: Vec<Arc<dyn Array>> =
2169 Vec::with_capacity(output_schema.fields().len());
2170
2171 for column_index in col_indices {
2172 debug_assert!(column_index.side == batch_side);
2173
2174 let col = batch.column(column_index.index);
2175 let filtered_col = filter(col, &bitmap)?;
2176
2177 columns.push(filtered_col);
2178 }
2179
2180 Ok(Some(RecordBatch::try_new(
2181 Arc::clone(output_schema),
2182 columns,
2183 )?))
2184 }
2185 JoinType::RightMark | JoinType::LeftMark => {
2186 if join_type == JoinType::RightMark {
2187 debug_assert_eq!(batch_side, JoinSide::Right);
2188 }
2189 if join_type == JoinType::LeftMark {
2190 debug_assert_eq!(batch_side, JoinSide::Left);
2191 }
2192
2193 let mut columns: Vec<Arc<dyn Array>> =
2194 Vec::with_capacity(output_schema.fields().len());
2195
2196 let mut right_batch_bitmap_opt = Some(batch_bitmap);
2198
2199 for column_index in col_indices {
2200 if column_index.side == batch_side {
2201 let col = batch.column(column_index.index);
2202
2203 columns.push(Arc::clone(col));
2204 } else if column_index.side == JoinSide::None {
2205 let right_batch_bitmap = std::mem::take(&mut right_batch_bitmap_opt);
2206 match right_batch_bitmap {
2207 Some(right_batch_bitmap) => {
2208 columns.push(Arc::new(right_batch_bitmap))
2209 }
2210 None => unreachable!("Should only be one mark column"),
2211 }
2212 } else {
2213 return internal_err!(
2214 "Not possible to have this join side for RightMark join"
2215 );
2216 }
2217 }
2218
2219 Ok(Some(RecordBatch::try_new(
2220 Arc::clone(output_schema),
2221 columns,
2222 )?))
2223 }
2224 _ => internal_err!(
2225 "If batch is at right side, this function must be handling Full/Right/RightSemi/RightAnti/RightMark joins"
2226 ),
2227 }
2228}
2229
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use crate::test::{TestMemoryExec, assert_join_metrics};
    use crate::{
        common, expressions::Column, repartition::RepartitionExec, test::build_table_i32,
    };

    use arrow::compute::SortOptions;
    use arrow::datatypes::{DataType, Field};
    use datafusion_common::test_util::batches_to_sort_string;
    use datafusion_common::{ScalarValue, assert_contains};
    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
    use datafusion_expr::Operator;
    use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
    use datafusion_physical_expr::{Partitioning, PhysicalExpr};
    use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};

    use insta::allow_duplicates;
    use insta::assert_snapshot;
    use rstest::rstest;

    /// Builds an in-memory three-column Int32 table.
    ///
    /// The table is optionally split into batches of `batch_size` rows and
    /// optionally declared as sorted on `sorted_column_names`.
    fn build_table(
        a: (&str, &Vec<i32>),
        b: (&str, &Vec<i32>),
        c: (&str, &Vec<i32>),
        batch_size: Option<usize>,
        sorted_column_names: Vec<&str>,
    ) -> Arc<dyn ExecutionPlan> {
        let batch = build_table_i32(a, b, c);
        let schema = batch.schema();

        let batches = if let Some(batch_size) = batch_size {
            let num_batches = batch.num_rows().div_ceil(batch_size);
            (0..num_batches)
                .map(|i| {
                    let start = i * batch_size;
                    let remaining_rows = batch.num_rows() - start;
                    batch.slice(start, batch_size.min(remaining_rows))
                })
                .collect::<Vec<_>>()
        } else {
            vec![batch]
        };

        let mut sort_info = vec![];
        for name in sorted_column_names {
            let index = schema.index_of(name).unwrap();
            let sort_expr = PhysicalSortExpr::new(
                Arc::new(Column::new(name, index)),
                SortOptions::new(false, false),
            );
            sort_info.push(sort_expr);
        }
        let mut source = TestMemoryExec::try_new(&[batches], schema, None).unwrap();
        if let Some(ordering) = LexOrdering::new(sort_info) {
            source = source.try_with_sort_information(vec![ordering]).unwrap();
        }

        let source = Arc::new(source);
        Arc::new(TestMemoryExec::update_cache(&source))
    }

    fn build_left_table() -> Arc<dyn ExecutionPlan> {
        build_table(
            ("a1", &vec![5, 9, 11]),
            ("b1", &vec![5, 8, 8]),
            ("c1", &vec![50, 90, 110]),
            None,
            Vec::new(),
        )
    }

    fn build_right_table() -> Arc<dyn ExecutionPlan> {
        build_table(
            ("a2", &vec![12, 2, 10]),
            ("b2", &vec![10, 2, 10]),
            ("c2", &vec![40, 80, 100]),
            None,
            Vec::new(),
        )
    }

    /// Join filter `left.b1 != 8 AND right.b2 != 10` over the intermediate
    /// schema `[x (left b1), x (right b2)]`.
    fn prepare_join_filter() -> JoinFilter {
        let column_indices = vec![
            ColumnIndex {
                index: 1,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 1,
                side: JoinSide::Right,
            },
        ];
        let intermediate_schema = Schema::new(vec![
            Field::new("x", DataType::Int32, true),
            Field::new("x", DataType::Int32, true),
        ]);
        // left.b1 != 8
        let left_filter = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("x", 0)),
            Operator::NotEq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
        )) as Arc<dyn PhysicalExpr>;
        // right.b2 != 10
        let right_filter = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("x", 1)),
            Operator::NotEq,
            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
        )) as Arc<dyn PhysicalExpr>;
        let filter_expression =
            Arc::new(BinaryExpr::new(left_filter, Operator::And, right_filter))
                as Arc<dyn PhysicalExpr>;

        JoinFilter::new(
            filter_expression,
            column_indices,
            Arc::new(intermediate_schema),
        )
    }

    /// Runs a nested loop join with the right input round-robin repartitioned
    /// into 4 partitions.
    ///
    /// Collects every partition and asserts that no output batch exceeds the
    /// configured batch size. Returns the output column names, the non-empty
    /// batches and the join's metrics.
    pub(crate) async fn multi_partitioned_join_collect(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
        join_type: &JoinType,
        join_filter: Option<JoinFilter>,
        context: Arc<TaskContext>,
    ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
        let partition_count = 4;

        let right = Arc::new(RepartitionExec::try_new(
            right,
            Partitioning::RoundRobinBatch(partition_count),
        )?) as Arc<dyn ExecutionPlan>;

        let nested_loop_join =
            NestedLoopJoinExec::try_new(left, right, join_filter, join_type, None)?;
        let columns = columns(&nested_loop_join.schema());
        let mut batches = vec![];
        for i in 0..partition_count {
            let stream = nested_loop_join.execute(i, Arc::clone(&context))?;
            let more_batches = common::collect(stream).await?;
            batches.extend(
                more_batches
                    .into_iter()
                    .inspect(|b| {
                        assert!(b.num_rows() <= context.session_config().batch_size())
                    })
                    .filter(|b| b.num_rows() > 0)
                    .collect::<Vec<_>>(),
            );
        }

        let metrics = nested_loop_join.metrics().unwrap();

        Ok((columns, batches, metrics))
    }

    fn new_task_ctx(batch_size: usize) -> Arc<TaskContext> {
        let base = TaskContext::default();
        let cfg = base.session_config().clone().with_batch_size(batch_size);
        Arc::new(base.with_session_config(cfg))
    }

    #[rstest]
    #[tokio::test]
    async fn join_inner_with_filter(#[values(1, 2, 16)] batch_size: usize) -> Result<()> {
        let task_ctx = new_task_ctx(batch_size);
        let left = build_left_table();
        let right = build_right_table();
        let filter = prepare_join_filter();
        let (columns, batches, metrics) = multi_partitioned_join_collect(
            left,
            right,
            &JoinType::Inner,
            Some(filter),
            task_ctx,
        )
        .await?;

        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
        allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+----+----+----+----+
        | a1 | b1 | c1 | a2 | b2 | c2 |
        +----+----+----+----+----+----+
        | 5  | 5  | 50 | 2  | 2  | 80 |
        +----+----+----+----+----+----+
        "));

        assert_join_metrics!(metrics, 1);

        Ok(())
    }

    #[rstest]
    #[tokio::test]
    async fn join_left_with_filter(#[values(1, 2, 16)] batch_size: usize) -> Result<()> {
        let task_ctx = new_task_ctx(batch_size);
        let left = build_left_table();
        let right = build_right_table();

        let filter = prepare_join_filter();
        let (columns, batches, metrics) = multi_partitioned_join_collect(
            left,
            right,
            &JoinType::Left,
            Some(filter),
            task_ctx,
        )
        .await?;
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
        allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+-----+----+----+----+
        | a1 | b1 | c1  | a2 | b2 | c2 |
        +----+----+-----+----+----+----+
        | 11 | 8  | 110 |    |    |    |
        | 5  | 5  | 50  | 2  | 2  | 80 |
        | 9  | 8  | 90  |    |    |    |
        +----+----+-----+----+----+----+
        "));

        assert_join_metrics!(metrics, 3);

        Ok(())
    }

    #[rstest]
    #[tokio::test]
    async fn join_right_with_filter(#[values(1, 2, 16)] batch_size: usize) -> Result<()> {
        let task_ctx = new_task_ctx(batch_size);
        let left = build_left_table();
        let right = build_right_table();

        let filter = prepare_join_filter();
        let (columns, batches, metrics) = multi_partitioned_join_collect(
            left,
            right,
            &JoinType::Right,
            Some(filter),
            task_ctx,
        )
        .await?;
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
        allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+----+----+----+-----+
        | a1 | b1 | c1 | a2 | b2 | c2  |
        +----+----+----+----+----+-----+
        |    |    |    | 10 | 10 | 100 |
        |    |    |    | 12 | 10 | 40  |
        | 5  | 5  | 50 | 2  | 2  | 80  |
        +----+----+----+----+----+-----+
        "));

        assert_join_metrics!(metrics, 3);

        Ok(())
    }

    #[rstest]
    #[tokio::test]
    async fn join_full_with_filter(#[values(1, 2, 16)] batch_size: usize) -> Result<()> {
        let task_ctx = new_task_ctx(batch_size);
        let left = build_left_table();
        let right = build_right_table();

        let filter = prepare_join_filter();
        let (columns, batches, metrics) = multi_partitioned_join_collect(
            left,
            right,
            &JoinType::Full,
            Some(filter),
            task_ctx,
        )
        .await?;
        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
        allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+-----+----+----+-----+
        | a1 | b1 | c1  | a2 | b2 | c2  |
        +----+----+-----+----+----+-----+
        |    |    |     | 10 | 10 | 100 |
        |    |    |     | 12 | 10 | 40  |
        | 11 | 8  | 110 |    |    |     |
        | 5  | 5  | 50  | 2  | 2  | 80  |
        | 9  | 8  | 90  |    |    |     |
        +----+----+-----+----+----+-----+
        "));

        assert_join_metrics!(metrics, 5);

        Ok(())
    }

    #[rstest]
    #[tokio::test]
    async fn join_left_semi_with_filter(
        #[values(1, 2, 16)] batch_size: usize,
    ) -> Result<()> {
        let task_ctx = new_task_ctx(batch_size);
        let left = build_left_table();
        let right = build_right_table();

        let filter = prepare_join_filter();
        let (columns, batches, metrics) = multi_partitioned_join_collect(
            left,
            right,
            &JoinType::LeftSemi,
            Some(filter),
            task_ctx,
        )
        .await?;
        assert_eq!(columns, vec!["a1", "b1", "c1"]);
        allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+----+
        | a1 | b1 | c1 |
        +----+----+----+
        | 5  | 5  | 50 |
        +----+----+----+
        "));

        assert_join_metrics!(metrics, 1);

        Ok(())
    }

    #[rstest]
    #[tokio::test]
    async fn join_left_anti_with_filter(
        #[values(1, 2, 16)] batch_size: usize,
    ) -> Result<()> {
        let task_ctx = new_task_ctx(batch_size);
        let left = build_left_table();
        let right = build_right_table();

        let filter = prepare_join_filter();
        let (columns, batches, metrics) = multi_partitioned_join_collect(
            left,
            right,
            &JoinType::LeftAnti,
            Some(filter),
            task_ctx,
        )
        .await?;
        assert_eq!(columns, vec!["a1", "b1", "c1"]);
        allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+-----+
        | a1 | b1 | c1  |
        +----+----+-----+
        | 11 | 8  | 110 |
        | 9  | 8  | 90  |
        +----+----+-----+
        "));

        assert_join_metrics!(metrics, 2);

        Ok(())
    }

    #[tokio::test]
    async fn join_has_correct_stats() -> Result<()> {
        let left = build_left_table();
        let right = build_right_table();
        let nested_loop_join = NestedLoopJoinExec::try_new(
            left,
            right,
            None,
            &JoinType::Left,
            Some(vec![1, 2]),
        )?;
        let stats = nested_loop_join.partition_statistics(None)?;
        assert_eq!(
            nested_loop_join.schema().fields().len(),
            stats.column_statistics.len(),
        );
        assert_eq!(2, stats.column_statistics.len());
        Ok(())
    }

    #[rstest]
    #[tokio::test]
    async fn join_right_semi_with_filter(
        #[values(1, 2, 16)] batch_size: usize,
    ) -> Result<()> {
        let task_ctx = new_task_ctx(batch_size);
        let left = build_left_table();
        let right = build_right_table();

        let filter = prepare_join_filter();
        let (columns, batches, metrics) = multi_partitioned_join_collect(
            left,
            right,
            &JoinType::RightSemi,
            Some(filter),
            task_ctx,
        )
        .await?;
        assert_eq!(columns, vec!["a2", "b2", "c2"]);
        allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+----+
        | a2 | b2 | c2 |
        +----+----+----+
        | 2  | 2  | 80 |
        +----+----+----+
        "));

        assert_join_metrics!(metrics, 1);

        Ok(())
    }

    #[rstest]
    #[tokio::test]
    async fn join_right_anti_with_filter(
        #[values(1, 2, 16)] batch_size: usize,
    ) -> Result<()> {
        let task_ctx = new_task_ctx(batch_size);
        let left = build_left_table();
        let right = build_right_table();

        let filter = prepare_join_filter();
        let (columns, batches, metrics) = multi_partitioned_join_collect(
            left,
            right,
            &JoinType::RightAnti,
            Some(filter),
            task_ctx,
        )
        .await?;
        assert_eq!(columns, vec!["a2", "b2", "c2"]);
        allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+-----+
        | a2 | b2 | c2  |
        +----+----+-----+
        | 10 | 10 | 100 |
        | 12 | 10 | 40  |
        +----+----+-----+
        "));

        assert_join_metrics!(metrics, 2);

        Ok(())
    }

    #[rstest]
    #[tokio::test]
    async fn join_left_mark_with_filter(
        #[values(1, 2, 16)] batch_size: usize,
    ) -> Result<()> {
        let task_ctx = new_task_ctx(batch_size);
        let left = build_left_table();
        let right = build_right_table();

        let filter = prepare_join_filter();
        let (columns, batches, metrics) = multi_partitioned_join_collect(
            left,
            right,
            &JoinType::LeftMark,
            Some(filter),
            task_ctx,
        )
        .await?;
        assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
        allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+-----+-------+
        | a1 | b1 | c1  | mark  |
        +----+----+-----+-------+
        | 11 | 8  | 110 | false |
        | 5  | 5  | 50  | true  |
        | 9  | 8  | 90  | false |
        +----+----+-----+-------+
        "));

        assert_join_metrics!(metrics, 3);

        Ok(())
    }

    #[rstest]
    #[tokio::test]
    async fn join_right_mark_with_filter(
        #[values(1, 2, 16)] batch_size: usize,
    ) -> Result<()> {
        let task_ctx = new_task_ctx(batch_size);
        let left = build_left_table();
        let right = build_right_table();

        let filter = prepare_join_filter();
        let (columns, batches, metrics) = multi_partitioned_join_collect(
            left,
            right,
            &JoinType::RightMark,
            Some(filter),
            task_ctx,
        )
        .await?;
        assert_eq!(columns, vec!["a2", "b2", "c2", "mark"]);

        allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r"
        +----+----+-----+-------+
        | a2 | b2 | c2  | mark  |
        +----+----+-----+-------+
        | 10 | 10 | 100 | false |
        | 12 | 10 | 40  | false |
        | 2  | 2  | 80  | true  |
        +----+----+-----+-------+
        "));

        assert_join_metrics!(metrics, 3);

        Ok(())
    }

    /// Every join type must fail with a resources-exhausted error when the
    /// memory pool is limited to 100 bytes.
    #[tokio::test]
    async fn test_overallocation() -> Result<()> {
        let left = build_table(
            ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
            ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
            ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
            None,
            Vec::new(),
        );
        let right = build_table(
            ("a2", &vec![10, 11]),
            ("b2", &vec![12, 13]),
            ("c2", &vec![14, 15]),
            None,
            Vec::new(),
        );
        let filter = prepare_join_filter();

        let join_types = vec![
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::Full,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::LeftMark,
            JoinType::RightSemi,
            JoinType::RightAnti,
            JoinType::RightMark,
        ];

        for join_type in join_types {
            let runtime = RuntimeEnvBuilder::new()
                .with_memory_limit(100, 1.0)
                .build_arc()?;
            let task_ctx = TaskContext::default().with_runtime(runtime);
            let task_ctx = Arc::new(task_ctx);

            let err = multi_partitioned_join_collect(
                Arc::clone(&left),
                Arc::clone(&right),
                &join_type,
                Some(filter.clone()),
                task_ctx,
            )
            .await
            .unwrap_err();

            assert_contains!(
                err.to_string(),
                "Resources exhausted: Additional allocation failed for NestedLoopJoinLoad[0] with top memory consumers (across reservations) as:\n  NestedLoopJoinLoad[0]"
            );
        }

        Ok(())
    }

    /// Returns the field names of `schema`, in order.
    fn columns(schema: &Schema) -> Vec<String> {
        schema.fields().iter().map(|f| f.name().clone()).collect()
    }
}