use reifydb_core::{common::CommitVersion, interface::change::Diff, value::column::columns::Columns};
use reifydb_runtime::hash::Hash128;
use reifydb_type::Result;
use super::hash::{
add_to_state_entry_batch, emit_joined_columns_batch, emit_remove_joined_columns_batch,
emit_update_joined_columns, is_first_right_row, pull_left_columns, remove_from_state_entry,
update_row_in_entry,
};
use crate::{
operator::join::{
operator::JoinOperator,
state::{JoinSide, JoinState},
},
transaction::FlowTransaction,
};
pub(crate) struct LeftHashJoin;
impl LeftHashJoin {
pub(crate) fn handle_insert_undefined(
&self,
txn: &mut FlowTransaction,
post: &Columns,
row_idx: usize,
side: JoinSide,
_state: &mut JoinState,
operator: &JoinOperator,
) -> Result<Vec<Diff>> {
match side {
JoinSide::Left => {
let unmatched = operator.unmatched_left_columns(txn, post, row_idx)?;
Ok(vec![Diff::Insert {
post: unmatched,
}])
}
JoinSide::Right => {
Ok(Vec::new())
}
}
}
pub(crate) fn handle_remove_undefined(
&self,
txn: &mut FlowTransaction,
pre: &Columns,
row_idx: usize,
side: JoinSide,
_state: &mut JoinState,
operator: &JoinOperator,
_version: CommitVersion,
) -> Result<Vec<Diff>> {
let row_number = pre.row_numbers[row_idx];
match side {
JoinSide::Left => {
let unmatched = operator.unmatched_left_columns(txn, pre, row_idx)?;
operator.cleanup_left_row_joins(txn, *row_number)?;
Ok(vec![Diff::Remove {
pre: unmatched,
}])
}
JoinSide::Right => {
Ok(Vec::new())
}
}
}
pub(crate) fn handle_update_undefined(
&self,
txn: &mut FlowTransaction,
pre: &Columns,
post: &Columns,
row_idx: usize,
side: JoinSide,
_state: &mut JoinState,
operator: &JoinOperator,
_version: CommitVersion,
) -> Result<Vec<Diff>> {
match side {
JoinSide::Left => {
let unmatched_pre = operator.unmatched_left_columns(txn, pre, row_idx)?;
let unmatched_post = operator.unmatched_left_columns(txn, post, row_idx)?;
Ok(vec![Diff::Update {
pre: unmatched_pre,
post: unmatched_post,
}])
}
JoinSide::Right => {
Ok(Vec::new())
}
}
}
pub(crate) fn handle_insert(
&self,
txn: &mut FlowTransaction,
post: &Columns,
indices: &[usize],
side: JoinSide,
key_hash: &Hash128,
state: &mut JoinState,
operator: &JoinOperator,
) -> Result<Vec<Diff>> {
if indices.is_empty() {
return Ok(Vec::new());
}
let mut result = Vec::new();
match side {
JoinSide::Left => {
add_to_state_entry_batch(txn, &mut state.left, key_hash, post, indices)?;
if let Some(diff) = emit_joined_columns_batch(
txn,
post,
indices,
JoinSide::Left,
&state.right,
key_hash,
operator,
&operator.right_parent,
)? {
result.push(diff);
} else {
let unmatched = operator.unmatched_left_columns_batch(txn, post, indices)?;
result.push(Diff::Insert {
post: unmatched,
});
}
}
JoinSide::Right => {
let is_first = is_first_right_row(txn, &state.right, key_hash)?;
add_to_state_entry_batch(txn, &mut state.right, key_hash, post, indices)?;
if is_first {
if let Some(left_entry) = state.left.get(txn, key_hash)? {
let left_columns = operator.left_parent.pull(txn, &left_entry.rows)?;
let left_indices: Vec<usize> = (0..left_columns.row_count()).collect();
let unmatched = operator.unmatched_left_columns_batch(
txn,
&left_columns,
&left_indices,
)?;
result.push(Diff::Remove {
pre: unmatched,
});
}
}
if let Some(diff) = emit_joined_columns_batch(
txn,
post,
indices,
JoinSide::Right,
&state.left,
key_hash,
operator,
&operator.left_parent,
)? {
result.push(diff);
}
}
}
Ok(result)
}
pub(crate) fn handle_remove(
&self,
txn: &mut FlowTransaction,
pre: &Columns,
indices: &[usize],
side: JoinSide,
key_hash: &Hash128,
state: &mut JoinState,
operator: &JoinOperator,
_version: CommitVersion,
) -> Result<Vec<Diff>> {
if indices.is_empty() {
return Ok(Vec::new());
}
let mut result = Vec::new();
match side {
JoinSide::Left => {
for &idx in indices {
let row_number = pre.row_numbers[idx];
operator.cleanup_left_row_joins(txn, *row_number)?;
}
if let Some(diff) = emit_remove_joined_columns_batch(
txn,
pre,
indices,
JoinSide::Left,
&state.right,
key_hash,
operator,
&operator.right_parent,
)? {
result.push(diff);
} else {
let unmatched = operator.unmatched_left_columns_batch(txn, pre, indices)?;
result.push(Diff::Remove {
pre: unmatched,
});
}
for &idx in indices {
let row_number = pre.row_numbers[idx];
remove_from_state_entry(txn, &mut state.left, key_hash, row_number)?;
}
}
JoinSide::Right => {
if let Some(diff) = emit_remove_joined_columns_batch(
txn,
pre,
indices,
JoinSide::Right,
&state.left,
key_hash,
operator,
&operator.left_parent,
)? {
result.push(diff);
}
let will_become_empty = if let Some(entry) = state.right.get(txn, key_hash)? {
entry.rows.len() <= indices.len()
} else {
false
};
for &idx in indices {
let row_number = pre.row_numbers[idx];
remove_from_state_entry(txn, &mut state.right, key_hash, row_number)?;
}
if will_become_empty && !state.right.contains_key(txn, key_hash)? {
let left_columns =
pull_left_columns(txn, &state.left, key_hash, &operator.left_parent)?;
if !left_columns.is_empty() {
let left_indices: Vec<usize> = (0..left_columns.row_count()).collect();
let unmatched = operator.unmatched_left_columns_batch(
txn,
&left_columns,
&left_indices,
)?;
result.push(Diff::Insert {
post: unmatched,
});
}
}
}
}
Ok(result)
}
pub(crate) fn handle_update(
&self,
txn: &mut FlowTransaction,
pre: &Columns,
post: &Columns,
indices: &[usize],
side: JoinSide,
pre_key: &Hash128,
post_key: &Hash128,
state: &mut JoinState,
operator: &JoinOperator,
version: CommitVersion,
) -> Result<Vec<Diff>> {
if indices.is_empty() {
return Ok(Vec::new());
}
let mut result = Vec::new();
if pre_key == post_key {
for &row_idx in indices {
let pre_row_number = pre.row_numbers[row_idx];
let post_row_number = post.row_numbers[row_idx];
match side {
JoinSide::Left => {
if update_row_in_entry(
txn,
&mut state.left,
pre_key,
pre_row_number,
post_row_number,
)? {
if let Some(diff) = emit_update_joined_columns(
txn,
pre,
post,
row_idx,
JoinSide::Left,
&state.right,
pre_key,
operator,
&operator.right_parent,
)? {
result.push(diff);
} else {
let unmatched_pre = operator
.unmatched_left_columns(txn, pre, row_idx)?;
let unmatched_post = operator
.unmatched_left_columns(txn, post, row_idx)?;
result.push(Diff::Update {
pre: unmatched_pre,
post: unmatched_post,
});
}
}
}
JoinSide::Right => {
if update_row_in_entry(
txn,
&mut state.right,
pre_key,
pre_row_number,
post_row_number,
)? {
if let Some(diff) = emit_update_joined_columns(
txn,
pre,
post,
row_idx,
JoinSide::Right,
&state.left,
pre_key,
operator,
&operator.left_parent,
)? {
result.push(diff);
}
}
}
}
}
} else {
let remove_diffs =
self.handle_remove(txn, pre, indices, side, pre_key, state, operator, version)?;
result.extend(remove_diffs);
let insert_diffs = self.handle_insert(txn, post, indices, side, post_key, state, operator)?;
result.extend(insert_diffs);
}
Ok(result)
}
}