use std::sync::Arc;
use crate::common::{CompactVec, I64Map};
use crate::core::value::NULL_VALUE;
use crate::core::{Result, Row, RowVec, Value};
use crate::executor::expression::JoinFilter;
use crate::executor::operator::{ColumnInfo, Operator, RowRef};
use crate::storage::expression::ConstBoolExpr;
use crate::storage::traits::{Index, Table};
use super::hash_join::JoinType;
/// How the inner-table row ids matching an outer join key are located.
#[derive(Clone)]
pub enum IndexLookupStrategy {
    /// Query a shared secondary index for the row ids equal to the key.
    SecondaryIndex(Arc<dyn Index>),
    /// Treat the (numeric) key value itself as the inner row id.
    PrimaryKey,
}
/// Identifies which side of a join one projected output column is read from.
///
/// The wrapped `usize` is the column position inside the outer row
/// ([`ColumnSource::Outer`]) or the inner row ([`ColumnSource::Inner`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ColumnSource {
    /// Column at the given position of the outer (driving) row.
    Outer(usize),
    /// Column at the given position of the inner (looked-up) row.
    Inner(usize),
}
/// Column selection applied while a join assembles its output rows.
#[derive(Clone)]
pub struct JoinProjection {
    /// One entry per output column, in output order, naming the outer or
    /// inner column the value is taken from.
    pub columns: Vec<ColumnSource>,
}
/// Streaming index nested-loop join.
///
/// For every row pulled from `outer`, the join key is used to look up
/// candidate rows in `inner_table` (through a secondary index or directly by
/// row id), and each candidate that passes the optional residual filter is
/// combined with the outer row and emitted.
pub struct IndexNestedLoopJoinOperator {
    /// Operator producing the outer (driving) rows.
    outer: Box<dyn Operator>,
    /// Table from which candidate inner rows are fetched by row id.
    inner_table: Box<dyn Table>,
    /// Join flavour; `Left` and `Full` emit unmatched outer rows padded with NULLs.
    join_type: JoinType,
    /// Position of the join key inside an outer row.
    outer_key_idx: usize,
    /// How row ids are derived from the join key.
    lookup_strategy: IndexLookupStrategy,
    /// Extra predicate evaluated on every (outer, inner) candidate pair, if any.
    residual_filter: Option<JoinFilter>,
    /// Output schema: outer columns followed by inner columns, or the
    /// projected schema once `with_projection` has been applied.
    schema: Vec<ColumnInfo>,
    /// Number of inner columns; determines the width of the NULL padding row.
    inner_col_count: usize,
    /// Optional output column selection.
    projection: Option<JoinProjection>,
    /// Outer row currently being joined, if one is loaded.
    current_outer_row: Option<Row>,
    /// Candidate inner rows fetched for the current outer row.
    current_inner_rows: RowVec,
    /// Index of the next candidate in `current_inner_rows` to examine.
    current_inner_idx: usize,
    /// Whether the current outer row has produced at least one output row.
    outer_had_match: bool,
    /// Reusable scratch buffer for looked-up row ids.
    row_id_buffer: Vec<i64>,
    /// Reusable row buffer used when both input rows can be consumed.
    row_buffer: Row,
    /// Expression handed to the table fetch; built with `ConstBoolExpr::true_expr()`.
    true_expr: ConstBoolExpr,
    /// Set once `open` has completed successfully.
    opened: bool,
    /// Set when the outer operator has reported end of input.
    outer_exhausted: bool,
}
impl IndexNestedLoopJoinOperator {
    /// Creates a streaming index nested-loop join.
    ///
    /// * `outer` - operator producing the driving rows.
    /// * `inner_table` - table the matching rows are fetched from.
    /// * `inner_schema` - columns of `inner_table`; appended after the outer
    ///   columns to form the default output schema.
    /// * `join_type` - `Left`/`Full` emit unmatched outer rows padded with NULLs.
    /// * `outer_key_idx` - position of the join key inside an outer row.
    /// * `lookup_strategy` - how inner row ids are derived from the key.
    /// * `residual_filter` - optional extra predicate on each candidate pair.
    pub fn new(
        outer: Box<dyn Operator>,
        inner_table: Box<dyn Table>,
        inner_schema: Vec<ColumnInfo>,
        join_type: JoinType,
        outer_key_idx: usize,
        lookup_strategy: IndexLookupStrategy,
        residual_filter: Option<JoinFilter>,
    ) -> Self {
        let inner_col_count = inner_schema.len();
        let total_cols = outer.schema().len() + inner_col_count;
        let mut schema = Vec::with_capacity(total_cols);
        schema.extend(outer.schema().iter().cloned());
        // `inner_schema` is owned, so its entries are moved rather than cloned.
        schema.extend(inner_schema);
        Self {
            outer,
            inner_table,
            join_type,
            outer_key_idx,
            lookup_strategy,
            residual_filter,
            schema,
            inner_col_count,
            projection: None,
            current_outer_row: None,
            current_inner_rows: RowVec::new(),
            current_inner_idx: 0,
            outer_had_match: false,
            row_id_buffer: Vec::with_capacity(16),
            row_buffer: Row::with_capacity(total_cols),
            true_expr: ConstBoolExpr::true_expr(),
            opened: false,
            outer_exhausted: false,
        }
    }

    /// Restricts the output to `columns` and replaces the output schema with
    /// `projected_schema`.
    ///
    /// Column indices are read with unchecked accesses when rows are built,
    /// so every index must be valid for the side it refers to.
    pub fn with_projection(
        mut self,
        columns: Vec<ColumnSource>,
        projected_schema: Vec<ColumnInfo>,
    ) -> Self {
        self.projection = Some(JoinProjection { columns });
        self.schema = projected_schema;
        self
    }

    /// Converts a floating-point join key into a primary-key row id.
    ///
    /// Returns `None` unless `key` is an exact integer representable as
    /// `i64`. A plain `as i64` cast would truncate `1.5` to `1` (and map NaN
    /// to `0`), which would join rows whose keys are not actually equal.
    fn float_key_to_row_id(key: f64) -> Option<i64> {
        // `i64::MIN as f64` is exactly -2^63, while `i64::MAX as f64` rounds
        // up to 2^63 (one past the largest id), hence the strict upper bound.
        // NaN fails both comparisons.
        let in_range = key >= i64::MIN as f64 && key < i64::MAX as f64;
        if in_range && key.fract() == 0.0 {
            Some(key as i64)
        } else {
            None
        }
    }

    /// Builds the all-NULL inner row used to pad unmatched outer rows.
    #[inline]
    fn null_inner_row(&self) -> Row {
        let null_values: Vec<Value> = (0..self.inner_col_count).map(|_| NULL_VALUE).collect();
        Row::from_values(null_values)
    }

    /// Combines two owned rows into `row_buffer`, applying the projection if
    /// one is set. Retrieve the result with `take_from_buffer`.
    ///
    /// NOTE(review): projected values are moved out with `mem::take`, so if
    /// the projection names the same source column twice the second
    /// occurrence receives `Value::default()`. Confirm the planner never
    /// emits duplicate sources.
    #[inline]
    fn combine_owned_into_buffer(&mut self, outer: Row, inner: Row) {
        match &self.projection {
            Some(proj) => {
                self.row_buffer.clear();
                self.row_buffer.reserve(proj.columns.len());
                let mut outer_values = outer.into_values();
                let mut inner_values = inner.into_values();
                for col_source in &proj.columns {
                    match col_source {
                        ColumnSource::Outer(idx) => {
                            // SAFETY: not verified here; relies on the caller of
                            // `with_projection` supplying indices that are in
                            // bounds for outer rows.
                            self.row_buffer.push(std::mem::take(unsafe {
                                outer_values.get_unchecked_mut(*idx)
                            }));
                        }
                        ColumnSource::Inner(idx) => {
                            // SAFETY: as above, for inner rows (including the
                            // NULL padding row of `inner_col_count` values).
                            self.row_buffer.push(std::mem::take(unsafe {
                                inner_values.get_unchecked_mut(*idx)
                            }));
                        }
                    }
                }
            }
            None => {
                self.row_buffer.combine_into_owned(outer, inner);
            }
        }
    }

    /// Moves the combined row out of `row_buffer`.
    #[inline]
    fn take_from_buffer(&mut self) -> Row {
        self.row_buffer.take_and_clear()
    }

    /// Builds an output row from a borrowed outer row and an owned inner row.
    ///
    /// Used while further inner candidates remain for the same outer row, so
    /// outer values are cloned while inner values are moved.
    ///
    /// NOTE(review): a projection naming the same inner column twice yields
    /// `Value::default()` for the second occurrence (see `mem::take` below).
    #[inline]
    fn create_combined_row(&self, outer: &Row, inner: Row) -> Row {
        match &self.projection {
            Some(proj) => {
                let mut values: CompactVec<Value> = CompactVec::with_capacity(proj.columns.len());
                let outer_slice = outer.as_slice();
                let mut inner_values = inner.into_values();
                for col_source in &proj.columns {
                    match col_source {
                        ColumnSource::Outer(idx) => {
                            // SAFETY: not verified here; relies on projection
                            // indices being in bounds for outer rows.
                            values.push(unsafe { outer_slice.get_unchecked(*idx) }.clone());
                        }
                        ColumnSource::Inner(idx) => {
                            // SAFETY: not verified here; relies on projection
                            // indices being in bounds for inner rows.
                            values.push(std::mem::take(unsafe {
                                inner_values.get_unchecked_mut(*idx)
                            }));
                        }
                    }
                }
                Row::from_compact_vec(values)
            }
            None => Row::from_combined_clone_move(outer, inner),
        }
    }

    /// Refills `current_inner_rows` with the inner rows matching `key_value`.
    ///
    /// Row ids come from the secondary index or, for the primary-key
    /// strategy, from the key itself. Non-numeric keys and floats that are
    /// not exact integers match nothing.
    fn lookup_inner_rows(&mut self, key_value: &Value) {
        self.row_id_buffer.clear();
        self.current_inner_rows.clear();
        match &self.lookup_strategy {
            IndexLookupStrategy::SecondaryIndex(index) => {
                index.get_row_ids_equal_into(
                    std::slice::from_ref(key_value),
                    &mut self.row_id_buffer,
                );
            }
            IndexLookupStrategy::PrimaryKey => match key_value {
                Value::Integer(id) => self.row_id_buffer.push(*id),
                Value::Float(f) => {
                    if let Some(id) = Self::float_key_to_row_id(*f) {
                        self.row_id_buffer.push(id);
                    }
                }
                _ => {}
            },
        }
        if self.row_id_buffer.is_empty() {
            return;
        }
        self.inner_table.fetch_rows_by_ids_into(
            &self.row_id_buffer,
            &self.true_expr,
            &mut self.current_inner_rows,
        );
    }

    /// Pulls the next outer row and prepares its inner candidates.
    ///
    /// Returns `Ok(true)` when a row was loaded and `Ok(false)` when the
    /// outer input is exhausted (which also sets `outer_exhausted`). A row
    /// whose key is missing or NULL is loaded with no candidates.
    ///
    /// # Errors
    /// Propagates any error returned by the outer operator.
    fn advance_outer(&mut self) -> Result<bool> {
        match self.outer.next()? {
            Some(row_ref) => {
                let outer_row = row_ref.into_owned();
                // The key is borrowed from the local row, so no clone is
                // needed to call back into `self`.
                match outer_row.get(self.outer_key_idx) {
                    Some(key) if !key.is_null() => self.lookup_inner_rows(key),
                    _ => self.current_inner_rows.clear(),
                }
                self.current_outer_row = Some(outer_row);
                self.current_inner_idx = 0;
                self.outer_had_match = false;
                Ok(true)
            }
            None => {
                self.outer_exhausted = true;
                Ok(false)
            }
        }
    }
}
impl Operator for IndexNestedLoopJoinOperator {
    /// Opens the outer operator and pre-loads the first outer row.
    ///
    /// All iteration state is reset first, so an operator that is opened
    /// again after being drained starts over instead of reporting end of
    /// input immediately.
    ///
    /// # Errors
    /// Propagates errors from opening or reading the outer operator.
    fn open(&mut self) -> Result<()> {
        self.opened = false;
        self.outer_exhausted = false;
        self.current_outer_row = None;
        self.current_inner_rows.clear();
        self.current_inner_idx = 0;
        self.outer_had_match = false;

        self.outer.open()?;
        self.advance_outer()?;
        self.opened = true;
        Ok(())
    }

    /// Produces the next joined row, or `Ok(None)` at end of input.
    ///
    /// `Left` and `Full` joins emit an outer row padded with NULLs when none
    /// of its candidates matched. Unmatched inner rows are never emitted.
    ///
    /// # Errors
    /// Returns an internal error when called before `open`, and propagates
    /// errors from the outer operator.
    fn next(&mut self) -> Result<Option<RowRef>> {
        if !self.opened {
            return Err(crate::core::Error::internal(
                "IndexNestedLoopJoinOperator::next called before open",
            ));
        }
        let is_left_outer = matches!(self.join_type, JoinType::Left | JoinType::Full);
        loop {
            if self.outer_exhausted {
                return Ok(None);
            }
            if self.current_outer_row.is_none() && !self.advance_outer()? {
                return Ok(None);
            }
            let inner_len = self.current_inner_rows.len();
            while self.current_inner_idx < inner_len {
                let inner_idx = self.current_inner_idx;
                self.current_inner_idx += 1;
                let passes_filter = {
                    let outer_row = self.current_outer_row.as_ref().unwrap();
                    // SAFETY: `inner_idx < inner_len`, the length read from
                    // `current_inner_rows` just above the loop; the loop body
                    // only reaches `advance_outer` (which refills the rows)
                    // on a path that returns immediately.
                    let inner_entry = unsafe { self.current_inner_rows.get_unchecked(inner_idx) };
                    if let Some(ref filter) = self.residual_filter {
                        filter.matches(outer_row, &inner_entry.1)
                    } else {
                        true
                    }
                };
                if passes_filter {
                    self.outer_had_match = true;
                    // Each candidate is visited once, so its row can be moved out.
                    // SAFETY: same bound as above; the rows have not been
                    // modified since `inner_len` was read.
                    let inner_row = std::mem::take(unsafe {
                        &mut self.current_inner_rows.get_unchecked_mut(inner_idx).1
                    });
                    let is_last_inner = self.current_inner_idx >= inner_len;
                    if is_last_inner {
                        // No candidates remain for this outer row: consume it
                        // (no clone needed) and pre-load the next one before
                        // handing the combined row out.
                        let outer_row = self.current_outer_row.take().unwrap();
                        self.advance_outer()?;
                        self.combine_owned_into_buffer(outer_row, inner_row);
                        return Ok(Some(RowRef::Owned(self.take_from_buffer())));
                    } else {
                        // More candidates follow, so the outer row must stay.
                        let outer_row = self.current_outer_row.as_ref().unwrap();
                        let combined = self.create_combined_row(outer_row, inner_row);
                        return Ok(Some(RowRef::Owned(combined)));
                    }
                }
            }
            // Every candidate of the current outer row has been examined.
            if is_left_outer && !self.outer_had_match {
                let outer_row = self.current_outer_row.take().unwrap();
                self.advance_outer()?;
                let null_inner = self.null_inner_row();
                self.combine_owned_into_buffer(outer_row, null_inner);
                return Ok(Some(RowRef::Owned(self.take_from_buffer())));
            }
            if !self.advance_outer()? {
                return Ok(None);
            }
        }
    }

    /// Closes the outer operator.
    fn close(&mut self) -> Result<()> {
        self.outer.close()
    }

    /// Returns the output schema.
    fn schema(&self) -> &[ColumnInfo] {
        &self.schema
    }

    /// Estimates the output size as the outer operator's estimate, for every
    /// join type.
    fn estimated_rows(&self) -> Option<usize> {
        self.outer.estimated_rows()
    }

    /// Returns a display name that includes the join type for inner and left joins.
    fn name(&self) -> &str {
        match self.join_type {
            JoinType::Inner => "IndexNL (INNER)",
            JoinType::Left => "IndexNL (LEFT)",
            _ => "IndexNL",
        }
    }
}
/// Index nested-loop join that materialises its whole result in `open`.
///
/// All outer rows are read first, the distinct inner row ids they reference
/// are fetched from `inner_table` in a single call, and the joined rows are
/// stored in `results` for `next` to hand out one by one.
pub struct BatchIndexNestedLoopJoinOperator {
    /// Operator producing the outer (driving) rows.
    outer: Box<dyn Operator>,
    /// Table from which inner rows are fetched by row id.
    inner_table: Box<dyn Table>,
    /// Join flavour; `Left` and `Full` emit unmatched outer rows padded with NULLs.
    join_type: JoinType,
    /// Position of the join key inside an outer row.
    outer_key_idx: usize,
    /// How row ids are derived from the join key.
    lookup_strategy: IndexLookupStrategy,
    /// Extra predicate evaluated on every (outer, inner) candidate pair, if any.
    residual_filter: Option<JoinFilter>,
    /// Output schema: outer columns followed by inner columns, or the
    /// projected schema once `with_projection` has been applied.
    schema: Vec<ColumnInfo>,
    /// Number of inner columns; determines the width of the NULL padding row.
    inner_col_count: usize,
    /// Optional output column selection.
    projection: Option<JoinProjection>,
    /// Joined rows produced by `open`.
    results: Vec<Row>,
    /// Index of the next entry of `results` to return.
    result_idx: usize,
    /// Reusable scratch buffer for looked-up row ids.
    row_id_buffer: Vec<i64>,
    /// Set once `open` has completed successfully.
    opened: bool,
}
impl BatchIndexNestedLoopJoinOperator {
    /// Creates a batch index nested-loop join.
    ///
    /// The default output schema is the outer operator's columns followed by
    /// `inner_schema`. The remaining arguments are stored unchanged and are
    /// used when the operator is opened.
    pub fn new(
        outer: Box<dyn Operator>,
        inner_table: Box<dyn Table>,
        inner_schema: Vec<ColumnInfo>,
        join_type: JoinType,
        outer_key_idx: usize,
        lookup_strategy: IndexLookupStrategy,
        residual_filter: Option<JoinFilter>,
    ) -> Self {
        let inner_col_count = inner_schema.len();
        let schema: Vec<ColumnInfo> = outer
            .schema()
            .iter()
            .chain(inner_schema.iter())
            .cloned()
            .collect();
        Self {
            outer,
            inner_table,
            join_type,
            outer_key_idx,
            lookup_strategy,
            residual_filter,
            schema,
            inner_col_count,
            projection: None,
            results: Vec::new(),
            result_idx: 0,
            row_id_buffer: Vec::with_capacity(16),
            opened: false,
        }
    }

    /// Restricts the output to `columns` and swaps in `projected_schema` as
    /// the output schema.
    pub fn with_projection(
        mut self,
        columns: Vec<ColumnSource>,
        projected_schema: Vec<ColumnInfo>,
    ) -> Self {
        self.schema = projected_schema;
        self.projection = Some(JoinProjection { columns });
        self
    }

    /// Builds one output row from borrowed outer and inner rows, cloning the
    /// selected values (or all values when no projection is set).
    #[inline]
    fn combine(&self, outer: &Row, inner: &Row) -> Row {
        let projection = match self.projection.as_ref() {
            Some(projection) => projection,
            None => return Row::from_combined(outer, inner),
        };
        let outer_vals = outer.as_slice();
        let inner_vals = inner.as_slice();
        let mut projected: CompactVec<Value> = CompactVec::with_capacity(projection.columns.len());
        for source in projection.columns.iter() {
            // SAFETY: not verified here; relies on the projection indices
            // being in bounds for the side they refer to.
            let picked = match *source {
                ColumnSource::Outer(pos) => unsafe { outer_vals.get_unchecked(pos) },
                ColumnSource::Inner(pos) => unsafe { inner_vals.get_unchecked(pos) },
            };
            projected.push(picked.clone());
        }
        Row::from_compact_vec(projected)
    }

    /// Builds the all-NULL inner row used to pad unmatched outer rows.
    #[inline]
    fn null_inner_row(&self) -> Row {
        let padding: Vec<Value> = std::iter::repeat_with(|| NULL_VALUE)
            .take(self.inner_col_count)
            .collect();
        Row::from_values(padding)
    }
}
impl Operator for BatchIndexNestedLoopJoinOperator {
    /// Reads the whole outer input, fetches every referenced inner row in one
    /// call and materialises the joined rows into `results`.
    ///
    /// Matched rows are produced while iterating the fetched inner rows, so
    /// the output does not follow outer-row order. For `Left`/`Full` joins
    /// the unmatched outer rows, padded with NULLs, are appended at the end.
    /// Results from a previous `open` are discarded first.
    ///
    /// # Errors
    /// Propagates errors from opening or reading the outer operator.
    fn open(&mut self) -> Result<()> {
        use crate::common::I64Set;

        // Converts a float join key to a row id only when it is an exact
        // integer in `i64` range. A bare `as i64` would truncate `1.5` to `1`
        // (and map NaN to `0`), joining rows whose keys are not equal.
        // `i64::MAX as f64` rounds up to 2^63, hence the strict upper bound.
        fn integral_row_id(key: f64) -> Option<i64> {
            let in_range = key >= i64::MIN as f64 && key < i64::MAX as f64;
            if in_range && key.fract() == 0.0 {
                Some(key as i64)
            } else {
                None
            }
        }

        // Without this reset a second `open` would append to the old results
        // and `next` would resume from a stale cursor.
        self.opened = false;
        self.results.clear();
        self.result_idx = 0;

        self.outer.open()?;
        let is_left_join = matches!(self.join_type, JoinType::Left | JoinType::Full);
        let true_expr = ConstBoolExpr::true_expr();
        let mut outer_rows: Vec<Row> = Vec::new();
        let mut row_id_set: I64Set = I64Set::new();
        let mut key_to_outer_indices: I64Map<Vec<usize>> = I64Map::new();

        // Phase 1: collect outer rows and the inner row ids each one refers to.
        while let Some(row_ref) = self.outer.next()? {
            let outer_row = row_ref.into_owned();
            let outer_idx = outer_rows.len();
            self.row_id_buffer.clear();
            // A missing or NULL key leaves the buffer empty, so the row is
            // kept (for outer joins) without referencing any inner row.
            match outer_row.get(self.outer_key_idx) {
                Some(key) if !key.is_null() => match &self.lookup_strategy {
                    IndexLookupStrategy::SecondaryIndex(index) => {
                        index.get_row_ids_equal_into(
                            std::slice::from_ref(key),
                            &mut self.row_id_buffer,
                        );
                    }
                    IndexLookupStrategy::PrimaryKey => match key {
                        Value::Integer(id) => self.row_id_buffer.push(*id),
                        Value::Float(f) => {
                            if let Some(id) = integral_row_id(*f) {
                                self.row_id_buffer.push(id);
                            }
                        }
                        _ => {}
                    },
                },
                _ => {}
            }
            for &row_id in &self.row_id_buffer {
                key_to_outer_indices
                    .entry(row_id)
                    .or_default()
                    .push(outer_idx);
                row_id_set.insert(row_id);
            }
            outer_rows.push(outer_row);
        }
        if outer_rows.is_empty() {
            self.opened = true;
            return Ok(());
        }

        // Phase 2: fetch every distinct inner row once.
        let all_row_ids: Vec<i64> = row_id_set.into_iter().collect();
        let inner_rows_batch = self.inner_table.fetch_rows_by_ids(&all_row_ids, &true_expr);
        let mut inner_by_id: I64Map<Row> = I64Map::with_capacity(inner_rows_batch.len());
        for (row_id, row) in inner_rows_batch {
            inner_by_id.insert(row_id, row);
        }

        // Phase 3: pair each fetched inner row with the outer rows that asked for it.
        let mut matched_outers: Vec<bool> = vec![false; outer_rows.len()];
        for (row_id, inner_row) in inner_by_id.iter() {
            if let Some(outer_indices) = key_to_outer_indices.get(row_id) {
                for &outer_idx in outer_indices {
                    let outer_row = &outer_rows[outer_idx];
                    let passes_filter = if let Some(ref filter) = self.residual_filter {
                        filter.matches(outer_row, inner_row)
                    } else {
                        true
                    };
                    if passes_filter {
                        matched_outers[outer_idx] = true;
                        self.results.push(self.combine(outer_row, inner_row));
                    }
                }
            }
        }

        // Phase 4: outer joins also emit the outer rows that matched nothing.
        if is_left_join {
            let null_inner = self.null_inner_row();
            for (idx, outer_row) in outer_rows.iter().enumerate() {
                if !matched_outers[idx] {
                    self.results.push(self.combine(outer_row, &null_inner));
                }
            }
        }
        self.opened = true;
        Ok(())
    }

    /// Returns the next materialised row, or `Ok(None)` once all have been
    /// handed out.
    ///
    /// # Errors
    /// Returns an internal error when called before `open`.
    fn next(&mut self) -> Result<Option<RowRef>> {
        if !self.opened {
            return Err(crate::core::Error::internal(
                "BatchIndexNestedLoopJoinOperator::next called before open",
            ));
        }
        if self.result_idx < self.results.len() {
            // Each row is returned once, so it is moved out of the buffer.
            let row = std::mem::take(&mut self.results[self.result_idx]);
            self.result_idx += 1;
            Ok(Some(RowRef::Owned(row)))
        } else {
            Ok(None)
        }
    }

    /// Closes the outer operator.
    fn close(&mut self) -> Result<()> {
        self.outer.close()
    }

    /// Returns the output schema.
    fn schema(&self) -> &[ColumnInfo] {
        &self.schema
    }

    /// Estimates the output size as the outer operator's estimate.
    fn estimated_rows(&self) -> Option<usize> {
        self.outer.estimated_rows()
    }

    /// Returns the operator's display name.
    fn name(&self) -> &str {
        "BatchIndexNL"
    }
}