use reifydb_core::{common::CommitVersion, interface::change::Diff, value::column::columns::Columns};
use reifydb_runtime::hash::Hash128;
use reifydb_type::Result;
use super::hash::{
add_to_state_entry_batch, emit_joined_columns_batch, emit_remove_joined_columns_batch,
emit_update_joined_columns, remove_from_state_entry, update_row_in_entry,
};
use crate::{
operator::join::{
operator::JoinOperator,
state::{JoinSide, JoinState},
},
transaction::FlowTransaction,
};
pub(crate) struct InnerHashJoin;
impl InnerHashJoin {
pub(crate) fn handle_insert_undefined(
&self,
_txn: &mut FlowTransaction,
_post: &Columns,
_row_idx: usize,
_side: JoinSide,
_state: &mut JoinState,
_operator: &JoinOperator,
) -> Result<Vec<Diff>> {
Ok(Vec::new())
}
pub(crate) fn handle_remove_undefined(
&self,
_txn: &mut FlowTransaction,
_pre: &Columns,
_row_idx: usize,
_side: JoinSide,
_state: &mut JoinState,
_operator: &JoinOperator,
_version: CommitVersion,
) -> Result<Vec<Diff>> {
Ok(Vec::new())
}
pub(crate) fn handle_update_undefined(
&self,
_txn: &mut FlowTransaction,
_pre: &Columns,
_post: &Columns,
_row_idx: usize,
_side: JoinSide,
_state: &mut JoinState,
_operator: &JoinOperator,
_version: CommitVersion,
) -> Result<Vec<Diff>> {
Ok(Vec::new())
}
pub(crate) fn handle_insert(
&self,
txn: &mut FlowTransaction,
post: &Columns,
indices: &[usize],
side: JoinSide,
key_hash: &Hash128,
state: &mut JoinState,
operator: &JoinOperator,
) -> Result<Vec<Diff>> {
if indices.is_empty() {
return Ok(Vec::new());
}
let mut result = Vec::new();
match side {
JoinSide::Left => {
add_to_state_entry_batch(txn, &mut state.left, key_hash, post, indices)?;
}
JoinSide::Right => {
add_to_state_entry_batch(txn, &mut state.right, key_hash, post, indices)?;
}
}
let opposite_store = match side {
JoinSide::Left => &state.right,
JoinSide::Right => &state.left,
};
let opposite_parent = match side {
JoinSide::Left => &operator.right_parent,
JoinSide::Right => &operator.left_parent,
};
if let Some(diff) = emit_joined_columns_batch(
txn,
post,
indices,
side,
opposite_store,
key_hash,
operator,
opposite_parent,
)? {
result.push(diff);
}
Ok(result)
}
pub(crate) fn handle_remove(
&self,
txn: &mut FlowTransaction,
pre: &Columns,
indices: &[usize],
side: JoinSide,
key_hash: &Hash128,
state: &mut JoinState,
operator: &JoinOperator,
_version: CommitVersion,
) -> Result<Vec<Diff>> {
if indices.is_empty() {
return Ok(Vec::new());
}
let mut result = Vec::new();
if matches!(side, JoinSide::Left) {
for &idx in indices {
let row_number = pre.row_numbers[idx];
operator.cleanup_left_row_joins(txn, *row_number)?;
}
}
let opposite_store = match side {
JoinSide::Left => &state.right,
JoinSide::Right => &state.left,
};
let opposite_parent = match side {
JoinSide::Left => &operator.right_parent,
JoinSide::Right => &operator.left_parent,
};
if let Some(diff) = emit_remove_joined_columns_batch(
txn,
pre,
indices,
side,
opposite_store,
key_hash,
operator,
opposite_parent,
)? {
result.push(diff);
}
for &idx in indices {
let row_number = pre.row_numbers[idx];
match side {
JoinSide::Left => {
remove_from_state_entry(txn, &mut state.left, key_hash, row_number)?;
}
JoinSide::Right => {
remove_from_state_entry(txn, &mut state.right, key_hash, row_number)?;
}
}
}
Ok(result)
}
pub(crate) fn handle_update(
&self,
txn: &mut FlowTransaction,
pre: &Columns,
post: &Columns,
indices: &[usize],
side: JoinSide,
pre_key: &Hash128,
post_key: &Hash128,
state: &mut JoinState,
operator: &JoinOperator,
version: CommitVersion,
) -> Result<Vec<Diff>> {
if indices.is_empty() {
return Ok(Vec::new());
}
let mut result = Vec::new();
if pre_key == post_key {
for &row_idx in indices {
let pre_row_number = pre.row_numbers[row_idx];
let post_row_number = post.row_numbers[row_idx];
match side {
JoinSide::Left => {
if update_row_in_entry(
txn,
&mut state.left,
pre_key,
pre_row_number,
post_row_number,
)? {
if let Some(diff) = emit_update_joined_columns(
txn,
pre,
post,
row_idx,
JoinSide::Left,
&state.right,
pre_key,
operator,
&operator.right_parent,
)? {
result.push(diff);
}
}
}
JoinSide::Right => {
if update_row_in_entry(
txn,
&mut state.right,
pre_key,
pre_row_number,
post_row_number,
)? {
if let Some(diff) = emit_update_joined_columns(
txn,
pre,
post,
row_idx,
JoinSide::Right,
&state.left,
pre_key,
operator,
&operator.left_parent,
)? {
result.push(diff);
}
}
}
}
}
} else {
let remove_diffs =
self.handle_remove(txn, pre, indices, side, pre_key, state, operator, version)?;
result.extend(remove_diffs);
let insert_diffs = self.handle_insert(txn, post, indices, side, post_key, state, operator)?;
result.extend(insert_diffs);
}
Ok(result)
}
}