use crate::joins::utils::ColumnIndex;
use arrow::datatypes::SchemaRef;
use datafusion_common::JoinSide;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use std::{fmt::Display, sync::Arc};
#[derive(Debug, Clone)]
pub struct JoinFilter {
pub(crate) expression: Arc<dyn PhysicalExpr>,
pub(crate) column_indices: Vec<ColumnIndex>,
pub(crate) schema: SchemaRef,
}
impl Display for JoinFilter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.expression.fmt_sql(f)
}
}
impl JoinFilter {
pub fn new(
expression: Arc<dyn PhysicalExpr>,
column_indices: Vec<ColumnIndex>,
schema: SchemaRef,
) -> JoinFilter {
JoinFilter {
expression,
column_indices,
schema,
}
}
pub fn build_column_indices(
left_indices: Vec<usize>,
right_indices: Vec<usize>,
) -> Vec<ColumnIndex> {
left_indices
.into_iter()
.map(|i| ColumnIndex {
index: i,
side: JoinSide::Left,
})
.chain(right_indices.into_iter().map(|i| ColumnIndex {
index: i,
side: JoinSide::Right,
}))
.collect()
}
pub fn expression(&self) -> &Arc<dyn PhysicalExpr> {
&self.expression
}
pub fn column_indices(&self) -> &[ColumnIndex] {
&self.column_indices
}
pub fn schema(&self) -> &SchemaRef {
&self.schema
}
pub fn swap(&self) -> JoinFilter {
let column_indices = self
.column_indices()
.iter()
.map(|idx| ColumnIndex {
index: idx.index,
side: idx.side.negate(),
})
.collect();
JoinFilter::new(
Arc::clone(self.expression()),
column_indices,
Arc::clone(self.schema()),
)
}
}